Creating Sinks
On this page
In the world of streams, sinks are used to consume and process the elements of a stream. Here, we will introduce some common sink constructors that allow you to create sinks for specific tasks.
Common Constructors
head
The head
sink creates a sink that captures the first element of a stream. If the stream is empty, it returns None
.
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .head ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Option",_tag: "Some",value: 1}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .head ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Option",_tag: "Some",value: 1}*/
last
The last
sink consumes all elements of a stream and returns the last element of the stream.
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .last ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Option",_tag: "Some",value: 4}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .last ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Option",_tag: "Some",value: 4}*/
count
The count
sink consumes all elements of the stream and counts the number of elements fed to it.
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .count ))Effect .runPromise (effect ).then (console .log )/*Output:4*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .count ))Effect .runPromise (effect ).then (console .log )/*Output:4*/
sum
The sum
sink consumes all elements of the stream and sums incoming numeric values.
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .sum ))Effect .runPromise (effect ).then (console .log )/*Output:10*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .sum ))Effect .runPromise (effect ).then (console .log )/*Output:10*/
take
The take
sink takes the specified number of values from the stream and results in a Chunk data type.
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .take (3)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3 ]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .take (3)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3 ]}*/
drain
The drain
sink ignores its inputs, effectively discarding them.
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .drain ))Effect .runPromise (effect ).then (console .log )/*Output:undefined*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .drain ))Effect .runPromise (effect ).then (console .log )/*Output:undefined*/
timed
The timed
sink executes the stream and measures its execution time, providing the duration.
ts
import {Stream ,Schedule ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .schedule (Schedule .spaced ("100 millis")),Stream .run (Sink .timed ))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Duration",_tag: "Millis",millis: 523}*/
ts
import {Stream ,Schedule ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .schedule (Schedule .spaced ("100 millis")),Stream .run (Sink .timed ))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Duration",_tag: "Millis",millis: 523}*/
forEach
The forEach
sink executes the provided effectful function for every element fed to it.
ts
import {Stream ,Sink ,Console ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .forEach (Console .log )))Effect .runPromise (effect ).then (console .log )/*Output:1234undefined*/
ts
import {Stream ,Sink ,Console ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .forEach (Console .log )))Effect .runPromise (effect ).then (console .log )/*Output:1234undefined*/
From Success and Failure
In the realm of data streams, similar to crafting streams to hold and manipulate data, we can also create sinks using the Sink.fail
and Sink.succeed
functions.
Succeeding Sink
Let's start with a sink that doesn't consume any elements from its upstream but instead succeeds with a numeric value:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .succeed (0)))Effect .runPromise (effect ).then (console .log )/*Output:0*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .succeed (0)))Effect .runPromise (effect ).then (console .log )/*Output:0*/
Failing Sink
Now, consider a sink that also doesn't consume any elements from its upstream but deliberately fails, generating an error message of type string
:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .fail ("fail!")))Effect .runPromiseExit (effect ).then (console .log )/*Output:{_id: 'Exit',_tag: 'Failure',cause: { _id: 'Cause', _tag: 'Fail', failure: 'fail!' }}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .fail ("fail!")))Effect .runPromiseExit (effect ).then (console .log )/*Output:{_id: 'Exit',_tag: 'Failure',cause: { _id: 'Cause', _tag: 'Fail', failure: 'fail!' }}*/
Collecting
Collecting All Elements
To gather all the elements from a data stream into a Chunk, you can employ the Sink.collectAll()
function:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .collectAll ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3, 4 ]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .collectAll ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3, 4 ]}*/
Collecting into a HashSet
If you want to accumulate the elements into a HashSet
, you can use Sink.collectAllToSet()
. This function ensures that each element appears only once in the resulting set:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 2, 3, 4, 4).pipe (Stream .run (Sink .collectAllToSet ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashSet",values: [ 1, 2, 3, 4 ]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 2, 3, 4, 4).pipe (Stream .run (Sink .collectAllToSet ()))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashSet",values: [ 1, 2, 3, 4 ]}*/
Collecting into a HashMap
For more advanced collection needs, you can use Sink.collectAllToMap()
. This function allows you to accumulate and merge elements into a HashMap<K, A>
using a specified merge function. In the following example, we determine map keys with (n) => n % 3
and merge elements with the same key using (a, b) => a + b
:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 3, 2, 3, 1, 5, 1).pipe (Stream .run (Sink .collectAllToMap ((n ) =>n % 3,(a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashMap",values: [[ 0, 6 ], [ 1, 3 ], [ 2, 7 ]]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 3, 2, 3, 1, 5, 1).pipe (Stream .run (Sink .collectAllToMap ((n ) =>n % 3,(a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashMap",values: [[ 0, 6 ], [ 1, 3 ], [ 2, 7 ]]}*/
Collecting a Specified Number
If you only want to collect a specific number of elements from a stream into a Chunk, you can use Sink.collectAllN(n)
:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4, 5).pipe (Stream .run (Sink .collectAllN (3)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3 ]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4, 5).pipe (Stream .run (Sink .collectAllN (3)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3 ]}*/
Collecting While Meeting a Condition
To accumulate elements as long as they satisfy a specific condition, you can use Sink.collectAllWhile(predicate)
. This function will keep gathering elements until the predicate returns false
:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 0, 4, 0, 6, 7).pipe (Stream .run (Sink .collectAllWhile ((n ) =>n !== 0)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2 ]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 0, 4, 0, 6, 7).pipe (Stream .run (Sink .collectAllWhile ((n ) =>n !== 0)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2 ]}*/
Collecting into HashSets of a Specific Size
For more controlled collection into sets with a maximum size of n
, you can utilize Sink.collectAllToSetN(n)
:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 2, 3, 4, 4).pipe (Stream .run (Sink .collectAllToSetN (3)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashSet",values: [ 1, 2, 3 ]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 2, 3, 4, 4).pipe (Stream .run (Sink .collectAllToSetN (3)))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashSet",values: [ 1, 2, 3 ]}*/
Collecting into HashMaps with Limited Keys
If you need to accumulate elements into maps with a maximum of n
keys, you can employ Sink.collectAllToMapN(n, keyFunction, mergeFunction)
:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 3, 2, 3, 1, 5, 1).pipe (Stream .run (Sink .collectAllToMapN (3,(n ) =>n ,(a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashMap",values: [[ 1, 2 ], [ 2, 2 ], [ 3, 6 ]]}*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 3, 2, 3, 1, 5, 1).pipe (Stream .run (Sink .collectAllToMapN (3,(n ) =>n ,(a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:{_id: "HashMap",values: [[ 1, 2 ], [ 2, 2 ], [ 3, 6 ]]}*/
Folding
Folding Left
Imagine you have a stream of numbers, and you want to reduce them into a single value by applying an operation to each element sequentially. You can achieve this using the Sink.foldLeft
function:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .foldLeft (0, (a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:10*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4).pipe (Stream .run (Sink .foldLeft (0, (a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:10*/
Folding with Termination
In some cases, you may want to fold elements in a stream but stop the folding process when a certain condition is met. This is called "short-circuiting." You can achieve this using the Sink.fold
function, which allows you to specify a termination predicate:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .iterate (0, (n ) =>n + 1).pipe (Stream .run (Sink .fold (0,(sum ) =>sum <= 10,(a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:15*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .iterate (0, (n ) =>n + 1).pipe (Stream .run (Sink .fold (0,(sum ) =>sum <= 10,(a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:15*/
Folding with Weighted Elements
Sometimes, you may want to fold elements based on their weight or cost, accumulating them until a certain maximum cost is reached. You can do this using Sink.foldWeighted
. In the following example, we group elements based on a weight of 1, restarting the folding process when the total weight reaches 3:
ts
import {Stream ,Sink ,Chunk ,Effect } from "effect"conststream =Stream .make (3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe (Stream .transduce (Sink .foldWeighted ({initial :Chunk .empty <number>(),maxCost : 3,cost : () => 1,body : (acc ,el ) =>Chunk .append (acc ,el )})))Effect .runPromise (Stream .runCollect (stream )).then (console .log )/*Output:{_id: "Chunk",values: [{_id: "Chunk",values: [ 3, 2, 4 ]}, {_id: "Chunk",values: [ 1, 5, 6 ]}, {_id: "Chunk",values: [ 2, 1, 3 ]}, {_id: "Chunk",values: [ 5, 6 ]}]}*/
ts
import {Stream ,Sink ,Chunk ,Effect } from "effect"conststream =Stream .make (3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe (Stream .transduce (Sink .foldWeighted ({initial :Chunk .empty <number>(),maxCost : 3,cost : () => 1,body : (acc ,el ) =>Chunk .append (acc ,el )})))Effect .runPromise (Stream .runCollect (stream )).then (console .log )/*Output:{_id: "Chunk",values: [{_id: "Chunk",values: [ 3, 2, 4 ]}, {_id: "Chunk",values: [ 1, 5, 6 ]}, {_id: "Chunk",values: [ 2, 1, 3 ]}, {_id: "Chunk",values: [ 5, 6 ]}]}*/
Folding Until a Limit
If you want to fold elements up to a specific limit, you can use Sink.foldUntil
. In the following example, we fold elements until we have accumulated 3 of them:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe (Stream .run (Sink .foldUntil (0, 3, (a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:6*/
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe (Stream .run (Sink .foldUntil (0, 3, (a ,b ) =>a +b )))Effect .runPromise (effect ).then (console .log )/*Output:6*/