Utilities
Pipe, flow, retry, sleep, timeout, and tap for functional composition and resilience
This module provides utility functions for functional composition, resilience patterns, and timing operations.
Pipe and Flow
Why Composition Matters
Instead of nesting functions or creating intermediate variables, composition creates a clear linear flow:
// Hard to read - nested and right-to-left
const result = capitalize(reverse(trim(input)));
// Clear linear flow - left-to-right, each step is explicit
const result = pipe(
input,
trim,
reverse,
capitalize
);pipe(value, ...fns)
Pipes a value through functions left-to-right. Each function receives the result of the previous one.
Why use pipe:
- Creates readable linear transformations
- Each step is explicit and easy to follow
- Easier to debug - add
tapat any step to see values - Reorder steps by moving function references
import { pipe } from '@deessejs/fp';
// Before: nested, hard to follow
const username = capitalize(JSON.parse(localStorage.getItem('user')).name);
// After: linear flow, each step is clear
const username = pipe(
localStorage.getItem('user'), // Get string from storage
JSON.parse, // Parse to object
(user: { name: string }) => user.name, // Extract name
capitalize // Transform
);flow(...fns)
Creates a reusable function from a sequence of transformations. Unlike pipe, flow returns a function you can call with different inputs.
Why use flow:
- Create reusable transformation pipelines
- Partial application - create specialized versions
- Cleaner API for public functions
import { flow } from '@deessejs/fp';
// Create a reusable transformation pipeline
const processUserData = flow(
(data: string) => JSON.parse(data),
(user: User) => validateUser(user),
(user: User) => normalizeUser(user),
(user: User) => enrichUser(user)
);
// Use it on different inputs
const processed1 = processUserData(rawJson1);
const processed2 = processUserData(rawJson2);The first function in flow can accept multiple arguments - useful for creating binary or variadic functions:
import { flow } from '@deessejs/fp';
// First function gets multiple args, rest get single result
const createLogger = flow(
(level: string, message: string) => ({ level, message, timestamp: Date.now() }),
log => JSON.stringify(log)
);
createLogger('info', 'User logged in'); // '{"level":"info","message":"User logged in",...}'pipeAsync(value, ...fns)
Pipes a value through a mix of sync and async functions. Each function is awaited if it returns a Promise.
Why use pipeAsync:
- Handle mixed sync/async transformations cleanly
- Avoid callback hell with async operations
- Clear error handling with Result/AsyncResult
import { pipeAsync } from '@deessejs/fp';
const result = await pipeAsync(
fetchUser(id), // Promise<User>
async user => await validateAge(user), // Async validation
user => enrichProfile(user), // Sync transformation
user => user.score.toString() // Sync final transform
);flowAsync(...fns)
Creates a reusable async function from a sequence of transformations.
Why use flowAsync:
- Define async pipelines once, reuse many times
- Composable async operations
- Works with both promises and thenables
import { flowAsync } from '@deessejs/fp';
const fetchAndProcessUser = flowAsync(
async (id: string) => {
const res = await fetch(`/api/users/${id}`);
return res.json() as Promise<User>;
},
async user => await validatePermissions(user),
user => enrichUser(user)
);
const user1 = await fetchAndProcessUser('123');
const user2 = await fetchAndProcessUser('456');Tap (Side Effects)
Why Side Effects Need Special Treatment
Side effects (logging, analytics, debugging) are necessary but shouldn't interfere with data flow. Tap lets you add effects without transforming values:
// Without tap - must return the value
const result = pipe(
user,
user => {
console.log('User:', user);
return user; // Must remember to return!
},
user => user.name
);
// With tap - explicit about intent
import { pipe, tap } from '@deessejs/fp';
const result = pipe(
user,
tap(user => console.log('User:', user)), // Side effect, passes through
user => user.name
);tap(fn)
Executes a side effect function and returns the original value unchanged. Use for logging, debugging, or any effect that doesn't change the flow.
Why use tap:
- Debug pipelines without rewriting them
- Add logging without affecting the result
- Separate concerns - business logic vs side effects
import { pipe, tap } from '@deessejs/fp';
pipe(
user,
tap(user => console.log('Before:', user)), // Debug step
user => normalizeUser(user),
tap(user => console.log('After:', user)), // Another debug step
user => user.email
);tapAsync(fn)
Same as tap but for async side effects. Use for analytics calls, async logging, or any effect that returns a Promise.
import { pipeAsync, tapAsync } from '@deessejs/fp';
const result = await pipeAsync(
user,
tapAsync(user => sendAnalytics(user.id)), // Async, non-blocking
async user => await enrichUser(user)
);tapSafe(fn, onError?)
Safe version of tap that catches errors in the side effect. If the side effect throws, the value still passes through.
Why use tapSafe:
- Prevent side effect failures from breaking your pipeline
- Optional error handling via onError callback
- Useful for non-critical effects like optional logging
import { pipe, tapSafe } from '@deessejs/fp';
const result = pipe(
user,
tapSafe(
user => optionalAnalytics.track(user.id), // Might fail, but we don't care
err => console.warn('Analytics failed:', err) // Handle if needed
),
user => user.name
);
// Even if analytics fails, result is still computedSleep
Why Deliberate Delays Matter
Sometimes you need to wait - for rate limiting, debouncing, or honoring API quotas:
// Without sleep - all requests fire immediately
const results = await Promise.all(
users.map(user => api.updateUser(user)) // Might hit rate limit
);
// With sleep - respects API limits
const results = [];
for (const user of users) {
results.push(await api.updateUser(user));
await sleep(100); // Wait 100ms between requests
}sleep(ms, options?)
Creates a Promise that resolves after a specified delay.
Why use sleep:
- Rate limiting for API calls
- Debouncing user input
- Retry backoff without custom implementation
- Any scenario requiring a deliberate pause
import { sleep } from '@deessejs/fp';
// Simple delay
await sleep(1000); // Wait 1 second
// With jitter - adds randomness to prevent thundering herd
await sleep(1000, { jitter: true }); // 500-1500mssleepWithSignal(ms, options?)
Sleep that can be cancelled via AbortSignal.
Why use sleepWithSignal:
- Cancel long sleeps when user navigates away
- Combine with request cancellation
- Implement request timeouts
import { sleepWithSignal } from '@deessejs/fp';
const controller = new AbortController();
// Start a long operation that can be cancelled
const longTask = async () => {
await sleepWithSignal(60000, { signal: controller.signal });
return doWork();
};
// User clicks cancel
cancelButton.onclick = () => controller.abort();withTimeout(promise, ms, options?)
Adds a timeout to any Promise. Throws TimeoutError if the operation takes too long.
Why use withTimeout:
- Prevent hanging requests
- Enforce SLAs on operations
- Better UX - show timeout messages instead of spinners
import { withTimeout } from '@deessejs/fp';
try {
const data = await withTimeout(fetchData(), 5000, {
message: 'Request took too long'
});
} catch (error) {
if (error.name === 'TIMEOUT') {
showTimeoutMessage();
}
}TimeoutError structure:
interface TimeoutError extends Error {
name: 'TIMEOUT';
timeout: number; // Configured timeout in ms
elapsed?: number; // Actual time before timeout
}Signal Injection Mode
For fine-grained control over abortable operations, pass a function that receives an AbortSignal:
import { withTimeout } from '@deessejs/fp';
const { promise, cleanup } = withTimeout(
(signal) => fetch('/api/data', { signal }),
5000
);
try {
const data = await promise;
} finally {
cleanup(); // Call cleanup to abort and release resources
}This is better than wrapping the promise directly because the signal is injected into the fetch call itself, allowing the HTTP request to be cancelled properly.
addJitter(delay, jitter?)
Adds randomness to delays to prevent thundering herd problems.
Why use jitter:
- In distributed systems, many clients might retry at the same time
- Jitter spreads out retries over time
- Reduces load spikes on recovering services
import { addJitter } from '@deessejs/fp';
// Full jitter: 50% to 150% of delay (recommended for most cases)
addJitter(1000, true); // 500-1500ms
// Partial jitter: 80% to 120% with 0.2 variance
addJitter(1000, 0.2); // 800-1200msRetry
Why Retry Matters
Network requests and external services fail transiently. Instead of failing immediately, retrying with backoff handles temporary issues gracefully:
// Without retry - fails on first transient error
const data = await fetchFromFlakyService(); // Might fail
// With retry - automatically retries with backoff
const data = await retryAsync(
() => fetchFromFlakyService(),
{ attempts: 3, delay: 1000, backoff: 'exponential' }
);retry(fn, options?)
Retries a synchronous function on failure. Uses busy-wait for delays.
Warning: This blocks the event loop during retries. Use retryAsync instead for any async operations or in environments with UI.
retryAsync(fn, options?)
Retries an async function on failure. Use this in production.
Why use retryAsync:
- Handle transient network failures gracefully
- Implement circuit breakers patterns
- Give external services time to recover
import { retryAsync } from '@deessejs/fp';
const data = await retryAsync(
async () => {
const res = await fetch('/api/flaky-endpoint');
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
{
attempts: 3,
delay: 1000,
backoff: 'exponential' // 1s, 2s, 4s...
}
);Retry Options
interface RetryOptions {
/** Number of attempts before giving up (default: 3) */
attempts?: number;
/** Initial delay between retries in ms (default: 1000) */
delay?: number;
/** How to increase delay between attempts */
backoff?: 'exponential' | 'linear' | 'constant' | ((attempt: number, delay: number) => number);
/** Maximum delay cap */
maxDelay?: number;
/** Only retry if predicate returns true for the error */
predicate?: (error: Error) => boolean;
/** Called before each retry attempt */
onRetry?: (error: Error, attempt: number) => void;
/** Add randomness to delay */
jitter?: boolean;
/** AbortSignal to cancel retries */
signal?: AbortSignal;
}Backoff Strategies
Exponential (default): Delay doubles each attempt
await retryAsync(fn, { delay: 1000, backoff: 'exponential' });
// Attempts at: 0ms, 1000ms, 2000ms, 4000ms...Linear: Delay increases linearly
await retryAsync(fn, { delay: 1000, backoff: 'linear' });
// Attempts at: 0ms, 1000ms, 2000ms, 3000ms...Constant: Same delay between all attempts
await retryAsync(fn, { delay: 1000, backoff: 'constant' });
// Attempts at: 0ms, 1000ms, 2000ms, 3000ms...Custom function: Full control over delay calculation
await retryAsync(fn, {
delay: 1000,
backoff: (attempt, delay) => {
const jitter = Math.random() * 100;
return delay * Math.pow(2, attempt - 1) + jitter;
}
});Standalone Backoff Functions
Instead of using the string options, you can import and use the standalone backoff functions directly. This is useful when you want to reuse the same strategy across multiple retries or compose them with other delay modifiers:
import { retryAsync, exponentialBackoff, linearBackoff, constantBackoff } from '@deessejs/fp';
// Exponential backoff: 1s, 2s, 4s, 8s...
retryAsync(fn, { delay: 1000, backoff: exponentialBackoff });
// Linear backoff: 1s, 2s, 3s, 4s...
retryAsync(fn, { delay: 1000, backoff: linearBackoff });
// Constant backoff: 1s, 1s, 1s, 1s...
retryAsync(fn, { delay: 1000, backoff: constantBackoff });These functions match the behavior of their string equivalents but can be composed or modified:
import { retryAsync, exponentialBackoff } from '@deessejs/fp';
// Combine with jitter using a custom backoff
retryAsync(fn, {
delay: 1000,
backoff: (attempt, delay) => {
const jitter = Math.random() * 200;
return exponentialBackoff(attempt, delay) + jitter;
}
});Selective Retry with Predicate
Not all errors should trigger a retry:
await retryAsync(
async () => {
const result = await riskyOperation();
if (result.validationFailed) throw new Error('Invalid input');
return result;
},
{
// Only retry on network errors, not business logic errors
predicate: (error) => error.message.includes('network')
}
);Monitoring Retries
Use onRetry to log or track retry attempts:
import { retryAsync } from '@deessejs/fp';
await retryAsync(
async () => flakyOperation(),
{
attempts: 5,
onRetry: (error, attempt) => {
console.log(`Attempt ${attempt} failed: ${error.message}`);
metrics.increment('retry.count');
}
}
);Cancellation
Use AbortSignal to cancel retries:
import { retryAsync } from '@deessejs/fp';
const controller = new AbortController();
try {
await retryAsync(
async () => flakyOperation(),
{ signal: controller.signal }
);
} catch (error) {
if (error.name === 'RetryAbortedError') {
console.log('Retry was cancelled');
}
}
// Cancel on user navigation, component unmount, etc.
window.addEventListener('beforeunload', () => controller.abort());Real-World Examples
API Client with Retry and Timeout
import { retryAsync, withTimeout } from '@deessejs/fp';
import { pipeAsync } from '@deessejs/fp';
const createApiClient = (baseUrl: string) => {
const request = <T>(path: string) =>
pipeAsync(
withTimeout(
(signal) => fetch(`${baseUrl}${path}`, { signal }),
5000,
{ name: 'ApiTimeout' }
),
async (res: Response) => {
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json() as Promise<T>;
}
);
return {
getUser: (id: string) =>
retryAsync(() => request<User>(`/users/${id}`), {
attempts: 3,
delay: 500,
backoff: 'exponential',
predicate: (error) => error.message.includes('network')
}),
getPosts: (userId: string) =>
retryAsync(() => request<Post[]>(`/users/${userId}/posts`), {
attempts: 3,
delay: 500
})
};
};Data Processing Pipeline with Progress
import { pipe, flow, tap } from '@deessejs/fp';
const processUserBatch = flow(
(batch: RawUser[]) => batch.map(parseUser),
users => users.filter(isValidUser),
users => users.map(enrichUser),
tap(users => saveToDatabase(users)) // Side effect without breaking flow
);Rate-Limited API Calls
import { sleep } from '@deessejs/fp';
const fetchWithRateLimit = async <T>(
urls: string[],
options: { delay: number; concurrency: number }
): Promise<T[]> => {
const results: T[] = [];
for (let i = 0; i < urls.length; i++) {
results.push(await fetch(urls[i]));
// Don't delay after the last item
if (i < urls.length - 1) {
await sleep(options.delay);
}
}
return results;
};See Also
- Result - For error handling in sync operations
- AsyncResult - For error handling in async operations
- Maybe - For optional values
- Yield - For yielding control to the event loop