SubscriptionRef

A SubscriptionRef<A> is a specialized form of a SynchronizedRef. It allows us to subscribe and receive updates on the current value and any changes made to that value.

ts
export interface SubscriptionRef<A> extends Synchronized.SynchronizedRef<A> {
/**
* A stream containing the current value of the `Ref` as well as all changes
* to that value.
*/
readonly changes: Stream<A>
}
ts
export interface SubscriptionRef<A> extends Synchronized.SynchronizedRef<A> {
/**
* A stream containing the current value of the `Ref` as well as all changes
* to that value.
*/
readonly changes: Stream<A>
}

You can perform all the usual operations on a SubscriptionRef, such as get, set, or modify to work with the current value.

The changes stream is where the magic happens. It lets you observe the current value and all subsequent changes. Each time you run this stream, you'll get the current value as of that moment and any changes that occurred afterward.

To create a SubscriptionRef, you can use the make constructor, specifying the initial value:

ts
import { SubscriptionRef } from "effect"
 
const ref = SubscriptionRef.make(0)
ts
import { SubscriptionRef } from "effect"
 
const ref = SubscriptionRef.make(0)

A SubscriptionRef can be invaluable when modeling shared state, especially when multiple observers need to react to every change in that shared state. For example, in a functional reactive programming context, the SubscriptionRef value might represent a part of the application state, and each observer could update various user interface elements based on changes to that state.

To see how this works, let's create a simple example where a "server" repeatedly updates a value observed by multiple "clients":

ts
import { Ref, Effect } from "effect"
 
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever)
ts
import { Ref, Effect } from "effect"
 
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

Note that the server function operates on a regular Ref and doesn't need to know about SubscriptionRef. It simply updates a value.

ts
import { Ref, Effect, Stream, Random } from "effect"
 
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever)
 
const client = (changes: Stream.Stream<number>) =>
Effect.gen(function* () {
const n = yield* Random.nextIntBetween(1, 10)
const chunk = yield* Stream.runCollect(Stream.take(changes, n))
return chunk
})
ts
import { Ref, Effect, Stream, Random } from "effect"
 
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever)
 
const client = (changes: Stream.Stream<number>) =>
Effect.gen(function* () {
const n = yield* Random.nextIntBetween(1, 10)
const chunk = yield* Stream.runCollect(Stream.take(changes, n))
return chunk
})

Similarly, the client function only works with a Stream of values and doesn't concern itself with the source of these values.

To tie everything together, we start the server, launch multiple client instances in parallel, and then shut down the server when we're finished. We also create the SubscriptionRef in this process.

ts
import { Ref, Effect, Stream, Random, SubscriptionRef, Fiber } from "effect"
 
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever)
 
const client = (changes: Stream.Stream<number>) =>
Effect.gen(function* () {
const n = yield* Random.nextIntBetween(1, 10)
const chunk = yield* Stream.runCollect(Stream.take(changes, n))
return chunk
})
 
const program = Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
const serverFiber = yield* Effect.fork(server(ref))
const clients = new Array(5).fill(null).map(() => client(ref.changes))
const chunks = yield* Effect.all(clients, { concurrency: "unbounded" })
yield* Fiber.interrupt(serverFiber)
for (const chunk of chunks) {
console.log(chunk)
}
})
 
Effect.runPromise(program)
/*
Output:
{
_id: "Chunk",
values: [ 2, 3, 4 ]
}
{
_id: "Chunk",
values: [ 2 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4, 5, 6, 7 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4, 5, 6, 7, 8, 9 ]
}
*/
ts
import { Ref, Effect, Stream, Random, SubscriptionRef, Fiber } from "effect"
 
const server = (ref: Ref.Ref<number>) =>
Ref.update(ref, (n) => n + 1).pipe(Effect.forever)
 
const client = (changes: Stream.Stream<number>) =>
Effect.gen(function* () {
const n = yield* Random.nextIntBetween(1, 10)
const chunk = yield* Stream.runCollect(Stream.take(changes, n))
return chunk
})
 
const program = Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
const serverFiber = yield* Effect.fork(server(ref))
const clients = new Array(5).fill(null).map(() => client(ref.changes))
const chunks = yield* Effect.all(clients, { concurrency: "unbounded" })
yield* Fiber.interrupt(serverFiber)
for (const chunk of chunks) {
console.log(chunk)
}
})
 
Effect.runPromise(program)
/*
Output:
{
_id: "Chunk",
values: [ 2, 3, 4 ]
}
{
_id: "Chunk",
values: [ 2 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4, 5, 6, 7 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4 ]
}
{
_id: "Chunk",
values: [ 2, 3, 4, 5, 6, 7, 8, 9 ]
}
*/

This setup ensures that each client observes the current value when it starts and receives all subsequent changes to the value.

Since the changes are represented as streams, you can easily build more complex programs using familiar stream operators. You can transform, filter, or merge these streams with other streams to achieve more sophisticated behavior.