In this guide, we’ll explore some essential operations you can perform on streams. These operations allow you to manipulate and interact with stream elements in various ways.

The Stream.tap operation allows you to run an effect on each element emitted by the stream, observing or performing side effects without altering the elements or return type. This can be useful for logging, monitoring, or triggering additional actions with each emission.

Example (Logging with Stream.tap)

For example, Stream.tap can be used to log each element before and after a mapping operation:

import {
import Stream
import Console
import Effect




} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const make: <[number, number, number]>(as_0: number, as_1: number, as_2: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.


import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }


(1, 2, 3).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<number, never, never>, Stream.Stream<number, never, never>, Stream.Stream<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<...>) => Stream.Stream<...>, bc: (_: Stream.Stream<...>) => Stream.Stream<...>, cd: (_: Stream.Stream<...>) => Stream.Stream<...>): Stream.Stream<...> (+21 overloads)
import Stream
const tap: <number, void, never, never>(f: (a: number) => Effect.Effect<void, never, never>) => <E, R>(self: Stream.Stream<number, E, R>) => Stream.Stream<number, E, R> (+1 overload)

Adds an effect to consumption of every element of the stream.


import { Console, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.tap((n) => Console.log(`before mapping: ${n}`)), => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// before mapping: 1
// after mapping: 2
// before mapping: 2
// after mapping: 4
// before mapping: 3
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }


n: number
) =>
import Console
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>


(`before mapping: ${
n: number
import Stream
const map: <number, number>(f: (a: number) => number) => <E, R>(self: Stream.Stream<number, E, R>) => Stream.Stream<number, E, R> (+1 overload)

Transforms the elements of this stream using the supplied function.


import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe( => n + 1))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 3, 4 ] }


n: number
) =>
n: number
* 2),
import Stream
const tap: <number, void, never, never>(f: (a: number) => Effect.Effect<void, never, never>) => <E, R>(self: Stream.Stream<number, E, R>) => Stream.Stream<number, E, R> (+1 overload)

Adds an effect to consumption of every element of the stream.


import { Console, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.tap((n) => Console.log(`before mapping: ${n}`)), => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// before mapping: 1
// after mapping: 2
// before mapping: 2
// after mapping: 4
// before mapping: 3
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }


n: number
) =>
import Console
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>


(`after mapping: ${
n: number
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

before mapping: 1
after mapping: 2
before mapping: 2
after mapping: 4
before mapping: 3
after mapping: 6
{ _id: 'Chunk', values: [ 2, 4, 6 ] }

The “taking” operations in streams let you extract a specific set of elements, either by a fixed number, condition, or position within the stream. Here are a few ways to apply these operations:

takeExtracts a fixed number of elements.
takeWhileExtracts elements while a certain condition is met.
takeUntilExtracts elements until a certain condition is met.
takeRightExtracts a specified number of elements from the end.

Example (Extracting Elements in Different Ways)

import {
import Stream
import Effect




} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const iterate: <number>(value: number, next: (value: number) => number) => Stream.Stream<number, never, never>

The infinite stream of iterative function application: a, f(a), f(f(a)), f(f(f(a))), ...


import { Effect, Stream } from "effect"
// An infinite Stream of numbers starting from 1 and incrementing
const stream = Stream.iterate(1, (n) => n + 1)
// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(10)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] }


(0, (
n: number
) =>
n: number
+ 1)
// Using `take` to extract a fixed number of elements:
const s1: Stream.Stream<number, never, never>
import Stream
const take: <number, never, never>(self: Stream.Stream<number, never, never>, n: number) => Stream.Stream<number, never, never> (+1 overload)

Takes the specified number of elements from this stream.


import { Effect, Stream } from "effect"
const stream = Stream.take(Stream.iterate(0, (n) => n + 1), 5)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }


const stream: Stream.Stream<number, never, never>
, 5)
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const s1: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
// Using `takeWhile` to extract elements while a condition is met:
const s2: Stream.Stream<number, never, never>
import Stream
const takeWhile: <number, never, never>(self: Stream.Stream<number, never, never>, predicate: Predicate<number>) => Stream.Stream<number, never, never> (+3 overloads)

Takes all elements of the stream for as long as the specified predicate evaluates to true.


import { Effect, Stream } from "effect"
const stream = Stream.takeWhile(Stream.iterate(0, (n) => n + 1), (n) => n < 5)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }


const stream: Stream.Stream<number, never, never>
, (
n: number
) =>
n: number
< 5)
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const s2: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
// Using `takeUntil` to extract elements until a condition is met:
const s3: Stream.Stream<number, never, never>
import Stream
const takeUntil: <number, never, never>(self: Stream.Stream<number, never, never>, predicate: Predicate<number>) => Stream.Stream<number, never, never> (+1 overload)

Takes all elements of the stream until the specified predicate evaluates to true.


import { Effect, Stream } from "effect"
const stream = Stream.takeUntil(Stream.iterate(0, (n) => n + 1), (n) => n === 4)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }


const stream: Stream.Stream<number, never, never>
, (
n: number
) =>
n: number
=== 5)
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const s3: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5 ] }
// Using `takeRight` to take elements from the end of the stream:
const s4: Stream.Stream<number, never, never>
import Stream
const takeRight: <number, never, never>(self: Stream.Stream<number, never, never>, n: number) => Stream.Stream<number, never, never> (+1 overload)

Takes the last specified number of elements from this stream.


import { Effect, Stream } from "effect"
const stream = Stream.takeRight(Stream.make(1, 2, 3, 4, 5, 6), 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 4, 5, 6 ] }


const s3: Stream.Stream<number, never, never>
, 3)
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const s4: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 3, 4, 5 ] }

When working with asynchronous data sources, such as async iterables, you often need to consume data in a loop until a certain condition is met. Streams provide a similar approach and offer additional flexibility.

With async iterables, data is processed in a loop until a break or return statement is encountered. To replicate this behavior with Streams, consider these options:

takeUntilTakes elements from a stream until a specified condition is met, similar to breaking out of a loop.
toPullReturns an effect that continuously pulls data chunks from the stream. This effect can fail with None when the stream is finished or with Some error if it fails.

Example (Using Stream.toPull)

import {
import Stream
import Effect




} from "effect"
// Simulate a chunked stream
const stream: Stream.Stream<number, never, never>
import Stream
const fromIterable: <number>(iterable: Iterable<number>) => Stream.Stream<number, never, never>

Creates a new Stream from an iterable collection of values.


import { Effect, Stream } from "effect"
const numbers = [1, 2, 3]
const stream = Stream.fromIterable(numbers)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }


([1, 2, 3, 4, 5]).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Stream.Stream<number, never, never>): Stream.Stream<...> (+21 overloads)
import Stream
const rechunk: (n: number) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E, R> (+1 overload)

Re-chunks the elements of the stream into chunks of n elements each. The last chunk might contain less than n elements.


const program: Effect.Effect<never, Option<never>, Scope>
import Effect




const gen: <YieldWrap<Effect.Effect<Effect.Effect<Chunk<number>, Option<never>, never>, never, Scope>> | YieldWrap<Effect.Effect<Chunk<number>, Option<...>, never>>, never>(f: (resume: Effect.Adapter) => Generator<...>) => Effect.Effect<...> (+1 overload)

Provides a way to write effectful code using generator functions, simplifying control flow and error handling.

When to Use

Effect.gen allows you to write code that looks and behaves like synchronous code, but it can handle asynchronous tasks, errors, and complex control flow (like loops and conditions). It helps make asynchronous code more readable and easier to manage.

The generator functions work similarly to async/await but with more explicit control over the execution of effects. You can yield* values from effects and return the final result at the end.


import { Effect } from "effect"
const addServiceCharge = (amount: number) => amount + 1
const applyDiscount = (
total: number,
discountRate: number
): Effect.Effect<number, Error> =>
discountRate === 0
? Error("Discount rate cannot be zero"))
: Effect.succeed(total - (total * discountRate) / 100)
const fetchTransactionAmount = Effect.promise(() => Promise.resolve(100))
const fetchDiscountRate = Effect.promise(() => Promise.resolve(5))
export const program = Effect.gen(function* () {
const transactionAmount = yield* fetchTransactionAmount
const discountRate = yield* fetchDiscountRate
const discountedAmount = yield* applyDiscount(
const finalAmount = addServiceCharge(discountedAmount)
return `Final amount to charge: ${finalAmount}`


(function* () {
// Create an effect to get data chunks from the stream
const getChunk: Effect.Effect<Chunk<number>, Option<never>, never>
= yield*
import Stream
const toPull: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Effect.Effect<Chunk<number>, Option<never>, never>, never, Scope>

Returns in a scope a ZIO effect that can be used to repeatedly pull chunks from the stream. The pull effect fails with None when the stream is finished, or with Some error if it fails, otherwise it returns a chunk of the stream's output.


import { Effect, Stream } from "effect"
// Simulate a chunked stream
const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.rechunk(2))
const program = Effect.gen(function*() {
// Create an effect to get data chunks from the stream
const getChunk = yield* Stream.toPull(stream)
// Continuously fetch and process chunks
while (true) {
const chunk = yield* getChunk
// Effect.runPromise(Effect.scoped(program)).then(console.log, console.error)
// { _id: 'Chunk', values: [ 1, 2 ] }
// { _id: 'Chunk', values: [ 3, 4 ] }
// { _id: 'Chunk', values: [ 5 ] }
// (FiberFailure) Error: {
// "_id": "Option",
// "_tag": "None"
// }


const stream: Stream.Stream<number, never, never>
// Continuously fetch and process chunks
while (true) {
const chunk: Chunk<number>
= yield*
const getChunk: Effect.Effect<Chunk<number>, Option<never>, never>
const chunk: Chunk<number>
import Effect




const runPromise: <never, Option<never>>(effect: Effect.Effect<never, Option<never>, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<never>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Effect




const scoped: <never, Option<never>, Scope>(effect: Effect.Effect<never, Option<never>, Scope>) => Effect.Effect<never, Option<never>, never>

Scopes all resources used in an effect to the lifetime of the effect.


This function ensures that all resources used within an effect are tied to its lifetime. Finalizers for these resources are executed automatically when the effect completes, whether through success, failure, or interruption. This guarantees proper resource cleanup without requiring explicit management.


const program: Effect.Effect<never, Option<never>, Scope>
Promise<never>.then<void, void>(onfulfilled?: ((value: never) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => void | PromiseLike<void>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 1, 2 ] }
{ _id: 'Chunk', values: [ 3, 4 ] }
{ _id: 'Chunk', values: [ 5 ] }
(FiberFailure) Error: {
"_id": "Option",
"_tag": "None"

The operation applies a specified function to each element in a stream, creating a new stream with the transformed values.

Example (Incrementing Each Element by 1)

import {
import Stream
import Effect




} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const make: <[number, number, number]>(as_0: number, as_1: number, as_2: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.


import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }


(1, 2, 3).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Stream.Stream<number, never, never>): Stream.Stream<...> (+21 overloads)
import Stream
const map: <number, number>(f: (a: number) => number) => <E, R>(self: Stream.Stream<number, E, R>) => Stream.Stream<number, E, R> (+1 overload)

Transforms the elements of this stream using the supplied function.


import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe( => n + 1))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 3, 4 ] }


n: number
) =>
n: number
+ 1) // Increment each element by 1
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 2, 3, 4 ] }

The method allows you to replace each success value in a stream with a specified constant value. This can be useful when you want all elements in the stream to emit a uniform value, regardless of the original data.

Example (Mapping to null)

import {
import Stream
import Effect




} from "effect"
const stream: Stream.Stream<null, never, never>
import Stream
const range: (min: number, max: number, chunkSize?: number) => Stream.Stream<number>

Constructs a stream from a range of integers, including both endpoints.


import { Effect, Stream } from "effect"
// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }


(1, 5).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<null, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Stream.Stream<null, never, never>): Stream.Stream<...> (+21 overloads)
import Stream
const as: <null>(value: null) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<null, E, R> (+1 overload)

Maps the success values of this stream to the specified constant value.


import { Effect, Stream } from "effect"
const stream = Stream.range(1, 5).pipe(
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ null, null, null, null, null ] }


import Effect




const runPromise: <Chunk<null>, never>(effect: Effect.Effect<Chunk<null>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<null>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <null, never, never>(self: Stream.Stream<null, never, never>) => Effect.Effect<Chunk<null>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<null, never, never>
Promise<Chunk<null>>.then<void, never>(onfulfilled?: ((value: Chunk<null>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ null, null, null, null, null ] }

For transformations involving effects, use Stream.mapEffect. This function applies an effectful operation to each element in the stream, producing a new stream with effectful results.

Example (Random Number Generation)

import {
import Stream
import Random
import Effect




} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const make: <[number, number, number]>(as_0: number, as_1: number, as_2: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.


import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }


(10, 20, 30).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Stream.Stream<number, never, never>): Stream.Stream<...> (+21 overloads)
// Generate a random number between 0 and each element
import Stream
const mapEffect: <number, number, never, never>(f: (a: number) => Effect.Effect<number, never, never>, options?: {
readonly concurrency?: number | "unbounded" | undefined;
readonly unordered?: boolean | undefined;
} | undefined) => <E, R>(self: Stream.Stream<...>) => Stream.Stream<...> (+3 overloads)

Maps over elements of the stream with the specified effectful function.


import { Effect, Random, Stream } from "effect"
const stream = Stream.make(10, 20, 30).pipe(
Stream.mapEffect((n) => Random.nextIntBetween(0, n))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 7, 19, 8 ] }


n: number
) =>
import Random
const nextIntBetween: (min: number, max: number) => Effect.Effect<number>

Returns the next integer value in the specified range from the pseudo-random number generator.


n: number
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

Example Output:
{ _id: 'Chunk', values: [ 5, 9, 22 ] }

To handle multiple effectful transformations concurrently, you can use the concurrency option. This option allows a specified number of effects to run concurrently, with results emitted downstream in their original order.

Example (Fetching URLs Concurrently)

import {
import Stream
import Effect




} from "effect"
const fetchUrl: (url: string) => Effect.Effect<string[], never, never>
= (
url: string
: string) =>
import Effect




const gen: <YieldWrap<Effect.Effect<void, never, never>>, string[]>(f: (resume: Effect.Adapter) => Generator<YieldWrap<Effect.Effect<void, never, never>>, string[], never>) => Effect.Effect<...> (+1 overload)

Provides a way to write effectful code using generator functions, simplifying control flow and error handling.

When to Use

Effect.gen allows you to write code that looks and behaves like synchronous code, but it can handle asynchronous tasks, errors, and complex control flow (like loops and conditions). It helps make asynchronous code more readable and easier to manage.

The generator functions work similarly to async/await but with more explicit control over the execution of effects. You can yield* values from effects and return the final result at the end.


import { Effect } from "effect"
const addServiceCharge = (amount: number) => amount + 1
const applyDiscount = (
total: number,
discountRate: number
): Effect.Effect<number, Error> =>
discountRate === 0
? Error("Discount rate cannot be zero"))
: Effect.succeed(total - (total * discountRate) / 100)
const fetchTransactionAmount = Effect.promise(() => Promise.resolve(100))
const fetchDiscountRate = Effect.promise(() => Promise.resolve(5))
export const program = Effect.gen(function* () {
const transactionAmount = yield* fetchTransactionAmount
const discountRate = yield* fetchDiscountRate
const discountedAmount = yield* applyDiscount(
const finalAmount = addServiceCharge(discountedAmount)
return `Final amount to charge: ${finalAmount}`


(function* () {
(`Fetching ${
url: string
import Effect




const sleep: (duration: DurationInput) => Effect.Effect<void>

Suspends the execution of an effect for a specified Duration.


This function pauses the execution of an effect for a given duration. It is asynchronous, meaning that it does not block the fiber executing the effect. Instead, the fiber is suspended during the delay period and can resume once the specified time has passed.

The duration can be specified using various formats supported by the Duration module, such as a string ("2 seconds") or numeric value representing milliseconds.


import { Effect } from "effect"
const program = Effect.gen(function*() {
console.log("Starting task...")
yield* Effect.sleep("3 seconds") // Waits for 3 seconds
console.log("Task completed!")
// Effect.runFork(program)
// Output:
// Starting task...
// Task completed!


("100 millis")
(`Fetching ${
url: string
} done`)
return [`Resource 0-${
url: string
}`, `Resource 1-${
url: string
}`, `Resource 2-${
url: string
const stream: Stream.Stream<string[], never, never>
import Stream
const make: <[string, string, string]>(as_0: string, as_1: string, as_2: string) => Stream.Stream<string, never, never>

Creates a stream from an sequence of values.


import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }


("url1", "url2", "url3").
Pipeable.pipe<Stream.Stream<string, never, never>, Stream.Stream<string[], never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<string, never, never>) => Stream.Stream<string[], never, never>): Stream.Stream<...> (+21 overloads)
// Fetch each URL concurrently with a limit of 2
import Stream
const mapEffect: <string, string[], never, never>(f: (a: string) => Effect.Effect<string[], never, never>, options?: {
readonly concurrency?: number | "unbounded" | undefined;
readonly unordered?: boolean | undefined;
} | undefined) => <E, R>(self: Stream.Stream<...>) => Stream.Stream<...> (+3 overloads)

Maps over elements of the stream with the specified effectful function.


import { Effect, Random, Stream } from "effect"
const stream = Stream.make(10, 20, 30).pipe(
Stream.mapEffect((n) => Random.nextIntBetween(0, n))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 7, 19, 8 ] }


const fetchUrl: (url: string) => Effect.Effect<string[], never, never>
, {
concurrency?: number | "unbounded" | undefined
: 2 })
import Effect




const runPromise: <Chunk<string[]>, never>(effect: Effect.Effect<Chunk<string[]>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<string[]>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <string[], never, never>(self: Stream.Stream<string[], never, never>) => Effect.Effect<Chunk<string[]>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<string[], never, never>
Promise<Chunk<string[]>>.then<void, never>(onfulfilled?: ((value: Chunk<string[]>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

Fetching url1
Fetching url2
Fetching url1 done
Fetching url3
Fetching url2 done
Fetching url3 done
_id: 'Chunk',
values: [
[ 'Resource 0-url1', 'Resource 1-url1', 'Resource 2-url1' ],
[ 'Resource 0-url2', 'Resource 1-url2', 'Resource 2-url2' ],
[ 'Resource 0-url3', 'Resource 1-url3', 'Resource 2-url3' ]

Stream.mapAccum is similar to, but it applies a transformation with state tracking, allowing you to map and accumulate values within a single operation. This is useful for tasks like calculating a running total in a stream.

Example (Calculating a Running Total)

import {
import Stream
import Effect




} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const range: (min: number, max: number, chunkSize?: number) => Stream.Stream<number>

Constructs a stream from a range of integers, including both endpoints.


import { Effect, Stream } from "effect"
// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }


(1, 5).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Stream.Stream<number, never, never>): Stream.Stream<...> (+21 overloads)
// ┌─── next state
// │ ┌─── emitted value
// ▼ ▼
import Stream
const mapAccum: <number, number, number>(s: number, f: (s: number, a: number) => readonly [number, number]) => <E, R>(self: Stream.Stream<number, E, R>) => Stream.Stream<number, E, R> (+1 overload)

Statefully maps over the elements of this stream to produce new elements.


import { Effect, Stream } from "effect"
const runningTotal = (stream: Stream.Stream<number>): Stream.Stream<number> =>
stream.pipe(Stream.mapAccum(0, (s, a) => [s + a, s + a]))
// input: 0, 1, 2, 3, 4, 5, 6
// Effect.runPromise(Stream.runCollect(runningTotal(Stream.range(0, 6)))).then(
// console.log
// )
// { _id: "Chunk", values: [ 0, 1, 3, 6, 10, 15, 21 ] }


(0, (
state: number
n: number
) => [
state: number
n: number
state: number
n: number
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 1, 3, 6, 10, 15 ] }

The Stream.mapConcat operation is similar to, but it goes further by mapping each element to zero or more elements (as an Iterable) and then flattening the entire stream. This is particularly useful for transforming each element into multiple values.

Example (Splitting and Flattening a Stream)

import {
import Stream
import Effect




} from "effect"
const numbers: Stream.Stream<string, never, never>
import Stream
const make: <[string, string, string]>(as_0: string, as_1: string, as_2: string) => Stream.Stream<string, never, never>

Creates a stream from an sequence of values.


import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }


("1-2-3", "4-5", "6").
Pipeable.pipe<Stream.Stream<string, never, never>, Stream.Stream<string, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<string, never, never>) => Stream.Stream<string, never, never>): Stream.Stream<...> (+21 overloads)
import Stream
const mapConcat: <string, string>(f: (a: string) => Iterable<string>) => <E, R>(self: Stream.Stream<string, E, R>) => Stream.Stream<string, E, R> (+1 overload)

Maps each element to an iterable, and flattens the iterables into the output of this stream.


import { Effect, Stream } from "effect"
const numbers = Stream.make("1-2-3", "4-5", "6").pipe(
Stream.mapConcat((s) => s.split("-")), => parseInt(s))
// Effect.runPromise(Stream.runCollect(numbers)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }


s: string
) =>
s: string
String.split(separator: string | RegExp, limit?: number): string[] (+1 overload)

Split a string into substrings using the specified separator and return them as an array.

@paramseparator A string that identifies character or characters to use in separating the string. If omitted, a single-element array containing the entire string is returned.

@paramlimit A value used to limit the number of elements returned in the array.

import Effect




const runPromise: <Chunk<string>, never>(effect: Effect.Effect<Chunk<string>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<string>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <string, never, never>(self: Stream.Stream<string, never, never>) => Effect.Effect<Chunk<string>, never, never>

Runs the stream and collects all of its elements to a chunk.


const numbers: Stream.Stream<string, never, never>
Promise<Chunk<string>>.then<void, never>(onfulfilled?: ((value: Chunk<string>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ '1', '2', '3', '4', '5', '6' ] }

The Stream.filter operation allows you to pass through only elements that meet a specific condition. It’s a way to retain elements in a stream that satisfy a particular criteria while discarding the rest.

Example (Filtering Even Numbers)

import {
import Stream
import Effect




} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const range: (min: number, max: number, chunkSize?: number) => Stream.Stream<number>

Constructs a stream from a range of integers, including both endpoints.


import { Effect, Stream } from "effect"
// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }


(1, 11).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Stream.Stream<number, never, never>): Stream.Stream<...> (+21 overloads)
import Stream
const filter: <number, number>(predicate: Predicate<number>) => <E, R>(self: Stream.Stream<number, E, R>) => Stream.Stream<number, E, R> (+3 overloads)

Filters the elements emitted by this stream using the provided function.


import { Effect, Stream } from "effect"
const stream = Stream.range(1, 11).pipe(Stream.filter((n) => n % 2 === 0))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }


n: number
) =>
n: number
% 2 === 0))
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }

Stream scanning allows you to apply a function cumulatively to each element in the stream, emitting every intermediate result. Unlike reduce, which only provides a final result, scan offers a step-by-step view of the accumulation process.

Example (Cumulative Addition)

import {
import Stream
import Effect




} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const range: (min: number, max: number, chunkSize?: number) => Stream.Stream<number>

Constructs a stream from a range of integers, including both endpoints.


import { Effect, Stream } from "effect"
// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }


(1, 5).
Pipeable.pipe<Stream.Stream<number, never, never>, Stream.Stream<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Stream.Stream<number, never, never>): Stream.Stream<...> (+21 overloads)
import Stream
const scan: <number, number>(s: number, f: (s: number, a: number) => number) => <E, R>(self: Stream.Stream<number, E, R>) => Stream.Stream<number, E, R> (+1 overload)

Statefully maps over the elements of this stream to produce all intermediate results of type S given an initial S.


import { Effect, Stream } from "effect"
const stream = Stream.range(1, 6).pipe(Stream.scan(0, (a, b) => a + b))
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15, 21 ] }


(0, (
a: number
b: number
) =>
a: number
b: number
import Effect




const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<Chunk<number>>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


import Stream
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.


const stream: Stream.Stream<number, never, never>
Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

{ _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15 ] }

If you need only the final accumulated value, you can use Stream.runFold:

Example (Final Accumulated Result)

import {
import Stream
import Effect




} from "effect"
const fold: Effect.Effect<number, never, never>
import Stream
const range: (min: number, max: number, chunkSize?: number) => Stream.Stream<number>

Constructs a stream from a range of integers, including both endpoints.


import { Effect, Stream } from "effect"
// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }


(1, 5).
Pipeable.pipe<Stream.Stream<number, never, never>, Effect.Effect<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Effect.Effect<number, never, never>): Effect.Effect<...> (+21 overloads)
import Stream
const runFold: <number, number>(s: number, f: (s: number, a: number) => number) => <E, R>(self: Stream.Stream<number, E, R>) => Effect.Effect<number, E, R> (+1 overload)

Executes a pure fold over the stream of values - reduces all elements in the stream to a value of type S.


(0, (
a: number
b: number
) =>
a: number
b: number
import Effect




const runPromise: <number, never>(effect: Effect.Effect<number, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<number>

Executes an effect and returns the result as a Promise.


This function runs an effect and converts its result into a Promise. If the effect succeeds, the Promise will resolve with the successful result. If the effect fails, the Promise will reject with an error, which includes the failure details of the effect.

The optional options parameter allows you to pass an AbortSignal for cancellation, enabling more fine-grained control over asynchronous tasks.

When to Use

Use this function when you need to execute an effect and work with its result in a promise-based system, such as when integrating with third-party libraries that expect Promise results.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.


// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
// Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1


//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

// Effect.runPromise("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error


const fold: Effect.Effect<number, never, never>
Promise<number>.then<void, never>(onfulfilled?: ((value: number) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

) // Output: 15

Stream draining lets you execute effectful operations within a stream while discarding the resulting values. This can be useful when you need to run actions or perform side effects but don’t require the emitted values. The Stream.drain function achieves this by ignoring all elements in the stream and producing an empty output stream.

Example (Executing Effectful Operations without Collecting Values)

import {
import Stream
import Effect




import Random
} from "effect"
const stream: Stream.Stream<number, never, never>
import Stream
const repeatEffect: <number, never, never>(effect: Effect.Effect<number, never, never>) => Stream.Stream<number, never, never>

Creates a stream from an effect producing a value of type A which repeats forever.


import { Effect, Random, Stream } from "effect"
const stream = Stream.repeatEffect(Random.nextInt)
// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 3891571149, 4239494205, 2352981603, 2339111046, 1488052210 ] }


import Effect




const gen: <YieldWrap<Effect.Effect<number, never, never>>, number>(f: (resume: Effect.Adapter) => Generator<YieldWrap<Effect.Effect<number, never, never>>, number, never>) => Effect.Effect<...> (+1 overload)

Provides a way to write effectful code using generator functions, simplifying control flow and error handling.

When to Use

Effect.gen allows you to write code that looks and behaves like synchronous code, but it can handle asynchronous tasks, errors, and complex control flow (like loops and conditions). It helps make asynchronous code more readable and easier to manage.

The generator functions work similarly to async/await but with more explicit control over the execution of effects. You can yield* values from effects and return the final result at the end.


import { Effect } from "effect"
const addServiceCharge = (amount: number) => amount + 1
const applyDiscount = (
total: number,
discountRate: number
): Effect.Effect<number, Error> =>
discountRate === 0
? Error("Discount rate cannot be zero"))
: Effect.succeed(total - (total * discountRate) / 100)
const fetchTransactionAmount = Effect.promise(() => Promise.resolve(100))
const fetchDiscountRate = Effect.promise(() => Promise.resolve(5))
export const program = Effect.gen(function* () {
const transactionAmount = yield* fetchTransactionAmount
const discountRate = yield* fetchDiscountRate
const discountedAmount = yield* applyDiscount(
const finalAmount = addServiceCharge(discountedAmount)
return `Final amount to charge: ${finalAmount}`


(function* () {
const nextInt: number
= yield*