Creating Sinks

Sinks consume and process the elements of a stream. This page introduces common sink constructors that you can use to create sinks for specific tasks.

Common Constructors

head

The head sink captures the first element of a stream as an Option. If the stream is empty, the result is None.

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.head()))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Option",
  _tag: "Some",
  value: 1
}
*/

last

The last sink consumes all elements of a stream and returns the final one as an Option. If the stream is empty, the result is None.

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.last()))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Option",
  _tag: "Some",
  value: 4
}
*/

count

The count sink consumes all elements of the stream and counts the number of elements fed to it.

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.count))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
4
*/

sum

The sum sink consumes all elements of the stream and sums incoming numeric values.

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.sum))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
10
*/

take

The take sink consumes up to the specified number of elements from the stream and returns them in a Chunk.

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.take(3)))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3 ]
}
*/

drain

The drain sink consumes its inputs and discards them, so it produces no meaningful result.

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.drain))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
undefined
*/

timed

The timed sink runs the stream to completion and returns the elapsed time as a Duration. The exact value varies from run to run.

ts
import { Stream, Schedule, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(
  Stream.schedule(Schedule.spaced("100 millis")),
  Stream.run(Sink.timed)
)

Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Duration",
  _tag: "Millis",
  millis: 523
}
*/

forEach

The forEach sink executes the provided effectful function for every element fed to it.

ts
import { Stream, Sink, Console, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(
  Stream.run(Sink.forEach(Console.log))
)
 
Effect.runPromise(effect).then(console.log)
/*
Output:
1
2
3
4
undefined
*/

From Success and Failure

Just as streams can be created from a success value or a failure, sinks can be created with the Sink.succeed and Sink.fail functions.

Succeeding Sink

Let's start with a sink that doesn't consume any elements from its upstream but instead succeeds with a numeric value:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.succeed(0)))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
0
*/

Failing Sink

Now, consider a sink that also consumes no elements from its upstream but deliberately fails with an error of type string:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.fail("fail!")))
 
Effect.runPromiseExit(effect).then(console.log)
/*
Output:
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: { _id: 'Cause', _tag: 'Fail', failure: 'fail!' }
}
*/

Collecting

Collecting All Elements

To gather all the elements from a data stream into a Chunk, you can employ the Sink.collectAll() function:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(Stream.run(Sink.collectAll()))
 
Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4 ]
}
*/

Collecting into a HashSet

If you want to accumulate the elements into a HashSet, you can use Sink.collectAllToSet(). This function ensures that each element appears only once in the resulting set:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 2, 3, 4, 4).pipe(
  Stream.run(Sink.collectAllToSet())
)

Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashSet",
  values: [ 1, 2, 3, 4 ]
}
*/

Collecting into a HashMap

For more advanced collection needs, you can use Sink.collectAllToMap(). This function allows you to accumulate and merge elements into a HashMap<K, A> using a specified merge function. In the following example, we determine map keys with (n) => n % 3 and merge elements with the same key using (a, b) => a + b:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 3, 2, 3, 1, 5, 1).pipe(
  Stream.run(
    Sink.collectAllToMap(
      (n) => n % 3,
      (a, b) => a + b
    )
  )
)

Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashMap",
  values: [
    [ 0, 6 ], [ 1, 3 ], [ 2, 7 ]
  ]
}
*/
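
To see how the key and merge functions combine to produce that output, the following plain TypeScript sketch models the same grouping with a standard Map. It is a conceptual model only, not how the library implements Sink.collectAllToMap, and collectToMapModel is a hypothetical helper name introduced for illustration.

```typescript
// Conceptual model only: NOT the effect library's implementation.
// `collectToMapModel` is a hypothetical helper used for illustration.
function collectToMapModel<A, K>(
  elements: ReadonlyArray<A>,
  key: (a: A) => K,
  merge: (existing: A, incoming: A) => A
): Map<K, A> {
  const result = new Map<K, A>()
  for (const element of elements) {
    const k = key(element)
    if (result.has(k)) {
      // Key already seen: combine the stored value with the new element.
      result.set(k, merge(result.get(k) as A, element))
    } else {
      // First element for this key: store it unchanged.
      result.set(k, element)
    }
  }
  return result
}

const grouped = collectToMapModel(
  [1, 3, 2, 3, 1, 5, 1],
  (n) => n % 3,
  (a, b) => a + b
)

// Sort by key so the printed order is deterministic.
const groupedEntries = Array.from(grouped.entries()).sort(([a], [b]) => a - b)
console.log(groupedEntries)
// [ [ 0, 6 ], [ 1, 3 ], [ 2, 7 ] ]
```

Key 0 receives 3 + 3, key 1 receives 1 + 1 + 1, and key 2 receives 2 + 5, which matches the HashMap shown above.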

Collecting a Specified Number

If you only want to collect a specific number of elements from a stream into a Chunk, you can use Sink.collectAllN(n):

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(Sink.collectAllN(3))
)

Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3 ]
}
*/

Collecting While Meeting a Condition

To accumulate elements as long as they satisfy a condition, use Sink.collectAllWhile(predicate). The sink keeps collecting until the predicate returns false for an element; that element is not included in the result:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 0, 4, 0, 6, 7).pipe(
  Stream.run(Sink.collectAllWhile((n) => n !== 0))
)

Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2 ]
}
*/

Collecting into HashSets of a Specific Size

To collect distinct elements into a HashSet that holds at most n elements, use Sink.collectAllToSetN(n):

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 2, 3, 4, 4).pipe(
  Stream.run(Sink.collectAllToSetN(3))
)

Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashSet",
  values: [ 1, 2, 3 ]
}
*/

Collecting into HashMaps with Limited Keys

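To accumulate elements into a HashMap with at most n keys, use Sink.collectAllToMapN(n, keyFunction, mergeFunction). In the following example, each element is its own key, and, as the output shows, collection stops when a fourth distinct key (5) arrives: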

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 3, 2, 3, 1, 5, 1).pipe(
  Stream.run(
    Sink.collectAllToMapN(
      3,
      (n) => n,
      (a, b) => a + b
    )
  )
)

Effect.runPromise(effect).then(console.log)
/*
Output:
{
  _id: "HashMap",
  values: [
    [ 1, 2 ], [ 2, 2 ], [ 3, 6 ]
  ]
}
*/
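
The output above leaves out 5 and the final 1, which suggests that collection ends as soon as a new key would exceed the limit. The plain TypeScript sketch below models that reading. It is an assumption inferred from the output, not the library's implementation, and collectToMapNModel is a hypothetical helper name.

```typescript
// Conceptual model only; `collectToMapNModel` is a hypothetical helper,
// not part of the effect library.
function collectToMapNModel<A, K>(
  maxKeys: number,
  elements: ReadonlyArray<A>,
  key: (a: A) => K,
  merge: (existing: A, incoming: A) => A
): Map<K, A> {
  const result = new Map<K, A>()
  for (const element of elements) {
    const k = key(element)
    if (result.has(k)) {
      // Existing key: merge the new element into the stored value.
      result.set(k, merge(result.get(k) as A, element))
    } else if (result.size < maxKeys) {
      // New key and still room: store the element.
      result.set(k, element)
    } else {
      // A new key would exceed the limit: stop collecting (assumed behavior).
      break
    }
  }
  return result
}

const capped = collectToMapNModel(
  3,
  [1, 3, 2, 3, 1, 5, 1],
  (n) => n,
  (a, b) => a + b
)

// Sort by key so the printed order is deterministic.
const cappedEntries = Array.from(capped.entries()).sort(([a], [b]) => a - b)
console.log(cappedEntries)
// [ [ 1, 2 ], [ 2, 2 ], [ 3, 6 ] ]
```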

Folding

Folding Left

Imagine you have a stream of numbers, and you want to reduce them into a single value by applying an operation to each element sequentially. You can achieve this using the Sink.foldLeft function:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4).pipe(
  Stream.run(Sink.foldLeft(0, (a, b) => a + b))
)
 
Effect.runPromise(effect).then(console.log)
/*
Output:
10
*/

Folding with Termination

In some cases, you may want to fold elements in a stream but stop the folding process when a certain condition is met. This is called "short-circuiting." You can achieve this using the Sink.fold function, which takes a predicate on the accumulated state and keeps folding only while that predicate returns true. In the following example, folding stops once the running sum exceeds 10:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.iterate(0, (n) => n + 1).pipe(
  Stream.run(
    Sink.fold(
      0,
      (sum) => sum <= 10,
      (a, b) => a + b
    )
  )
)
 
Effect.runPromise(effect).then(console.log)
/*
Output:
15
*/
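
The result is 15 rather than 10 because the predicate is evaluated on the state accumulated so far, so the element that pushes the sum past 10 is still folded in. The plain TypeScript trace below is a minimal sketch of that logic. It is a conceptual model, not the library's implementation, and foldIterateModel is a hypothetical helper name. The list of consumed elements describes this model only; the real sink may pull elements from upstream in larger batches.

```typescript
// Conceptual model of a short-circuiting fold over an infinite sequence.
// `foldIterateModel` is a hypothetical helper, not part of the effect library.
function foldIterateModel<S>(
  seed: number,
  next: (n: number) => number,
  initial: S,
  shouldContinue: (state: S) => boolean,
  step: (state: S, element: number) => S
): { result: S; consumed: number[] } {
  let state = initial
  let element = seed
  const consumed: number[] = []
  // The predicate is checked on the accumulated state before each step.
  while (shouldContinue(state)) {
    state = step(state, element)
    consumed.push(element)
    element = next(element)
  }
  return { result: state, consumed }
}

const trace = foldIterateModel(
  0,
  (n) => n + 1,
  0,
  (sum) => sum <= 10,
  (a, b) => a + b
)

console.log(trace.consumed) // [ 0, 1, 2, 3, 4, 5 ]
console.log(trace.result) // 15
```

The running sums are 0, 1, 3, 6, 10, and 15. The sum 10 still satisfies the predicate, so 5 is added before the fold stops.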

Folding with Weighted Elements

Sometimes, you may want to fold elements based on their weight or cost, accumulating them until a certain maximum cost is reached. You can do this using Sink.foldWeighted. In the following example, every element has a cost of 1 and maxCost is 3, so the fold emits its accumulated Chunk and starts over after every three elements. Stream.transduce applies the sink to the stream repeatedly and emits each result:

ts
import { Stream, Sink, Chunk, Effect } from "effect"
 
const stream = Stream.make(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe(
  Stream.transduce(
    Sink.foldWeighted({
      initial: Chunk.empty<number>(),
      maxCost: 3,
      cost: () => 1,
      body: (acc, el) => Chunk.append(acc, el)
    })
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    {
      _id: "Chunk",
      values: [ 3, 2, 4 ]
    }, {
      _id: "Chunk",
      values: [ 1, 5, 6 ]
    }, {
      _id: "Chunk",
      values: [ 2, 1, 3 ]
    }, {
      _id: "Chunk",
      values: [ 5, 6 ]
    }
  ]
}
*/
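
The grouping can be pictured as a cost budget that resets whenever the next element would exceed it. The plain TypeScript sketch below models that idea with arrays. It is a simplified conceptual model, not the library's implementation: groupByCostModel is a hypothetical helper name, and the way it handles a single element whose cost alone exceeds the budget (it is placed in its own group) is an assumption of this sketch.

```typescript
// Conceptual model of cost-bounded grouping.
// `groupByCostModel` is a hypothetical helper, not part of the effect library.
function groupByCostModel<A>(
  elements: ReadonlyArray<A>,
  maxCost: number,
  cost: (a: A) => number
): A[][] {
  const groups: A[][] = []
  let current: A[] = []
  let currentCost = 0
  for (const element of elements) {
    const c = cost(element)
    // If adding this element would exceed the budget,
    // emit the current group and start a new one.
    if (current.length > 0 && currentCost + c > maxCost) {
      groups.push(current)
      current = []
      currentCost = 0
    }
    current.push(element)
    currentCost += c
  }
  // Emit whatever is left when the input ends.
  if (current.length > 0) {
    groups.push(current)
  }
  return groups
}

const groups = groupByCostModel(
  [3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6],
  3,
  () => 1
)

console.log(groups)
// [ [ 3, 2, 4 ], [ 1, 5, 6 ], [ 2, 1, 3 ], [ 5, 6 ] ]
```

With a constant cost of 1 and a budget of 3, the model produces the same four groups as the example above.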

Folding Until a Limit

If you want to fold elements up to a specific limit, you can use Sink.foldUntil. In the following example, we fold elements until we have accumulated 3 of them:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
  Stream.run(Sink.foldUntil(0, 3, (a, b) => a + b))
)
 
Effect.runPromise(effect).then(console.log)
/*
Output:
6
*/