Introduction to Streams

In this guide, we’ll explore the concept of a Stream<A, E, R>. A Stream is a program description that, when executed, can emit zero or more values of type A, may fail with an error of type E, and requires a context of type R.

Streams are particularly handy whenever you’re dealing with sequences of values over time. They can serve as replacements for observables, node streams, and AsyncIterables.
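
As a rough illustration of the AsyncIterable case, the sketch below wraps an async generator in a Stream using Stream.fromAsyncIterable. The countdown generator and the error mapping are hypothetical, chosen only for this example.

```typescript
import { Effect, Stream } from "effect"

// A plain async generator, standing in for any AsyncIterable source (hypothetical)
async function* countdown(): AsyncGenerator<number> {
  yield 3
  yield 2
  yield 1
}

// Wrap it in a Stream; the second argument maps a thrown value to the error type E
const countdownStream = Stream.fromAsyncIterable(
  countdown(),
  (cause) => new Error(String(cause))
)

// Run the stream, collect every emitted value, and convert the Chunk to an array
const collected = Effect.runPromise(Stream.runCollect(countdownStream)).then(
  (chunk) => Array.from(chunk)
)

collected.then((values) => console.log(values)) // [ 3, 2, 1 ]
```

Once wrapped, the source can be transformed and combined with the same operators as any other Stream.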

Think of a Stream as an extension of an Effect. An Effect<A, E, R> represents a program that requires a context of type R, may fail with an error of type E, and, when it succeeds, produces exactly one result of type A. A Stream<A, E, R> takes this further by allowing the emission of zero or more values of type A.

To clarify, let’s examine some examples using Effect:

import { Effect, Chunk, Option } from "effect"

// An Effect that fails with a string error
const failedEffect = Effect.fail("fail!")

// An Effect that produces a single number
const oneNumberValue = Effect.succeed(3)

// An Effect that produces a chunk of numbers
const oneListValue = Effect.succeed(Chunk.make(1, 2, 3))

// An Effect that produces an optional number
const oneOption = Effect.succeed(Option.some(1))

In each case, the Effect ends with exactly one result: either a single error or a single success value. Even a Chunk or an Option counts as one value. There is no variability; you always get one result.
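
To make the "one result" point concrete, here is a minimal sketch that runs each of these Effects synchronously. It redefines the same four values so it stands alone, and it uses Effect.runSyncExit for the failing case so the failure is returned as an Exit rather than thrown.

```typescript
import { Chunk, Effect, Exit, Option } from "effect"

const failedEffect = Effect.fail("fail!")
const oneNumberValue = Effect.succeed(3)
const oneListValue = Effect.succeed(Chunk.make(1, 2, 3))
const oneOption = Effect.succeed(Option.some(1))

// Each successful Effect yields exactly one value when run
const singleNumber = Effect.runSync(oneNumberValue)
const singleChunk = Effect.runSync(oneListValue)
const singleOption = Effect.runSync(oneOption)

// runSyncExit returns an Exit describing the outcome instead of throwing
const failureExit = Effect.runSyncExit(failedEffect)

console.log(singleNumber) // 3
console.log(Chunk.toReadonlyArray(singleChunk)) // [ 1, 2, 3 ]
console.log(Option.getOrNull(singleOption)) // 1
console.log(Exit.isFailure(failureExit)) // true
```

Note that the Chunk of three numbers is still a single result: the Effect hands back one Chunk, not three separate emissions.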

Now, let’s shift our focus to Stream. A Stream is a program description that shares much with Effect: it requires a context of type R, may signal errors of type E, and yields values of type A. The key distinction is that it can yield zero or more values.

Here are the possible scenarios for a Stream:

  • An Empty Stream: It can end up empty, representing a stream with no values.
  • A Single-Element Stream: It can represent a stream with just one value.
  • A Finite Stream of Elements: It can represent a stream with a finite number of values.
  • An Infinite Stream of Elements: It can represent a stream that continues indefinitely and never completes on its own.

Let’s see these scenarios in action:

import { Stream } from "effect"

// An empty Stream
const emptyStream = Stream.empty

// A Stream with a single number
const oneNumberValueStream = Stream.succeed(3)

// A Stream with a range of numbers from 1 to 10
const finiteNumberStream = Stream.range(1, 10)

// An infinite Stream of numbers starting from 1 and incrementing
const infiniteNumberStream = Stream.iterate(1, (n) => n + 1)
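
A Stream is only a description until it is run. The sketch below shows one way to observe what each of the four streams emits, using Stream.runCollect to gather the values and Effect.runPromise to execute the result. The collect helper is a hypothetical convenience for this example, and the infinite stream is truncated with Stream.take so that collection can finish.

```typescript
import { Effect, Stream } from "effect"

const emptyStream = Stream.empty
const oneNumberValueStream = Stream.succeed(3)
const finiteNumberStream = Stream.range(1, 10)
const infiniteNumberStream = Stream.iterate(1, (n) => n + 1)

// Hypothetical helper: run a stream and return its values as a plain array
const collect = <A>(stream: Stream.Stream<A>): Promise<Array<A>> =>
  Effect.runPromise(Stream.runCollect(stream)).then((chunk) => Array.from(chunk))

const results = Promise.all([
  collect(emptyStream),
  collect(oneNumberValueStream),
  collect(finiteNumberStream),
  // Take only the first 5 values; collecting the whole infinite stream would never finish
  collect(infiniteNumberStream.pipe(Stream.take(5)))
])

results.then(([empty, one, finite, firstFive]) => {
  console.log(empty.length) // 0
  console.log(one) // [ 3 ]
  console.log(finite.length) // 10
  console.log(firstFive) // [ 1, 2, 3, 4, 5 ]
})
```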

In summary, a Stream is a versatile tool for representing programs that may yield multiple values, making it suitable for a wide range of tasks, from processing finite lists to handling infinite sequences.