Skip to content

Resourceful Streams

In the Stream module, you’ll find that most of the constructors offer a special variant designed for lifting a scoped resource into a Stream. When you use these specific constructors, you’re essentially creating streams that are inherently safe with regards to resource management. These constructors, before creating the stream, handle the resource acquisition, and after the stream’s usage, they ensure its proper closure.

Stream also provides us with Stream.acquireRelease and Stream.finalizer constructors that share similarities with Effect.acquireRelease and Effect.addFinalizer. These tools empower us to perform cleanup or finalization tasks before the stream concludes its operation.

In this section, we’ll explore an example that demonstrates the use of Stream.acquireRelease when working with file operations.

1
import {
import Stream
Stream
,
import Console
Console
,
import Effect
Effect
} from "effect"
2
3
// Simulating File operations
4
const
const open: (filename: string) => Effect.Effect<{ getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }, never, never>
open
= (
(parameter) filename: string
filename
: string) =>
5
import Effect
Effect
.
const gen: <YieldWrap<Effect.Effect<void, never, never>>, { getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }>(f: (resume: Effect.Adapter) => Generator<...>) => Effect.Effect<...> (+1 overload)
gen
(function* () {
6
yield*
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
(`Opening ${
(parameter) filename: string
filename
}`)
7
return {
8
(property) getLines: Effect.Effect<string[], never, never>
getLines
:
import Effect
Effect
.
const succeed: <string[]>(value: string[]) => Effect.Effect<string[], never, never>

Creates an `Effect` that succeeds with the provided value. Use this function to represent a successful computation that yields a value of type `A`. The effect does not fail and does not require any environmental context.

succeed
(["Line 1", "Line 2", "Line 3"]),
9
(property) close: Effect.Effect<void, never, never>
close
:
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
(`Closing ${
(parameter) filename: string
filename
}`)
10
}
11
})
12
13
const
const stream: Stream.Stream<string[], never, never>
stream
=
import Stream
Stream
.
const acquireRelease: <{ getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }, never, never, never, void>(acquire: Effect.Effect<{ getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<...>; }, never, never>, release: (resource: { getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<...>; }, exit: Exit<unknown, unknown>) => Effect.Effect<...>) => Stream.Stream<...>

Creates a stream from a single value that will get cleaned up after the stream is consumed.

acquireRelease
(
14
const open: (filename: string) => Effect.Effect<{ getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }, never, never>
open
("file.txt"),
15
(
(parameter) file: { getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }
file
) =>
(parameter) file: { getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }
file
.
(property) close: Effect.Effect<void, never, never>
close
16
).
(method) Pipeable.pipe<Stream.Stream<{ getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }, never, never>, Stream.Stream<string[], never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<...>) => Stream.Stream<...>): Stream.Stream<...> (+21 overloads)
pipe
(
import Stream
Stream
.
const flatMap: <{ getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }, string[], never, never>(f: (a: { getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<...>; }) => Stream.Stream<...>, options?: { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined; readonly switch?: boolean | undefined; } | undefined) => <E, R>(self: Stream.Stream<...>) => Stream.Stream<...> (+1 overload)

Returns a stream made of the concatenation in strict order of all the streams produced by passing each element of this stream to `f0`

flatMap
((
(parameter) file: { getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }
file
) =>
(parameter) file: { getLines: Effect.Effect<string[], never, never>; close: Effect.Effect<void, never, never>; }
file
.
(property) getLines: Effect.Effect<string[], never, never>
getLines
))
17
18
import Effect
Effect
.
const runPromise: <Chunk<string[]>, never>(effect: Effect.Effect<Chunk<string[]>, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<Chunk<string[]>>

Executes an effect and returns a `Promise` that resolves with the result. Use `runPromise` when working with asynchronous effects and you need to integrate with code that uses Promises. If the effect fails, the returned Promise will be rejected with the error.

runPromise
(
import Stream
Stream
.
const runCollect: <string[], never, never>(self: Stream.Stream<string[], never, never>) => Effect.Effect<Chunk<string[]>, never, never>

Runs the stream and collects all of its elements to a chunk.

runCollect
(
const stream: Stream.Stream<string[], never, never>
stream
)).
(method) Promise<Chunk<string[]>>.then<void, never>(onfulfilled?: ((value: Chunk<string[]>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) globalThis.Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
19
/*
20
Output:
21
Opening file.txt
22
Closing file.txt
23
{
24
_id: "Chunk",
25
values: [
26
[ "Line 1", "Line 2", "Line 3" ]
27
]
28
}
29
*/

In this code snippet, we’re simulating file operations using the open function. The Stream.acquireRelease function is employed to ensure that the file is correctly opened and closed, and we then process the lines of the file using the acquired resource.

In this section, we’ll explore the concept of finalization in streams. Finalization allows us to execute a specific action before a stream ends. This can be particularly useful when we want to perform cleanup tasks or add final touches to a stream.

Imagine a scenario where our streaming application needs to clean up a temporary directory when it completes its execution. We can achieve this using the Stream.finalizer function:

1
import {
import Stream
Stream
,
import Console
Console
,
import Effect
Effect
} from "effect"
2
3
const
const application: Stream.Stream<void, never, never>
application
=
import Stream
Stream
.
const fromEffect: <void, never, never>(effect: Effect.Effect<void, never, never>) => Stream.Stream<void, never, never>

Either emits the success value of this effect or terminates the stream with the failure value of this effect.

fromEffect
(
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
("Application Logic."))
4
5
const
const deleteDir: (dir: string) => Effect.Effect<void, never, never>
deleteDir
= (
(parameter) dir: string
dir
: string) =>
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
(`Deleting dir: ${
(parameter) dir: string
dir
}`)
6
7
const
const program: Stream.Stream<void, never, never>
program
=
const application: Stream.Stream<void, never, never>
application
.
(method) Pipeable.pipe<Stream.Stream<void, never, never>, Stream.Stream<void, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<void, never, never>) => Stream.Stream<void, never, never>): Stream.Stream<...> (+21 overloads)
pipe
(
8
import Stream
Stream
.
const concat: <void, never, never>(that: Stream.Stream<void, never, never>) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<void | A, E, R> (+1 overload)

Concatenates the specified stream with this stream, resulting in a stream that emits the elements from this stream and then the elements from the specified stream.

concat
(
9
import Stream
Stream
.
const finalizer: <never, void>(finalizer: Effect.Effect<void, never, never>) => Stream.Stream<void, never, never>

Creates a one-element stream that never fails and executes the finalizer when it ends.

finalizer
(
10
const deleteDir: (dir: string) => Effect.Effect<void, never, never>
deleteDir
("tmp").
(method) Pipeable.pipe<Effect.Effect<void, never, never>, Effect.Effect<void, never, never>>(this: Effect.Effect<...>, ab: (_: Effect.Effect<void, never, never>) => Effect.Effect<void, never, never>): Effect.Effect<...> (+21 overloads)
pipe
(
11
import Effect
Effect
.
const andThen: <Effect.Effect<void, never, never>>(f: Effect.Effect<void, never, never>) => <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<void, E, R> (+3 overloads)

Executes a sequence of two actions, typically two `Effect`s, where the second action can depend on the result of the first action. The `that` action can take various forms: - a value - a function returning a value - a promise - a function returning a promise - an effect - a function returning an effect

andThen
(
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
("Temporary directory was deleted."))
12
)
13
)
14
)
15
)
16
17
import Effect
Effect
.
const runPromise: <Chunk<void>, never>(effect: Effect.Effect<Chunk<void>, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<Chunk<void>>

Executes an effect and returns a `Promise` that resolves with the result. Use `runPromise` when working with asynchronous effects and you need to integrate with code that uses Promises. If the effect fails, the returned Promise will be rejected with the error.

runPromise
(
import Stream
Stream
.
const runCollect: <void, never, never>(self: Stream.Stream<void, never, never>) => Effect.Effect<Chunk<void>, never, never>

Runs the stream and collects all of its elements to a chunk.

runCollect
(
const program: Stream.Stream<void, never, never>
program
)).
(method) Promise<Chunk<void>>.then<void, never>(onfulfilled?: ((value: Chunk<void>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) globalThis.Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
18
/*
19
Output:
20
Application Logic.
21
Deleting dir: tmp
22
Temporary directory was deleted.
23
{
24
_id: "Chunk",
25
values: [ undefined, undefined ]
26
}
27
*/

In this code example, we start with our application logic represented by the application stream. We then use Stream.finalizer to define a finalization step, which deletes a temporary directory and logs a message. This ensures that the temporary directory is cleaned up properly when the application completes its execution.

In this section, we’ll explore a scenario where we need to perform actions after the finalization of a stream. To achieve this, we can utilize the Stream.ensuring operator.

Consider a situation where our application has completed its primary logic and finalized some resources, but we also need to perform additional actions afterward. We can use Stream.ensuring for this purpose:

1
import {
import Stream
Stream
,
import Console
Console
,
import Effect
Effect
} from "effect"
2
3
const
const program: Stream.Stream<void, never, never>
program
=
import Stream
Stream
.
const fromEffect: <void, never, never>(effect: Effect.Effect<void, never, never>) => Stream.Stream<void, never, never>

Either emits the success value of this effect or terminates the stream with the failure value of this effect.

fromEffect
(
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
("Application Logic.")).
(method) Pipeable.pipe<Stream.Stream<void, never, never>, Stream.Stream<void, never, never>, Stream.Stream<void, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<void, never, never>) => Stream.Stream<...>, bc: (_: Stream.Stream<...>) => Stream.Stream<...>): Stream.Stream<...> (+21 overloads)
pipe
(
4
import Stream
Stream
.
const concat: <void, never, never>(that: Stream.Stream<void, never, never>) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<void | A, E, R> (+1 overload)

Concatenates the specified stream with this stream, resulting in a stream that emits the elements from this stream and then the elements from the specified stream.

concat
(
import Stream
Stream
.
const finalizer: <never, void>(finalizer: Effect.Effect<void, never, never>) => Stream.Stream<void, never, never>

Creates a one-element stream that never fails and executes the finalizer when it ends.

finalizer
(
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
("Finalizing the stream"))),
5
import Stream
Stream
.
const ensuring: <void, never>(finalizer: Effect.Effect<void, never, never>) => <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E, R> (+1 overload)

Executes the provided finalizer after this stream's finalizers run.

ensuring
(
6
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
("Doing some other works after stream's finalization")
7
)
8
)
9
10
import Effect
Effect
.
const runPromise: <Chunk<void>, never>(effect: Effect.Effect<Chunk<void>, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<Chunk<void>>

Executes an effect and returns a `Promise` that resolves with the result. Use `runPromise` when working with asynchronous effects and you need to integrate with code that uses Promises. If the effect fails, the returned Promise will be rejected with the error.

runPromise
(
import Stream
Stream
.
const runCollect: <void, never, never>(self: Stream.Stream<void, never, never>) => Effect.Effect<Chunk<void>, never, never>

Runs the stream and collects all of its elements to a chunk.

runCollect
(
const program: Stream.Stream<void, never, never>
program
)).
(method) Promise<Chunk<void>>.then<void, never>(onfulfilled?: ((value: Chunk<void>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) globalThis.Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
11
/*
12
Output:
13
Application Logic.
14
Finalizing the stream
15
Doing some other works after stream's finalization
16
{
17
_id: "Chunk",
18
values: [ undefined, undefined ]
19
}
20
*/

In this code example, we start with our application logic represented by the Application Logic. message. We then use Stream.finalizer to specify the finalization step, which logs Finalizing the stream. After that, we use Stream.ensuring to indicate that we want to perform additional tasks after the stream’s finalization, resulting in the message Performing additional tasks after stream's finalization. This ensures that our post-finalization actions are executed as expected.