Sink Operations

In previous sections, we learned how to create and use sinks. Now, let’s explore some operations that let you transform or filter sink behavior.

At times, you may have a sink that works with one type of input, but your current stream uses a different type. The Sink.mapInput function helps you adapt your sink to a new input type by transforming the input values. While Sink.map changes the sink’s output, Sink.mapInput changes the input it accepts.

Example (Converting String Input to Numeric for Summing)

Suppose you have a Sink.sum that calculates the sum of numbers. If your stream contains strings rather than numbers, Sink.mapInput can convert those strings into numbers, allowing Sink.sum to work with your stream:

import { Stream, Sink, Effect } from "effect"

// A stream of numeric strings
const stream = Stream.make("1", "2", "3", "4", "5")

// Define a sink for summing numeric values
const numericSum = Sink.sum

// Use mapInput to adapt the sink, converting strings to numbers
const stringSum = numericSum.pipe(
  Sink.mapInput((s: string) => Number.parseFloat(s))
)

Effect.runPromise(Stream.run(stream, stringSum)).then(console.log)
// Output: 15
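
To make the difference between Sink.map and Sink.mapInput concrete, here is a minimal sketch in plain TypeScript. It does not use or reproduce Effect's implementation: MiniSink, sum, mapInput, map, and labelledSum are hypothetical names that model a sink as a plain function from an array of inputs to a result, which is a strong simplification.

```typescript
// Hypothetical, simplified model of a sink: consume all inputs, return a result.
// This is NOT Effect's Sink; it only illustrates which side each operation changes.
type MiniSink<In, A> = (inputs: ReadonlyArray<In>) => A

// A sink that sums numbers
const sum: MiniSink<number, number> = (inputs) =>
  inputs.reduce((total, n) => total + n, 0)

// mapInput: convert each element BEFORE the sink sees it (changes the input type)
const mapInput = <In0, In, A>(
  sink: MiniSink<In, A>,
  f: (input: In0) => In
): MiniSink<In0, A> =>
  (inputs) => sink(inputs.map((x) => f(x)))

// map: convert the result AFTER the sink finishes (changes the output type)
const map = <In, A, B>(
  sink: MiniSink<In, A>,
  f: (a: A) => B
): MiniSink<In, B> =>
  (inputs) => f(sink(inputs))

// Accepts strings, still produces a number
const stringSum = mapInput(sum, (s: string) => parseFloat(s))

// Accepts numbers, now produces a string
const labelledSum = map(sum, (n: number) => `total=${n}`)

console.log(stringSum(["1", "2", "3", "4", "5"])) // 15
console.log(labelledSum([1, 2, 3, 4, 5])) // total=15
```

In this model, mapInput leaves the result type alone and only widens what the sink can consume, while map does the opposite.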

When you need to transform both the input and the output of a sink, use Sink.dimap. It combines the two transformations in a single call: onInput converts each incoming element before the sink processes it (as Sink.mapInput does), and onDone converts the sink's final result into a new type.

Example (Converting Input to Number, Summing, and Converting Output to String)

import { Stream, Sink, Effect } from "effect"

// A stream of numeric strings
const stream = Stream.make("1", "2", "3", "4", "5")

// Convert string inputs to numbers, sum them,
// then convert the result to a string
const sumSink = Sink.dimap(Sink.sum, {
  // Transform input: string to number
  onInput: (s: string) => Number.parseFloat(s),
  // Transform output: number to string
  onDone: (n) => String(n)
})

Effect.runPromise(Stream.run(stream, sumSink)).then(console.log)
// Output: "15"
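
One way to read dimap is as an input conversion followed by an output conversion around the same sink. The sketch below shows that reading in plain TypeScript. It is a simplified model rather than Effect's implementation: MiniSink, sum, dimap, and sumAsText are hypothetical names, and only the option names onInput and onDone are taken from the example above.

```typescript
// Hypothetical, simplified model of a sink: consume all inputs, return a result.
// This is NOT Effect's Sink; it only illustrates the shape of dimap.
type MiniSink<In, A> = (inputs: ReadonlyArray<In>) => A

// A sink that sums numbers
const sum: MiniSink<number, number> = (inputs) =>
  inputs.reduce((total, n) => total + n, 0)

// dimap: convert every input first, run the sink, then convert the result
const dimap = <In0, In, A, B>(
  sink: MiniSink<In, A>,
  options: {
    readonly onInput: (input: In0) => In
    readonly onDone: (a: A) => B
  }
): MiniSink<In0, B> =>
  (inputs) => options.onDone(sink(inputs.map((x) => options.onInput(x))))

// Strings in, string out; the summing in the middle still works on numbers
const sumAsText = dimap(sum, {
  onInput: (s: string) => parseFloat(s),
  onDone: (n: number) => String(n)
})

const result = sumAsText(["1", "2", "3", "4", "5"])
console.log(typeof result, result) // string 15
```

The inner sink never sees a string and never produces one; both conversions happen at its boundary.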

Sinks can also filter incoming elements based on specific conditions with Sink.filterInput. This operation allows the sink to process only elements that meet certain criteria.

Example (Filtering Out Non-Positive Numbers in Chunks of Three)

In the example below, elements are collected in chunks of three, but only positive numbers are included:

import { Stream, Sink, Effect } from "effect"

// Define a stream with positive, negative, and zero values
const stream = Stream.fromIterable([
  1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6
]).pipe(
  Stream.transduce(
    // Collect chunks of 3, filtering out non-positive numbers
    Sink.collectAllN<number>(3).pipe(Sink.filterInput((n) => n > 0))
  )
)

Effect.runPromise(Stream.runCollect(stream)).then((chunk) =>
  console.log("%o", chunk)
)
/*
Output:
{
  _id: 'Chunk',
  values: [
    { _id: 'Chunk', values: [ 1, 1, 3, [length]: 3 ] },
    { _id: 'Chunk', values: [ 4, 2, 1, [length]: 3 ] },
    { _id: 'Chunk', values: [ 1, 1, 6, [length]: 3 ] },
    { _id: 'Chunk', values: [ [length]: 0 ] },
    [length]: 4
  ]
}
*/
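
Note that the first chunk in the output, 1, 1, 3, spans the first five stream elements: rejected elements do not count toward the chunk size. The sketch below illustrates that behavior with a simplified model in plain TypeScript. It is not Effect's implementation: MiniSink, collectFirstN, filterInput, and firstThreePositives are hypothetical names, and the model runs the sink once rather than repeatedly as Stream.transduce does, so it does not reproduce the remaining chunks.

```typescript
// Hypothetical, simplified model of a sink: consume inputs, return a result.
// This is NOT Effect's Sink; it only illustrates where filtering happens.
type MiniSink<In, A> = (inputs: ReadonlyArray<In>) => A

// Stand-in for a "collect the first n elements" sink
const collectFirstN = <In>(n: number): MiniSink<In, ReadonlyArray<In>> =>
  (inputs) => inputs.slice(0, n)

// filterInput: drop elements that fail the predicate before the sink sees them
const filterInput = <In, A>(
  sink: MiniSink<In, A>,
  predicate: (input: In) => boolean
): MiniSink<In, A> =>
  (inputs) => sink(inputs.filter((x) => predicate(x)))

const firstThreePositives = filterInput(
  collectFirstN<number>(3),
  (n: number) => n > 0
)

// -2 and 0 are discarded, so the sink needs five elements to collect three
const firstChunk = firstThreePositives([1, -2, 0, 1, 3, -3, 4])
console.log(firstChunk) // [ 1, 1, 3 ]
```

Because the predicate runs in front of the sink, the sink's own logic (here, counting to three) only ever applies to accepted elements.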