Skip to content

Sink Concurrency

This section covers concurrent operations that allow multiple sinks to run simultaneously. These can be valuable for enhancing task performance when concurrent execution is desired.

To run two sinks concurrently and combine their results, use Sink.zip. This operation executes both sinks concurrently and combines their outcomes into a tuple.

Example (Running Two Sinks Concurrently and Combining Results)

import {
import Sink
Sink
,
import Console
Console
,
import Stream
Stream
,
import Schedule
Schedule
,
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
} from "effect"
const
const stream: Stream.Stream<string, never, never>
stream
=
import Stream
Stream
.
const make: <[string, string, string, string, string]>(as_0: string, as_1: string, as_2: string, as_3: string, as_4: string) => Stream.Stream<string, never, never>

Creates a stream from an sequence of values.

@example

import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

@since2.0.0

make
("1", "2", "3", "4", "5").
Pipeable.pipe<Stream.Stream<string, never, never>, Stream.Stream<string, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<string, never, never>) => Stream.Stream<string, never, never>): Stream.Stream<...> (+21 overloads)
pipe
(
import Stream
Stream
.
const schedule: <number, string, never, string>(schedule: Schedule.Schedule<number, string, never>) => <E, R>(self: Stream.Stream<string, E, R>) => Stream.Stream<string, E, R> (+1 overload)

Schedules the output of the stream using the provided schedule.

@since2.0.0

schedule
(
import Schedule
Schedule
.
const spaced: (duration: DurationInput) => Schedule.Schedule<number>

Returns a schedule that recurs continuously, each repetition spaced the specified duration from the last run.

@since2.0.0

spaced
("10 millis"))
)
const
const sink1: Sink.Sink<number, string, never, never, never>
sink1
=
import Sink
Sink
.
const forEach: <string, void, never, never>(f: (input: string) => Effect.Effect<void, never, never>) => Sink.Sink<void, string, never, never, never>

A sink that executes the provided effectful function for every element fed to it.

@since2.0.0

forEach
((
s: string
s
: string) =>
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>

@since2.0.0

log
(`sink 1: ${
s: string
s
}`)
).
Pipeable.pipe<Sink.Sink<void, string, never, never, never>, Sink.Sink<number, string, never, never, never>>(this: Sink.Sink<...>, ab: (_: Sink.Sink<void, string, never, never, never>) => Sink.Sink<...>): Sink.Sink<...> (+21 overloads)
pipe
(
import Sink
Sink
.
const as: <number>(a: number) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<number, In, L, E, R> (+1 overload)

Replaces this sink's result with the provided value.

@since2.0.0

as
(1))
const
const sink2: Sink.Sink<number, string, never, never, never>
sink2
=
import Sink
Sink
.
const forEach: <string, void, never, never>(f: (input: string) => Effect.Effect<void, never, never>) => Sink.Sink<void, string, never, never, never>

A sink that executes the provided effectful function for every element fed to it.

@since2.0.0

forEach
((
s: string
s
: string) =>
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>

@since2.0.0

log
(`sink 2: ${
s: string
s
}`)
).
Pipeable.pipe<Sink.Sink<void, string, never, never, never>, Sink.Sink<number, string, never, never, never>>(this: Sink.Sink<...>, ab: (_: Sink.Sink<void, string, never, never, never>) => Sink.Sink<...>): Sink.Sink<...> (+21 overloads)
pipe
(
import Sink
Sink
.
const as: <number>(a: number) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<number, In, L, E, R> (+1 overload)

Replaces this sink's result with the provided value.

@since2.0.0

as
(2))
// Combine the two sinks to run concurrently and collect results in a tuple
const
const sink: Sink.Sink<[number, number], string, never, never, never>
sink
=
import Sink
Sink
.
const zip: <number, string, never, never, never, number, string, never, never, never>(self: Sink.Sink<number, string, never, never, never>, that: Sink.Sink<number, string, never, never, never>, options?: {
readonly concurrent?: boolean | undefined;
} | undefined) => Sink.Sink<...> (+1 overload)

Feeds inputs to this sink until it yields a result, then switches over to the provided sink until it yields a result, finally combining the two results into a tuple.

@since2.0.0

zip
(
const sink1: Sink.Sink<number, string, never, never, never>
sink1
,
const sink2: Sink.Sink<number, string, never, never, never>
sink2
, {
concurrent?: boolean | undefined
concurrent
: true })
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const runPromise: <[number, number], never>(effect: Effect.Effect<[number, number], never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<[number, number]>

Executes an effect and returns the result as a Promise.

When to Use

Use runPromise when you need to execute an effect and work with the result using Promise syntax, typically for compatibility with other promise-based code.

If the effect succeeds, the promise will resolve with the result. If the effect fails, the promise will reject with an error.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.

@example

// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1

@example

//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

Effect.runPromise(Effect.fail("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error

@since2.0.0

runPromise
(
import Stream
Stream
.
const run: <string, never, never, [number, number], never, never>(self: Stream.Stream<string, never, never>, sink: Sink.Sink<[number, number], string, unknown, never, never>) => Effect.Effect<[...], never, never> (+1 overload)

Runs the sink on the stream to produce either the sink's result or an error.

@since2.0.0

run
(
const stream: Stream.Stream<string, never, never>
stream
,
const sink: Sink.Sink<[number, number], string, never, never, never>
sink
)).
Promise<[number, number]>.then<void, never>(onfulfilled?: ((value: [number, number]) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

then
(
var console: Console

The console module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers.

The module exports two specific components:

  • A Console class with methods such as console.log(), console.error() and console.warn() that can be used to write to any Node.js stream.
  • A global console instance configured to write to process.stdout and process.stderr. The global console can be used without importing the node:console module.

Warning: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the note on process I/O for more information.

Example using the global console:

console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr

Example using the Console class:

const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err

@seesource

console
.
globalThis.Console.log(message?: any, ...optionalParams: any[]): void

Prints to stdout with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to printf(3) (the arguments are all passed to util.format()).

const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout

See util.format() for more information.

@sincev0.1.100

log
)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
[ 1, 2 ]
*/

The Sink.race operation allows multiple sinks to compete for completion. The first sink to finish provides the result.

Example (Racing Two Sinks to Capture the First Result)

import {
import Sink
Sink
,
import Console
Console
,
import Stream
Stream
,
import Schedule
Schedule
,
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
} from "effect"
const
const stream: Stream.Stream<string, never, never>
stream
=
import Stream
Stream
.
const make: <[string, string, string, string, string]>(as_0: string, as_1: string, as_2: string, as_3: string, as_4: string) => Stream.Stream<string, never, never>

Creates a stream from an sequence of values.

@example

import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

@since2.0.0

make
("1", "2", "3", "4", "5").
Pipeable.pipe<Stream.Stream<string, never, never>, Stream.Stream<string, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<string, never, never>) => Stream.Stream<string, never, never>): Stream.Stream<...> (+21 overloads)
pipe
(
import Stream
Stream
.
const schedule: <number, string, never, string>(schedule: Schedule.Schedule<number, string, never>) => <E, R>(self: Stream.Stream<string, E, R>) => Stream.Stream<string, E, R> (+1 overload)

Schedules the output of the stream using the provided schedule.

@since2.0.0

schedule
(
import Schedule
Schedule
.
const spaced: (duration: DurationInput) => Schedule.Schedule<number>

Returns a schedule that recurs continuously, each repetition spaced the specified duration from the last run.

@since2.0.0

spaced
("10 millis"))
)
const
const sink1: Sink.Sink<number, string, never, never, never>
sink1
=
import Sink
Sink
.
const forEach: <string, void, never, never>(f: (input: string) => Effect.Effect<void, never, never>) => Sink.Sink<void, string, never, never, never>

A sink that executes the provided effectful function for every element fed to it.

@since2.0.0

forEach
((
s: string
s
: string) =>
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>

@since2.0.0

log
(`sink 1: ${
s: string
s
}`)
).
Pipeable.pipe<Sink.Sink<void, string, never, never, never>, Sink.Sink<number, string, never, never, never>>(this: Sink.Sink<...>, ab: (_: Sink.Sink<void, string, never, never, never>) => Sink.Sink<...>): Sink.Sink<...> (+21 overloads)
pipe
(
import Sink
Sink
.
const as: <number>(a: number) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<number, In, L, E, R> (+1 overload)

Replaces this sink's result with the provided value.

@since2.0.0

as
(1))
const
const sink2: Sink.Sink<number, string, never, never, never>
sink2
=
import Sink
Sink
.
const forEach: <string, void, never, never>(f: (input: string) => Effect.Effect<void, never, never>) => Sink.Sink<void, string, never, never, never>

A sink that executes the provided effectful function for every element fed to it.

@since2.0.0

forEach
((
s: string
s
: string) =>
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>

@since2.0.0

log
(`sink 2: ${
s: string
s
}`)
).
Pipeable.pipe<Sink.Sink<void, string, never, never, never>, Sink.Sink<number, string, never, never, never>>(this: Sink.Sink<...>, ab: (_: Sink.Sink<void, string, never, never, never>) => Sink.Sink<...>): Sink.Sink<...> (+21 overloads)
pipe
(
import Sink
Sink
.
const as: <number>(a: number) => <A, In, L, E, R>(self: Sink.Sink<A, In, L, E, R>) => Sink.Sink<number, In, L, E, R> (+1 overload)

Replaces this sink's result with the provided value.

@since2.0.0

as
(2))
// Race the two sinks, the result will be from the first to complete
const
const sink: Sink.Sink<number, string, never, never, never>
sink
=
import Sink
Sink
.
const race: <number, string, never, never, never, number, string, never, never, never>(self: Sink.Sink<number, string, never, never, never>, that: Sink.Sink<number, string, never, never, never>) => Sink.Sink<...> (+1 overload)

Runs both sinks in parallel on the input, , returning the result or the error from the one that finishes first.

@since2.0.0

race
(
const sink1: Sink.Sink<number, string, never, never, never>
sink1
,
const sink2: Sink.Sink<number, string, never, never, never>
sink2
)
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const runPromise: <number, never>(effect: Effect.Effect<number, never, never>, options?: {
readonly signal?: AbortSignal;
} | undefined) => Promise<number>

Executes an effect and returns the result as a Promise.

When to Use

Use runPromise when you need to execute an effect and work with the result using Promise syntax, typically for compatibility with other promise-based code.

If the effect succeeds, the promise will resolve with the result. If the effect fails, the promise will reject with an error.

@seerunPromiseExit for a version that returns an Exit type instead of rejecting.

@example

// Title: Running a Successful Effect as a Promise
import { Effect } from "effect"
Effect.runPromise(Effect.succeed(1)).then(console.log)
// Output: 1

@example

//Example: Handling a Failing Effect as a Rejected Promise import { Effect } from "effect"

Effect.runPromise(Effect.fail("my error")).catch(console.error) // Output: // (FiberFailure) Error: my error

@since2.0.0

runPromise
(
import Stream
Stream
.
const run: <string, never, never, number, never, never>(self: Stream.Stream<string, never, never>, sink: Sink.Sink<number, string, unknown, never, never>) => Effect.Effect<number, never, never> (+1 overload)

Runs the sink on the stream to produce either the sink's result or an error.

@since2.0.0

run
(
const stream: Stream.Stream<string, never, never>
stream
,
const sink: Sink.Sink<number, string, never, never, never>
sink
)).
Promise<number>.then<void, never>(onfulfilled?: ((value: number) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

@paramonfulfilled The callback to execute when the Promise is resolved.

@paramonrejected The callback to execute when the Promise is rejected.

@returnsA Promise for the completion of which ever callback is executed.

then
(
var console: Console

The console module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers.

The module exports two specific components:

  • A Console class with methods such as console.log(), console.error() and console.warn() that can be used to write to any Node.js stream.
  • A global console instance configured to write to process.stdout and process.stderr. The global console can be used without importing the node:console module.

Warning: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the note on process I/O for more information.

Example using the global console:

console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr

Example using the Console class:

const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err

@seesource

console
.
globalThis.Console.log(message?: any, ...optionalParams: any[]): void

Prints to stdout with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to printf(3) (the arguments are all passed to util.format()).

const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout

See util.format() for more information.

@sincev0.1.100

log
)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
1
*/