Operations
In this guide, we’ll explore some essential operations you can perform on streams. These operations allow you to manipulate and interact with stream elements in various ways.
The `Stream.tap` operation allows you to run an effect on each element emitted by the stream, observing or performing side effects without altering the elements or return type. This can be useful for logging, monitoring, or triggering additional actions with each emission.
Example (Logging with `Stream.tap`)
For example, `Stream.tap` can be used to log each element before and after a mapping operation:
The “taking” operations in streams let you extract a specific set of elements, either by a fixed number, condition, or position within the stream. Here are a few ways to apply these operations:
| API | Description |
| --- | --- |
| `take` | Extracts a fixed number of elements. |
| `takeWhile` | Extracts elements while a certain condition is met. |
| `takeUntil` | Extracts elements until a certain condition is met. |
| `takeRight` | Extracts a specified number of elements from the end. |
Example (Extracting Elements in Different Ways)
When working with asynchronous data sources, such as async iterables, you often need to consume data in a loop until a certain condition is met. Streams provide a similar approach and offer additional flexibility.
With async iterables, data is processed in a loop until a `break` or `return` statement is encountered. To replicate this behavior with streams, consider these options:
| API | Description |
| --- | --- |
| `takeUntil` | Takes elements from a stream until a specified condition is met, similar to breaking out of a loop. |
| `toPull` | Returns an effect that continuously pulls data chunks from the stream. This effect can fail with `None` when the stream is finished, or with `Some` error if it fails. |
Example (Using `Stream.toPull`)
The `Stream.map` operation applies a specified function to each element in a stream, creating a new stream with the transformed values.
Example (Incrementing Each Element by 1)
The `Stream.as` method allows you to replace each success value in a stream with a specified constant value. This can be useful when you want all elements in the stream to emit a uniform value, regardless of the original data.
Example (Mapping to `null`)
For transformations involving effects, use `Stream.mapEffect`. This function applies an effectful operation to each element in the stream, producing a new stream with effectful results.
Example (Random Number Generation)
To handle multiple effectful transformations concurrently, you can use the concurrency option. This option allows a specified number of effects to run concurrently, with results emitted downstream in their original order.
Example (Fetching URLs Concurrently)
`Stream.mapAccum` is similar to `Stream.map`, but it applies a transformation with state tracking, allowing you to map and accumulate values within a single operation. This is useful for tasks like calculating a running total in a stream.
Example (Calculating a Running Total)
The `Stream.mapConcat` operation is similar to `Stream.map`, but it goes further by mapping each element to zero or more elements (as an `Iterable`) and then flattening the entire stream. This is particularly useful for transforming each element into multiple values.
Example (Splitting and Flattening a Stream)
The `Stream.filter` operation allows you to pass through only elements that meet a specific condition. It retains elements in a stream that satisfy a particular criterion while discarding the rest.
Example (Filtering Even Numbers)
Stream scanning allows you to apply a function cumulatively to each element in the stream, emitting every intermediate result. Unlike `reduce`, which only provides a final result, `scan` offers a step-by-step view of the accumulation process.
Example (Cumulative Addition)
If you need only the final accumulated value, you can use `Stream.runFold`:
Example (Final Accumulated Result)
Stream draining lets you execute effectful operations within a stream while discarding the resulting values. This can be useful when you need to run actions or perform side effects but don’t require the emitted values. The `Stream.drain` function achieves this by ignoring all elements in the stream and producing an empty output stream.
Example (Executing Effectful Operations without Collecting Values)
The `Stream.changes` operation detects and emits elements that differ from their preceding elements within a stream. This can be useful for tracking changes or deduplicating consecutive values.
Example (Emitting Distinct Consecutive Elements)
Zipping combines elements from two streams into a new stream, pairing elements from each input stream. This can be achieved with `Stream.zip` or `Stream.zipWith`, allowing for custom pairing logic.
Example (Basic Zipping)
In this example, elements from the two streams are paired sequentially. The resulting stream ends when one of the streams is exhausted.
Example (Custom Zipping Logic)
Here, `Stream.zipWith` applies custom logic to each pair, combining elements in a user-defined way.
If one input stream ends before the other, you might want to zip with default values to avoid missing pairs. The `Stream.zipAll` and `Stream.zipAllWith` operators provide this functionality, allowing you to specify defaults for either stream.
Example (Zipping with Default Values)
In this example, when the second stream completes, the first stream continues with “x” as a default value for the second stream.
Example (Custom Logic with `zipAllWith`)
With `Stream.zipAllWith`, custom logic determines how to combine elements when either stream runs out, offering flexibility to handle these cases.
When combining streams that emit elements at different speeds, you may not want to wait for the slower stream to emit. Using `Stream.zipLatest` or `Stream.zipLatestWith`, you can zip elements as soon as either stream produces a new value. These functions use the most recent element from the slower stream whenever a new value arrives from the faster stream.
Example (Combining Streams with Different Emission Rates)
| API | Description |
| --- | --- |
| `zipWithPrevious` | Pairs each element of a stream with its previous element. |
| `zipWithNext` | Pairs each element of a stream with its next element. |
| `zipWithPreviousAndNext` | Pairs each element with both its previous and next elements. |
Example (Pairing Stream Elements with Next)
The `Stream.zipWithIndex` operator is a helpful tool for indexing each element in a stream, pairing each item with its respective position in the sequence. This is particularly useful when you want to keep track of the order of elements within a stream.
Example (Indexing Each Element in a Stream)
The Stream module includes a feature for computing the Cartesian Product of two streams, allowing you to create combinations of elements from two different streams. This is helpful when you need to pair each element from one set with every element of another.
In simple terms, imagine you have two collections and want to form all possible pairs by picking one item from each. This pairing process is the Cartesian Product. In streams, this operation generates a new stream that includes every possible pairing of elements from the two input streams.
To create a Cartesian Product of two streams, the `Stream.cross` operator is available, along with similar variants. These operators combine two streams into a new stream of all possible element combinations.
Example (Creating a Cartesian Product of Two Streams)
Partitioning a stream involves dividing it into two distinct streams based on a specified condition. The Stream module offers two functions for this purpose: `Stream.partition` and `Stream.partitionEither`. Let’s look at how these functions work and the best scenarios for their use.
The `Stream.partition` function takes a predicate (a condition) as input and divides the original stream into two substreams. One substream contains elements that meet the condition, while the other contains those that do not. Both resulting substreams are provided within a scoped effect, so they require a `Scope`.
Example (Partitioning a Stream into Even and Odd Numbers)
In some cases, you might need to partition a stream using a condition that involves an effect. For this, the `Stream.partitionEither` function is ideal. This function uses an effectful predicate to split the stream into two substreams: one for elements that produce `Either.left` values and another for elements that produce `Either.right` values.
Example (Partitioning a Stream with an Effectful Predicate)
When processing streams of data, you may need to group elements based on specific criteria. The Stream module provides several functions for this purpose: `groupByKey`, `groupBy`, `grouped`, and `groupedWithin`. Let’s review how these functions work and when to use each one.
The `Stream.groupByKey` function partitions a stream based on a key function of type `(a: A) => K`, where `A` is the type of elements in the stream and `K` represents the keys for grouping. This function is non-effectful and groups elements by simply applying the provided key function.
The result of `Stream.groupByKey` is a `GroupBy` data type, representing the grouped stream. To process each group, you can use `GroupBy.evaluate`, which takes a function of type `(key: K, stream: Stream<V, E>) => Stream.Stream<...>`. This function operates across all groups and merges them together in a non-deterministic order.
Example (Grouping by Tens Place in Exam Scores)
In the following example, we use `Stream.groupByKey` to group exam scores by the tens place and count the number of scores in each group:
For more complex grouping requirements where partitioning involves effects, you can use the `Stream.groupBy` function. This function accepts an effectful partitioning function and returns a `GroupBy` data type, representing the grouped stream. You can then process each group by using `GroupBy.evaluate`, similar to `Stream.groupByKey`.
Example (Grouping Names by First Letter)
In the following example, we group names by their first letter and count the number of names in each group. Here, the partitioning operation is set up as an effectful operation:
The `Stream.grouped` function is ideal for dividing a stream into chunks of a specified size, making it easier to handle data in smaller, organized segments. This is particularly helpful when processing or displaying data in batches.
Example (Dividing a Stream into Chunks of 3 Elements)
The `Stream.groupedWithin` function allows for flexible grouping by creating chunks based on either a specified maximum size or a time interval, whichever condition is met first. This is especially useful for working with data where timing constraints are involved.
Example (Grouping by Size or Time Interval)
In this example, `Stream.groupedWithin(18, "1.5 seconds")` groups the stream into chunks whenever either 18 elements accumulate or 1.5 seconds elapse since the last chunk was created.
In stream processing, you may need to combine the contents of multiple streams. The Stream module offers several operators to achieve this, including `Stream.concat`, `Stream.concatAll`, and `Stream.flatMap`. Let’s look at how each of these operators works.
The `Stream.concat` operator is a straightforward method for joining two streams. It returns a new stream that emits elements from the first stream (left-hand) followed by elements from the second stream (right-hand). This is helpful when you want to combine two streams in a specific sequence.
Example (Concatenating Two Streams Sequentially)
If you have multiple streams to concatenate, `Stream.concatAll` provides an efficient way to combine them without manually chaining multiple `Stream.concat` operations. This function takes a `Chunk` of streams and returns a single stream containing the elements of each stream in sequence.
Example (Concatenating Multiple Streams)
The `Stream.flatMap` operator allows for advanced concatenation by creating a stream where each element is generated by applying a function of type `(a: A) => Stream<...>` to each output of the source stream. This operator then concatenates all the resulting streams, effectively flattening them.
Example (Generating Repeated Elements with `Stream.flatMap`)
If you need to perform the `flatMap` operation concurrently, you can use the `concurrency` option to control how many inner streams run simultaneously.
Additionally, if the order of concatenation is not important, you can use the `switch` option.
Sometimes, you may want to interleave elements from two streams and create a single output stream. In such cases, `Stream.concat` isn’t suitable because it waits for the first stream to complete before consuming the second. For interleaving elements as they become available, `Stream.merge` and its variants are designed for this purpose.
The `Stream.merge` operation combines elements from two source streams into a single stream, interleaving elements as they are produced. Unlike `Stream.concat`, `Stream.merge` does not wait for one stream to finish before starting the other.
Example (Interleaving Two Streams with `Stream.merge`)
When merging two streams, it’s important to consider the termination strategy, especially if each stream has a different lifetime.
By default, `Stream.merge` waits for both streams to terminate before ending the merged stream. However, you can modify this behavior with `haltStrategy`, selecting from four termination strategies:
| Termination Strategy | Description |
| --- | --- |
| `"left"` | The merged stream terminates when the left-hand stream terminates. |
| `"right"` | The merged stream terminates when the right-hand stream terminates. |
| `"both"` (default) | The merged stream terminates only when both streams have terminated. |
| `"either"` | The merged stream terminates as soon as either stream terminates. |
Example (Using `haltStrategy: "left"` to Control Stream Termination)
In some cases, you may want to merge two streams while transforming their elements into a unified type. `Stream.mergeWith` is designed for this purpose, allowing you to specify transformation functions for each source stream.
Example (Merging and Transforming Two Streams)
The `Stream.interleave` operator lets you pull one element at a time from each of two streams, creating a new interleaved stream. If one stream finishes first, the remaining elements from the other stream continue to be pulled until both streams are exhausted.
Example (Basic Interleaving of Two Streams)
For more complex interleaving, `Stream.interleaveWith` provides additional control by using a third stream of `boolean` values to dictate the interleaving pattern. When this stream emits `true`, an element is taken from the left-hand stream; otherwise, an element is taken from the right-hand stream.
Example (Custom Interleaving Logic Using `Stream.interleaveWith`)
Interspersing adds separators or affixes to a stream, which is useful for formatting or structuring stream data.
The `Stream.intersperse` operator inserts a specified delimiter element between each pair of elements in a stream. This delimiter can be any chosen value and is added between each consecutive pair.
Example (Inserting Delimiters Between Stream Elements)
For more complex needs, `Stream.intersperseAffixes` provides control over different affixes at the start, between elements, and at the end of the stream.
Example (Adding Affixes to a Stream)
Broadcasting a stream creates multiple downstream streams that each receive the same elements from the source stream. This is useful when you want to send each element to multiple consumers simultaneously. The upstream stream has a `maximumLag` parameter that sets the limit for how far it can get ahead before slowing down to match the speed of the slowest downstream stream.
Example (Broadcasting to Multiple Downstream Streams)
In the following example, we broadcast a stream of numbers to two downstream consumers. The first calculates the maximum value in the stream, while the second logs each number with a delay. The upstream stream’s speed adjusts based on the slower logging stream:
Effect streams use a pull-based model, allowing downstream consumers to control the rate at which they request elements. However, when there’s a mismatch in the speed between the producer and the consumer, buffering can help balance their interaction. The `Stream.buffer` operator is designed to manage this, allowing the producer to keep working even if the consumer is slower. You can set a maximum buffer capacity using the `capacity` option.
The `Stream.buffer` operator queues elements to allow the producer to work independently from the consumer, up to a specified capacity. This helps when a faster producer and a slower consumer need to operate smoothly without blocking each other.
Example (Using a Buffer to Handle Speed Mismatch)
Different buffering options let you tailor the buffering strategy based on your use case:
| Buffering Type | Configuration | Description |
| --- | --- | --- |
| Bounded Queue | `{ capacity: number }` | Limits the queue to a fixed size. |
| Unbounded Queue | `{ capacity: "unbounded" }` | Allows an unlimited number of buffered items. |
| Sliding Queue | `{ capacity: number, strategy: "sliding" }` | Keeps the most recent items, discarding older ones when full. |
| Dropping Queue | `{ capacity: number, strategy: "dropping" }` | Keeps the earliest items, discarding new ones when full. |
Debouncing is a technique used to prevent a function from firing too frequently, which is particularly useful when a stream emits values rapidly but only the last value after a pause is needed.
The `Stream.debounce` function achieves this by delaying the emission of values until a specified time period has passed without any new values. If a new value arrives during the waiting period, the timer resets, and only the latest value will eventually be emitted after a pause.
Example (Debouncing a Stream of Rapidly Emitted Values)
Throttling is a technique for regulating the rate at which elements are emitted from a stream. It helps maintain a steady data output pace, which is valuable in situations where data processing needs to occur at a consistent rate.
The `Stream.throttle` function uses the token bucket algorithm to control the rate of stream emissions.
Example (Throttle Configuration)
In this configuration:
- Each chunk processed uses one token (`cost = () => 1`).
- Tokens are replenished at a rate of one token (`units: 1`) every 100 milliseconds (`duration: "100 millis"`).
The “shape” strategy moderates data flow by delaying chunk emissions until they comply with specified bandwidth constraints. This strategy ensures that data throughput does not exceed defined limits, allowing for steady and controlled data emission.
Example (Applying Throttling with the Shape Strategy)
The “enforce” strategy strictly regulates data flow by discarding chunks that exceed bandwidth constraints.
Example (Throttling with the Enforce Strategy)
The `Stream.throttle` function offers a `burst` option that allows for temporary increases in data throughput beyond the set rate limits. Set this option to a value greater than 0 to activate burst capability (the default is 0, indicating no burst support).
The burst capacity provides additional tokens in the token bucket, enabling the stream to momentarily exceed its configured rate when bursts of data occur.
Example (Throttling with Burst Capacity)
In this setup, the stream starts with a bucket containing 5 tokens, allowing the first five chunks to be emitted instantly. The additional burst capacity of 2 accommodates further emissions momentarily, allowing for handling of subsequent data more flexibly. Over time, as the bucket refills according to the throttle configuration, additional elements are emitted, demonstrating how the burst capability can manage uneven data flows effectively.
When working with streams, you may need to introduce specific time intervals between each element’s emission. The `Stream.schedule` combinator allows you to set these intervals.
Example (Adding a Delay Between Stream Emissions)
In this example, we’ve used the `Schedule.spaced("1 second")` schedule to introduce a one-second gap between each emission in the stream.