
Introduction

In stream processing, a Sink is a construct designed to consume elements generated by a Stream.

     ┌─── Type of the result produced by the Sink
     │  ┌─── Type of elements consumed by the Sink
     │  │   ┌─── Type of any leftover elements
     │  │   │  ┌─── Type of possible errors
     │  │   │  │  ┌─── Type of required dependencies
     ▼  ▼   ▼  ▼  ▼
Sink<A, In, L, E, R>

Here’s an overview of what a Sink does:

  • It consumes a variable number of In elements: zero, one, or many.
  • It can encounter errors of type E during processing.
  • It produces a result of type A once processing completes.
  • It can also return a remainder of type L, representing any leftover elements.

To process a stream using a Sink, you can pass it directly to the Stream.run function:

Example (Using a Sink to Collect Stream Elements)

import { Stream, Sink, Effect } from "effect"

//      ┌─── Stream<number, never, never>
//      ▼
const stream = Stream.make(1, 2, 3)

// Create a sink to take the first 2 elements of the stream
//
//      ┌─── Sink<Chunk<number>, number, number, never, never>
//      ▼
const sink = Sink.take<number>(2)

// Run the stream through the sink to collect the elements
//
//      ┌─── Effect<Chunk<number>, never, never>
//      ▼
const sum = Stream.run(stream, sink)

Effect.runPromise(sum).then(console.log)
/*
Output:
{ _id: 'Chunk', values: [ 1, 2 ] }
*/

The sink in this example has the following type:

     ┌─── result
     │              ┌─── consumed elements
     │              │       ┌─── leftover elements
     │              │       │       ┌─── no errors
     │              │       │       │      ┌─── no dependencies
     ▼              ▼       ▼       ▼      ▼
Sink<Chunk<number>, number, number, never, never>

Here’s the breakdown:

  • Chunk<number>: The final result produced by the sink after processing elements (in this case, a Chunk of numbers).
  • number (first occurrence): The type of elements that the sink will consume from the stream.
  • number (second occurrence): The type of leftover elements, if any, that are not consumed.
  • never (first occurrence): Indicates that this sink does not produce any errors.
  • never (second occurrence): Shows that no dependencies are required to operate this sink.
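
To make the result and leftover type parameters more concrete, here is a toy model in plain TypeScript. It is only a sketch and is not how Effect implements Sink: the names ToySink, ToySinkResult, and toyTake are hypothetical, the whole input is supplied as an array instead of a stream, and the error (E) and dependency (R) parameters are left out.

```typescript
// Toy model only: this is NOT how Effect implements Sink.
// A "sink" here is a plain function from input elements
// to a result (A) plus any unconsumed leftovers (L).
type ToySinkResult<A, L> = {
  readonly result: A
  readonly leftover: ReadonlyArray<L>
}

type ToySink<A, In, L> = (input: ReadonlyArray<In>) => ToySinkResult<A, L>

// Hypothetical helper mirroring the idea of "take the first n elements".
// Elements beyond the first n are not consumed and come back as leftovers.
const toyTake = <In>(n: number): ToySink<ReadonlyArray<In>, In, In> =>
  (input) => ({
    result: input.slice(0, n),
    leftover: input.slice(n)
  })

const outcome = toyTake<number>(2)([1, 2, 3])

console.log(outcome.result) // [ 1, 2 ]
console.log(outcome.leftover) // [ 3 ]
```

In this model the result and the leftovers have the same element type as the input, which matches the two number entries in the type of the take sink above.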