@deessejs/fp

Utilities

Pipe, flow, retry, sleep, timeout, and tap for functional composition and resilience

This module provides utility functions for functional composition, resilience patterns, and timing operations.

Pipe and Flow

Why Composition Matters

Instead of nesting functions or creating intermediate variables, composition creates a clear linear flow:

// Hard to read - nested and right-to-left
const result = capitalize(reverse(trim(input)));

// Clear linear flow - left-to-right, each step is explicit
const result = pipe(
  input,
  trim,
  reverse,
  capitalize
);

pipe(value, ...fns)

Pipes a value through functions left-to-right. Each function receives the result of the previous one.

Why use pipe:

  • Creates readable linear transformations
  • Each step is explicit and easy to follow
  • Easier to debug - add tap at any step to see values
  • Reorder steps by moving function references
import { pipe } from '@deessejs/fp';

// Before: nested, hard to follow
const username = capitalize(JSON.parse(localStorage.getItem('user')).name);

// After: linear flow, each step is clear
const username = pipe(
  localStorage.getItem('user'),  // Get string from storage
  JSON.parse,                     // Parse to object
  (user: { name: string }) => user.name, // Extract name
  capitalize                      // Transform
);

flow(...fns)

Creates a reusable function from a sequence of transformations. Unlike pipe, flow returns a function you can call with different inputs.

Why use flow:

  • Create reusable transformation pipelines
  • Partial application - create specialized versions
  • Cleaner API for public functions
import { flow } from '@deessejs/fp';

// Create a reusable transformation pipeline
const processUserData = flow(
  (data: string) => JSON.parse(data),
  (user: User) => validateUser(user),
  (user: User) => normalizeUser(user),
  (user: User) => enrichUser(user)
);

// Use it on different inputs
const processed1 = processUserData(rawJson1);
const processed2 = processUserData(rawJson2);

The first function in flow can accept multiple arguments - useful for creating binary or variadic functions:

import { flow } from '@deessejs/fp';

// First function gets multiple args, rest get single result
const createLogger = flow(
  (level: string, message: string) => ({ level, message, timestamp: Date.now() }),
  log => JSON.stringify(log)
);

createLogger('info', 'User logged in'); // '{"level":"info","message":"User logged in",...}'

pipeAsync(value, ...fns)

Pipes a value through a mix of sync and async functions. Each function is awaited if it returns a Promise.

Why use pipeAsync:

  • Handle mixed sync/async transformations cleanly
  • Avoid callback hell with async operations
  • Clear error handling with Result/AsyncResult
import { pipeAsync } from '@deessejs/fp';

const result = await pipeAsync(
  fetchUser(id),                    // Promise<User>
  async user => await validateAge(user),  // Async validation
  user => enrichProfile(user),      // Sync transformation
  user => user.score.toString()     // Sync final transform
);

flowAsync(...fns)

Creates a reusable async function from a sequence of transformations.

Why use flowAsync:

  • Define async pipelines once, reuse many times
  • Composable async operations
  • Works with both promises and thenables
import { flowAsync } from '@deessejs/fp';

const fetchAndProcessUser = flowAsync(
  async (id: string) => {
    const res = await fetch(`/api/users/${id}`);
    return res.json() as Promise<User>;
  },
  async user => await validatePermissions(user),
  user => enrichUser(user)
);

const user1 = await fetchAndProcessUser('123');
const user2 = await fetchAndProcessUser('456');

Tap (Side Effects)

Why Side Effects Need Special Treatment

Side effects (logging, analytics, debugging) are necessary but shouldn't interfere with data flow. Tap lets you add effects without transforming values:

// Without tap - must return the value
const result = pipe(
  user,
  user => {
    console.log('User:', user);
    return user; // Must remember to return!
  },
  user => user.name
);

// With tap - explicit about intent
import { pipe, tap } from '@deessejs/fp';

const result = pipe(
  user,
  tap(user => console.log('User:', user)), // Side effect, passes through
  user => user.name
);

tap(fn)

Executes a side effect function and returns the original value unchanged. Use for logging, debugging, or any effect that doesn't change the flow.

Why use tap:

  • Debug pipelines without rewriting them
  • Add logging without affecting the result
  • Separate concerns - business logic vs side effects
import { pipe, tap } from '@deessejs/fp';

pipe(
  user,
  tap(user => console.log('Before:', user)), // Debug step
  user => normalizeUser(user),
  tap(user => console.log('After:', user)),  // Another debug step
  user => user.email
);

tapAsync(fn)

Same as tap but for async side effects. Use for analytics calls, async logging, or any effect that returns a Promise.

import { pipeAsync, tapAsync } from '@deessejs/fp';

const result = await pipeAsync(
  user,
  tapAsync(user => sendAnalytics(user.id)), // Async, non-blocking
  async user => await enrichUser(user)
);

tapSafe(fn, onError?)

Safe version of tap that catches errors in the side effect. If the side effect throws, the value still passes through.

Why use tapSafe:

  • Prevent side effect failures from breaking your pipeline
  • Optional error handling via onError callback
  • Useful for non-critical effects like optional logging
import { pipe, tapSafe } from '@deessejs/fp';

const result = pipe(
  user,
  tapSafe(
    user => optionalAnalytics.track(user.id), // Might fail, but we don't care
    err => console.warn('Analytics failed:', err) // Handle if needed
  ),
  user => user.name
);
// Even if analytics fails, result is still computed

Sleep

Why Deliberate Delays Matter

Sometimes you need to wait - for rate limiting, debouncing, or honoring API quotas:

// Without sleep - all requests fire immediately
const results = await Promise.all(
  users.map(user => api.updateUser(user)) // Might hit rate limit
);

// With sleep - respects API limits
const results = [];
for (const user of users) {
  results.push(await api.updateUser(user));
  await sleep(100); // Wait 100ms between requests
}

sleep(ms, options?)

Creates a Promise that resolves after a specified delay.

Why use sleep:

  • Rate limiting for API calls
  • Debouncing user input
  • Retry backoff without custom implementation
  • Any scenario requiring a deliberate pause
import { sleep } from '@deessejs/fp';

// Simple delay
await sleep(1000); // Wait 1 second

// With jitter - adds randomness to prevent thundering herd
await sleep(1000, { jitter: true }); // 500-1500ms

sleepWithSignal(ms, options?)

Sleep that can be cancelled via AbortSignal.

Why use sleepWithSignal:

  • Cancel long sleeps when user navigates away
  • Combine with request cancellation
  • Implement request timeouts
import { sleepWithSignal } from '@deessejs/fp';

const controller = new AbortController();

// Start a long operation that can be cancelled
const longTask = async () => {
  await sleepWithSignal(60000, { signal: controller.signal });
  return doWork();
};

// User clicks cancel
cancelButton.onclick = () => controller.abort();

withTimeout(promise, ms, options?)

Adds a timeout to any Promise. Throws TimeoutError if the operation takes too long.

Why use withTimeout:

  • Prevent hanging requests
  • Enforce SLAs on operations
  • Better UX - show timeout messages instead of spinners
import { withTimeout } from '@deessejs/fp';

try {
  const data = await withTimeout(fetchData(), 5000, {
    message: 'Request took too long'
  });
} catch (error) {
  if (error.name === 'TIMEOUT') {
    showTimeoutMessage();
  }
}

TimeoutError structure:

interface TimeoutError extends Error {
  name: 'TIMEOUT';
  timeout: number;    // Configured timeout in ms
  elapsed?: number;   // Actual time before timeout
}

Signal Injection Mode

For fine-grained control over abortable operations, pass a function that receives an AbortSignal:

import { withTimeout } from '@deessejs/fp';

const { promise, cleanup } = withTimeout(
  (signal) => fetch('/api/data', { signal }),
  5000
);

try {
  const data = await promise;
} finally {
  cleanup(); // Call cleanup to abort and release resources
}

This is better than wrapping the promise directly because the signal is injected into the fetch call itself, allowing the HTTP request to be cancelled properly.

addJitter(delay, jitter?)

Adds randomness to delays to prevent thundering herd problems.

Why use jitter:

  • In distributed systems, many clients might retry at the same time
  • Jitter spreads out retries over time
  • Reduces load spikes on recovering services
import { addJitter } from '@deessejs/fp';

// Full jitter: 50% to 150% of delay (recommended for most cases)
addJitter(1000, true);  // 500-1500ms

// Partial jitter: 80% to 120% with 0.2 variance
addJitter(1000, 0.2);   // 800-1200ms

Retry

Why Retry Matters

Network requests and external services fail transiently. Instead of failing immediately, retrying with backoff handles temporary issues gracefully:

// Without retry - fails on first transient error
const data = await fetchFromFlakyService(); // Might fail

// With retry - automatically retries with backoff
const data = await retryAsync(
  () => fetchFromFlakyService(),
  { attempts: 3, delay: 1000, backoff: 'exponential' }
);

retry(fn, options?)

Retries a synchronous function on failure. Uses busy-wait for delays.

Warning: This blocks the event loop during retries. Use retryAsync instead for any async operations or in environments with UI.

retryAsync(fn, options?)

Retries an async function on failure. Use this in production.

Why use retryAsync:

  • Handle transient network failures gracefully
  • Implement circuit breakers patterns
  • Give external services time to recover
import { retryAsync } from '@deessejs/fp';

const data = await retryAsync(
  async () => {
    const res = await fetch('/api/flaky-endpoint');
    if (!res.ok) throw new Error(`HTTP ${res.status}`);
    return res.json();
  },
  {
    attempts: 3,
    delay: 1000,
    backoff: 'exponential' // 1s, 2s, 4s...
  }
);

Retry Options

interface RetryOptions {
  /** Number of attempts before giving up (default: 3) */
  attempts?: number;

  /** Initial delay between retries in ms (default: 1000) */
  delay?: number;

  /** How to increase delay between attempts */
  backoff?: 'exponential' | 'linear' | 'constant' | ((attempt: number, delay: number) => number);

  /** Maximum delay cap */
  maxDelay?: number;

  /** Only retry if predicate returns true for the error */
  predicate?: (error: Error) => boolean;

  /** Called before each retry attempt */
  onRetry?: (error: Error, attempt: number) => void;

  /** Add randomness to delay */
  jitter?: boolean;

  /** AbortSignal to cancel retries */
  signal?: AbortSignal;
}

Backoff Strategies

Exponential (default): Delay doubles each attempt

await retryAsync(fn, { delay: 1000, backoff: 'exponential' });
// Attempts at: 0ms, 1000ms, 2000ms, 4000ms...

Linear: Delay increases linearly

await retryAsync(fn, { delay: 1000, backoff: 'linear' });
// Attempts at: 0ms, 1000ms, 2000ms, 3000ms...

Constant: Same delay between all attempts

await retryAsync(fn, { delay: 1000, backoff: 'constant' });
// Attempts at: 0ms, 1000ms, 2000ms, 3000ms...

Custom function: Full control over delay calculation

await retryAsync(fn, {
  delay: 1000,
  backoff: (attempt, delay) => {
    const jitter = Math.random() * 100;
    return delay * Math.pow(2, attempt - 1) + jitter;
  }
});

Standalone Backoff Functions

Instead of using the string options, you can import and use the standalone backoff functions directly. This is useful when you want to reuse the same strategy across multiple retries or compose them with other delay modifiers:

import { retryAsync, exponentialBackoff, linearBackoff, constantBackoff } from '@deessejs/fp';

// Exponential backoff: 1s, 2s, 4s, 8s...
retryAsync(fn, { delay: 1000, backoff: exponentialBackoff });

// Linear backoff: 1s, 2s, 3s, 4s...
retryAsync(fn, { delay: 1000, backoff: linearBackoff });

// Constant backoff: 1s, 1s, 1s, 1s...
retryAsync(fn, { delay: 1000, backoff: constantBackoff });

These functions match the behavior of their string equivalents but can be composed or modified:

import { retryAsync, exponentialBackoff } from '@deessejs/fp';

// Combine with jitter using a custom backoff
retryAsync(fn, {
  delay: 1000,
  backoff: (attempt, delay) => {
    const jitter = Math.random() * 200;
    return exponentialBackoff(attempt, delay) + jitter;
  }
});

Selective Retry with Predicate

Not all errors should trigger a retry:

await retryAsync(
  async () => {
    const result = await riskyOperation();
    if (result.validationFailed) throw new Error('Invalid input');
    return result;
  },
  {
    // Only retry on network errors, not business logic errors
    predicate: (error) => error.message.includes('network')
  }
);

Monitoring Retries

Use onRetry to log or track retry attempts:

import { retryAsync } from '@deessejs/fp';

await retryAsync(
  async () => flakyOperation(),
  {
    attempts: 5,
    onRetry: (error, attempt) => {
      console.log(`Attempt ${attempt} failed: ${error.message}`);
      metrics.increment('retry.count');
    }
  }
);

Cancellation

Use AbortSignal to cancel retries:

import { retryAsync } from '@deessejs/fp';

const controller = new AbortController();

try {
  await retryAsync(
    async () => flakyOperation(),
    { signal: controller.signal }
  );
} catch (error) {
  if (error.name === 'RetryAbortedError') {
    console.log('Retry was cancelled');
  }
}

// Cancel on user navigation, component unmount, etc.
window.addEventListener('beforeunload', () => controller.abort());

Real-World Examples

API Client with Retry and Timeout

import { retryAsync, withTimeout } from '@deessejs/fp';
import { pipeAsync } from '@deessejs/fp';

const createApiClient = (baseUrl: string) => {
  const request = <T>(path: string) =>
    pipeAsync(
      withTimeout(
        (signal) => fetch(`${baseUrl}${path}`, { signal }),
        5000,
        { name: 'ApiTimeout' }
      ),
      async (res: Response) => {
        if (!res.ok) throw new Error(`HTTP ${res.status}`);
        return res.json() as Promise<T>;
      }
    );

  return {
    getUser: (id: string) =>
      retryAsync(() => request<User>(`/users/${id}`), {
        attempts: 3,
        delay: 500,
        backoff: 'exponential',
        predicate: (error) => error.message.includes('network')
      }),

    getPosts: (userId: string) =>
      retryAsync(() => request<Post[]>(`/users/${userId}/posts`), {
        attempts: 3,
        delay: 500
      })
  };
};

Data Processing Pipeline with Progress

import { pipe, flow, tap } from '@deessejs/fp';

const processUserBatch = flow(
  (batch: RawUser[]) => batch.map(parseUser),
  users => users.filter(isValidUser),
  users => users.map(enrichUser),
  tap(users => saveToDatabase(users)) // Side effect without breaking flow
);

Rate-Limited API Calls

import { sleep } from '@deessejs/fp';

const fetchWithRateLimit = async <T>(
  urls: string[],
  options: { delay: number; concurrency: number }
): Promise<T[]> => {
  const results: T[] = [];

  for (let i = 0; i < urls.length; i++) {
    results.push(await fetch(urls[i]));

    // Don't delay after the last item
    if (i < urls.length - 1) {
      await sleep(options.delay);
    }
  }

  return results;
};

See Also

  • Result - For error handling in sync operations
  • AsyncResult - For error handling in async operations
  • Maybe - For optional values
  • Yield - For yielding control to the event loop

On this page