Sink Operations
On this page
In the previous sections, we learned how to create and use sinks. Now, let's explore some operations you can perform on sinks to transform or filter their behavior.
Changing the Input
Sometimes, you have a sink that works perfectly with one type of input, but you want to use it with a different type. This is where Sink.mapInput
comes in handy. While Sink.map
modifies the output of a function, Sink.mapInput
modifies the input. It allows you to adapt your sink to work with a different input.
Imagine you have a Sink.sum
that calculates the sum of incoming numeric values. However, your stream contains strings, not numbers. You can use mapInput
to convert your strings into numbers and make Sink.sum
compatible with your stream:
ts
import {Stream ,Sink ,Effect } from "effect"constnumericSum =Sink .sum conststringSum =numericSum .pipe (Sink .mapInput ((s : string) =>Number .parseFloat (s )))Effect .runPromise (Stream .make ("1", "2", "3", "4", "5").pipe (Stream .run (stringSum ))).then (console .log )/*Output:15*/
ts
import {Stream ,Sink ,Effect } from "effect"constnumericSum =Sink .sum conststringSum =numericSum .pipe (Sink .mapInput ((s : string) =>Number .parseFloat (s )))Effect .runPromise (Stream .make ("1", "2", "3", "4", "5").pipe (Stream .run (stringSum ))).then (console .log )/*Output:15*/
Transforming Both Input and Output
If you need to change both the input and output of a sink, you can use Sink.dimap
. It's an extended version of mapInput
that lets you transform both types. This can be useful when you need to perform a complete conversion between your input and output types:
ts
import {Stream ,Sink ,Effect } from "effect"// Convert its input to integers, do the computation and then convert them back to a stringconstsumSink =Sink .sum .pipe (Sink .dimap ({onInput : (s : string) =>Number .parseFloat (s ),onDone : (n ) =>String (n )}))Effect .runPromise (Stream .make ("1", "2", "3", "4", "5").pipe (Stream .run (sumSink ))).then (console .log )/*Output:15 <-- as string*/
ts
import {Stream ,Sink ,Effect } from "effect"// Convert its input to integers, do the computation and then convert them back to a stringconstsumSink =Sink .sum .pipe (Sink .dimap ({onInput : (s : string) =>Number .parseFloat (s ),onDone : (n ) =>String (n )}))Effect .runPromise (Stream .make ("1", "2", "3", "4", "5").pipe (Stream .run (sumSink ))).then (console .log )/*Output:15 <-- as string*/
Filtering Input
Sinks offer a way to filter incoming elements using Sink.filterInput
. This allows you to collect or process only the elements that meet a specific condition. In the following example, we collect elements in chunks of three and filter out the negative numbers:
ts
import {Stream ,Sink ,Effect } from "effect"conststream =Stream .make (1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6).pipe (Stream .transduce (Sink .collectAllN <number>(3).pipe (Sink .filterInput ((n ) =>n > 0))))Effect .runPromise (Stream .runCollect (stream )).then (console .log )/*Output:{_id: "Chunk",values: [{_id: "Chunk",values: [ 1, 1, 3 ]}, {_id: "Chunk",values: [ 4, 2, 1 ]}, {_id: "Chunk",values: [ 1, 1, 6 ]}, {_id: "Chunk",values: []}]}*/
ts
import {Stream ,Sink ,Effect } from "effect"conststream =Stream .make (1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6).pipe (Stream .transduce (Sink .collectAllN <number>(3).pipe (Sink .filterInput ((n ) =>n > 0))))Effect .runPromise (Stream .runCollect (stream )).then (console .log )/*Output:{_id: "Chunk",values: [{_id: "Chunk",values: [ 1, 1, 3 ]}, {_id: "Chunk",values: [ 4, 2, 1 ]}, {_id: "Chunk",values: [ 1, 1, 6 ]}, {_id: "Chunk",values: []}]}*/