Leftovers

A sink may consume only part of the elements emitted by an upstream source. The elements it does not consume are called leftovers. This section shows how to collect or ignore them.

To capture leftovers, use Sink.collectLeftover. It returns a new sink whose result is a tuple containing the original sink's result and a chunk of the unconsumed elements.

Example (Collecting Leftover Elements)

```ts
import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Take the first 3 elements and collect any leftovers
const sink1 = Sink.take<number>(3).pipe(Sink.collectLeftover)

Effect.runPromise(Stream.run(stream, sink1)).then(console.log)
/*
Output:
[
  { _id: 'Chunk', values: [ 1, 2, 3 ] },
  { _id: 'Chunk', values: [ 4, 5 ] }
]
*/

// Take only the first element and collect the rest as leftovers
const sink2 = Sink.head<number>().pipe(Sink.collectLeftover)

Effect.runPromise(Stream.run(stream, sink2)).then(console.log)
/*
Output:
[
  { _id: 'Option', _tag: 'Some', value: 1 },
  { _id: 'Chunk', values: [ 2, 3, 4, 5 ] }
]
*/
```
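To build intuition for the tuple shape, here is a minimal library-free sketch that models a sink as a function over an array. It is not how Effect implements sinks. The names SinkModel, takeModel, headModel, and collectLeftoverModel are hypothetical and exist only for this illustration, and headModel returns undefined rather than an Option when the input is empty.

```typescript
// A simplified model of a sink: it consumes a prefix of the input and
// reports both its result and whatever it did not consume.
type SinkModel<In, A> = (input: ReadonlyArray<In>) => {
  readonly result: A
  readonly leftover: ReadonlyArray<In>
}

// Hypothetical analogue of Sink.take(n): consumes at most n elements.
const takeModel = <In>(n: number): SinkModel<In, ReadonlyArray<In>> =>
  (input) => ({ result: input.slice(0, n), leftover: input.slice(n) })

// Hypothetical analogue of Sink.head(): consumes at most one element.
const headModel = <In>(): SinkModel<In, In | undefined> =>
  (input) => ({ result: input[0], leftover: input.slice(1) })

// Hypothetical analogue of Sink.collectLeftover: moves the leftover into
// the result tuple, so the wrapped sink itself has nothing left over.
const collectLeftoverModel = <In, A>(
  sink: SinkModel<In, A>
): SinkModel<In, readonly [A, ReadonlyArray<In>]> =>
  (input) => {
    const { result, leftover } = sink(input)
    return { result: [result, leftover] as const, leftover: [] }
  }

const input = [1, 2, 3, 4, 5]

const taken = collectLeftoverModel(takeModel<number>(3))(input)
console.log(taken.result) // [ [ 1, 2, 3 ], [ 4, 5 ] ]

const first = collectLeftoverModel(headModel<number>())(input)
console.log(first.result) // [ 1, [ 2, 3, 4, 5 ] ]
```

In this model the first tuple element is whatever the inner sink produced and the second is the unconsumed remainder, which mirrors the two chunks printed by the Effect example.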

If you don't need the leftovers, use Sink.ignoreLeftover to discard them. The example below applies Sink.collectLeftover afterwards only to show that, once the leftovers have been ignored, the collected leftover chunk is empty.

Example (Ignoring Leftover Elements)

```ts
import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Take the first 3 elements and ignore any remaining elements
const sink = Sink.take<number>(3).pipe(
  Sink.ignoreLeftover,
  Sink.collectLeftover
)

Effect.runPromise(Stream.run(stream, sink)).then(console.log)
/*
Output:
[ { _id: 'Chunk', values: [ 1, 2, 3 ] }, { _id: 'Chunk', values: [] } ]
*/
```
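The empty second chunk can be reproduced with a small library-free sketch that models a sink as a function over an array. This is an illustration under simplifying assumptions, not Effect's implementation. SinkModel, takeModel, ignoreLeftoverModel, and collectLeftoverModel are hypothetical names introduced here.

```typescript
// A simplified model of a sink: a result plus the elements it did not consume.
type SinkModel<In, A> = (input: ReadonlyArray<In>) => {
  readonly result: A
  readonly leftover: ReadonlyArray<In>
}

// Hypothetical analogue of Sink.take(n): consumes at most n elements.
const takeModel = <In>(n: number): SinkModel<In, ReadonlyArray<In>> =>
  (input) => ({ result: input.slice(0, n), leftover: input.slice(n) })

// Hypothetical analogue of Sink.ignoreLeftover: keeps the result and
// throws the unconsumed elements away.
const ignoreLeftoverModel = <In, A>(sink: SinkModel<In, A>): SinkModel<In, A> =>
  (input) => ({ result: sink(input).result, leftover: [] })

// Hypothetical analogue of Sink.collectLeftover: pairs the result with
// whatever the wrapped sink still reports as leftover.
const collectLeftoverModel = <In, A>(
  sink: SinkModel<In, A>
): SinkModel<In, readonly [A, ReadonlyArray<In>]> =>
  (input) => {
    const { result, leftover } = sink(input)
    return { result: [result, leftover] as const, leftover: [] }
  }

// Same order as the pipe in the example: take, then ignore, then collect.
const sinkModel = collectLeftoverModel(ignoreLeftoverModel(takeModel<number>(3)))

console.log(sinkModel([1, 2, 3, 4, 5]).result) // [ [ 1, 2, 3 ], [] ]
```

Because the leftovers are discarded before they are collected, there is nothing left for the collecting step to pick up, which is why the second element of the tuple is empty.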