In stream processing, sinks are used to consume and handle elements from a stream. Here, we’ll explore various sink constructors that allow you to create sinks for specific tasks.
Common Constructors
head
The Sink.head sink retrieves only the first element from a stream, wrapping it in Some. If the stream has no elements, it returns None.
Example (Retrieving the First Element)
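A minimal sketch, assuming the `effect` package is installed and that `Stream.run` is used to feed the stream into the sink. The sample values and the names `stream` and `program` are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Sink.head yields an Option holding the first element, if there is one
const program = Stream.run(stream, Sink.head())

Effect.runPromise(program).then(console.log)
// resolves to Option.some(1)
```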
last
The Sink.last sink retrieves only the last element from a stream, wrapping it in Some. If the stream has no elements, it returns None.
Example (Retrieving the Last Element)
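The sketch below assumes the `effect` package and uses `Stream.run` to connect an illustrative four-element stream to the sink; `stream` and `program` are names chosen for this example.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Sink.last consumes the whole stream and keeps only the final element
const program = Stream.run(stream, Sink.last())

Effect.runPromise(program).then(console.log)
// resolves to Option.some(4)
```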
count
The Sink.count sink consumes all elements of the stream and counts the number of elements fed to it.
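As a rough illustration, assuming the `effect` package and an illustrative four-element stream, counting could look like this:

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Sink.count ignores the element values and only counts them
const program = Stream.run(stream, Sink.count)

Effect.runPromise(program).then(console.log)
// resolves to 4
```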
sum
The Sink.sum sink consumes all elements of the stream and sums incoming numeric values.
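A small sketch of summing, assuming the `effect` package; the input values are made up for the example:

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Sink.sum adds up every numeric element it receives
const program = Stream.run(stream, Sink.sum)

Effect.runPromise(program).then(console.log)
// resolves to 10
```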
take
The Sink.take sink takes up to the specified number of values from the stream and returns them in a Chunk.
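For instance, under the assumption that the `effect` package is available, taking the first three values of an illustrative stream might be sketched as:

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Take at most three elements; the result is a Chunk<number>
const program = Stream.run(stream, Sink.take<number>(3))

Effect.runPromise(program).then(console.log)
// resolves to a Chunk containing 1, 2, 3
```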
drain
The Sink.drain sink ignores its inputs, effectively discarding them.
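A minimal sketch, assuming the `effect` package. It shows that running a stream into `Sink.drain` produces no value; the stream contents are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Every element is consumed and discarded; the result type is void
const program = Stream.run(stream, Sink.drain)

Effect.runPromise(program).then(console.log)
// resolves to undefined
```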
timed
The Sink.timed sink runs the stream and measures how long it takes, returning the elapsed time as a Duration.
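The following sketch assumes the `effect` package and adds an artificial 100 ms spacing between elements (via `Stream.schedule`, an addition made only for this example) so that there is a measurable delay. The exact elapsed time varies from run to run, so no output is shown.

```typescript
import { Duration, Effect, Schedule, Sink, Stream } from "effect"

// Space the elements 100 ms apart so there is something to measure
const stream = Stream.make(1, 2, 3, 4).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)

// Sink.timed discards the elements and reports how long the run took
const program = Stream.run(stream, Sink.timed)

Effect.runPromise(program).then((elapsed) =>
  console.log(`Elapsed: ${Duration.toMillis(elapsed)} ms`)
)
```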
forEach
The Sink.forEach sink executes the provided effectful function for every element fed to it.
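As a sketch, assuming the `effect` package, the effectful function here is a simple logging call through `Console.log`; any function returning an effect could be used instead.

```typescript
import { Console, Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Log each element as it arrives; the sink itself produces no value
const program = Stream.run(
  stream,
  Sink.forEach((n: number) => Console.log(n))
)

Effect.runPromise(program).then(console.log)
// logs 1, 2, 3, 4 and then resolves to undefined
```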
Creating Sinks from Success and Failure
Just as you can define streams to hold or manipulate data, you can also create sinks with specific success or failure outcomes using the Sink.fail and Sink.succeed functions.
Succeeding Sink
This example creates a sink that doesn’t consume any elements from its upstream source but instead immediately succeeds with a specified numeric value:
Example (Sink that Always Succeeds with a Value)
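A minimal sketch, assuming the `effect` package; the value `0` and the stream contents are arbitrary choices for illustration.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// The sink succeeds with 0 without reading anything from the stream
const program = Stream.run(stream, Sink.succeed(0))

Effect.runPromise(program).then(console.log)
// resolves to 0
```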
Failing Sink
In this example, the sink also doesn’t consume any elements from its upstream source. Instead, it fails with a specified error message of type string:
Example (Sink that Always Fails with an Error Message)
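A minimal sketch, assuming the `effect` package; the error message `"fail!"` is an arbitrary value chosen for illustration.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// The sink fails immediately with a string error and reads nothing from the stream
const program = Stream.run(stream, Sink.fail("fail!"))

// The promise rejects, so the failure is routed to console.error
Effect.runPromise(program).then(console.log, console.error)
```

To handle the failure as a value instead of a rejected promise, one option is to wrap the program with `Effect.either` before running it.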
Collecting
Collecting All Elements
To gather all elements from a data stream into a Chunk, use the Sink.collectAll sink.
The final output is a chunk containing all elements from the stream, in the order they were emitted.
Example (Collecting All Stream Elements)
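The sketch below assumes the `effect` package and an illustrative four-element stream:

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Gather every element into a Chunk, preserving emission order
const program = Stream.run(stream, Sink.collectAll<number>())

Effect.runPromise(program).then(console.log)
// resolves to a Chunk containing 1, 2, 3, 4
```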
Collecting a Specified Number
To collect a fixed number of elements from a stream into a Chunk, use Sink.collectAllN. This sink stops collecting once it reaches the specified limit.
Example (Collecting a Limited Number of Elements)
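A short sketch, assuming the `effect` package; the limit of 3 and the stream values are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Collect only the first three elements; the rest are left unconsumed
const program = Stream.run(stream, Sink.collectAllN<number>(3))

Effect.runPromise(program).then(console.log)
// resolves to a Chunk containing 1, 2, 3
```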
Collecting While Meeting a Condition
To gather elements from a stream while they satisfy a specific condition, use Sink.collectAllWhile. This sink collects elements until the provided predicate returns false.
Example (Collecting Elements Until a Condition Fails)
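In this sketch, which assumes the `effect` package, the illustrative predicate `n !== 0` makes the sink stop at the first zero:

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 0, 4, 0, 6, 7)

// Keep collecting while elements are non-zero; stop at the first 0
const program = Stream.run(
  stream,
  Sink.collectAllWhile((n: number) => n !== 0)
)

Effect.runPromise(program).then(console.log)
// resolves to a Chunk containing 1, 2
```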
Collecting into a HashSet
To accumulate stream elements into a HashSet, use Sink.collectAllToSet(). This ensures that each element appears only once in the final set.
Example (Collecting Unique Elements into a HashSet)
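A minimal sketch, assuming the `effect` package; the stream deliberately contains duplicates to show that they collapse in the resulting set.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 2, 3, 4, 4)

// Duplicates are dropped because the result is a HashSet<number>
const program = Stream.run(stream, Sink.collectAllToSet<number>())

Effect.runPromise(program).then(console.log)
// resolves to a HashSet containing 1, 2, 3, 4
```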
Collecting into HashSets of a Specific Size
To collect unique elements into a HashSet with a maximum size, use Sink.collectAllToSetN. This sink stops once the set holds the given number of distinct elements.
Example (Collecting Unique Elements with a Set Size Limit)
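A short sketch, assuming the `effect` package; the size limit of 3 and the input values are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 2, 3, 4, 4)

// Stop once the set holds three distinct elements
const program = Stream.run(stream, Sink.collectAllToSetN<number>(3))

Effect.runPromise(program).then(console.log)
// resolves to a HashSet containing 1, 2, 3
```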
Collecting into a HashMap
For more complex collection scenarios, Sink.collectAllToMap lets you gather elements into a HashMap<K, A> with a specified keying and merging strategy.
This sink requires both a key function to define each element’s grouping and a merge function to combine values sharing the same key.
Example (Grouping and Merging Stream Elements in a HashMap)
In this example, we use (n) => n % 3 to determine map keys and (a, b) => a + b to merge elements with the same key:
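A sketch of that grouping, assuming the `effect` package; the input values are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 3, 2, 3, 1, 5, 1)

const program = Stream.run(
  stream,
  Sink.collectAllToMap(
    (n: number) => n % 3, // key function: group by remainder modulo 3
    (a: number, b: number) => a + b // merge function: sum values sharing a key
  )
)

Effect.runPromise(program).then(console.log)
// resolves to a HashMap with entries 0 -> 6, 1 -> 3, 2 -> 7
```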
Collecting into a HashMap with Limited Keys
To accumulate elements into a HashMap with a maximum number of keys, use Sink.collectAllToMapN. This sink collects elements until it reaches the specified key limit. Like Sink.collectAllToMap, it requires a key function to group each element and a merge function to combine values that share a key.
Example (Limiting Collected Keys in a HashMap)
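In the sketch below, which assumes the `effect` package, each element is used as its own key and the map is limited to three keys. The sink stops when it meets an element that would introduce a fourth key (here, `5`). The values are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 3, 2, 3, 1, 5, 1)

const program = Stream.run(
  stream,
  Sink.collectAllToMapN(
    3, // maximum number of distinct keys
    (n: number) => n, // key function: each value is its own key
    (a: number, b: number) => a + b // merge function: sum repeated values
  )
)

Effect.runPromise(program).then(console.log)
// resolves to a HashMap with entries 1 -> 2, 2 -> 2, 3 -> 6
```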
Folding
Folding Left
If you want to reduce a stream into a single cumulative value by applying an operation to each element in sequence, you can use the Sink.foldLeft function.
Example (Summing Elements in a Stream Using Fold Left)
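A minimal sketch, assuming the `effect` package; the initial value `0` and the addition step are illustrative choices.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Start from 0 and add each element to the running total
const program = Stream.run(
  stream,
  Sink.foldLeft(0, (acc: number, n: number) => acc + n)
)

Effect.runPromise(program).then(console.log)
// resolves to 10
```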
Folding with Termination
Sometimes, you may want to fold elements in a stream but stop the process once a specific condition is met. This is known as “short-circuiting.” You can accomplish this with the Sink.fold function, which lets you define a termination condition.
Example (Folding with a Condition to Stop Early)
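The sketch below assumes the `effect` package and folds an infinite stream of increasing integers. The continuation predicate `sum <= 10` is an illustrative choice: folding continues while it holds and stops as soon as the running total exceeds 10.

```typescript
import { Effect, Sink, Stream } from "effect"

// Infinite stream: 0, 1, 2, 3, ...
const stream = Stream.iterate(0, (n) => n + 1)

const program = Stream.run(
  stream,
  Sink.fold(
    0, // initial total
    (sum: number) => sum <= 10, // keep folding while this is true
    (sum: number, n: number) => sum + n // accumulation step
  )
)

Effect.runPromise(program).then(console.log)
// resolves to 15 (0 + 1 + 2 + 3 + 4 + 5)
```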
Folding Until a Limit
To accumulate elements until a specific count is reached, use Sink.foldUntil. This sink folds elements up to the specified limit and then stops.
Example (Accumulating a Set Number of Elements)
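A short sketch, assuming the `effect` package; the limit of 3 elements and the summing step are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Fold only the first three elements: 1 + 2 + 3
const program = Stream.run(
  stream,
  Sink.foldUntil(0, 3, (acc: number, n: number) => acc + n)
)

Effect.runPromise(program).then(console.log)
// resolves to 6
```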
Folding with Weighted Elements
In some scenarios, you may want to fold elements based on a defined “weight” or “cost,” accumulating elements until a specified maximum cost is reached. You can accomplish this with Sink.foldWeighted.
Example (Accumulating Elements Based on Weight)
In the example below, each element has a weight of 1, and the folding resets when the accumulated weight hits 3.
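A sketch of this behavior, assuming the `effect` package. It applies the sink repeatedly with `Stream.transduce` (a choice made for this illustration) so that each completed fold is emitted as its own chunk; the input values are illustrative.

```typescript
import { Chunk, Effect, Sink, Stream } from "effect"

const stream = Stream.make(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe(
  Stream.transduce(
    Sink.foldWeighted({
      initial: Chunk.empty<number>(), // start each fold with an empty chunk
      maxCost: 3, // stop a fold once the total weight reaches 3
      cost: () => 1, // every element weighs 1
      body: (acc, el: number) => Chunk.append(acc, el) // append to the group
    })
  )
)

const program = Stream.runCollect(stream)

Effect.runPromise(program).then(console.log)
// resolves to chunks [3, 2, 4], [1, 5, 6], [2, 1, 3], [5, 6]
```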