PubSub
In this guide, we'll explore the concept of a `PubSub`, which is an asynchronous message hub. It allows publishers to send messages to the pubsub, and subscribers can receive those messages.
Unlike a Queue, where each value offered to the queue can be taken by one taker, each value published to a pubsub can be received by all subscribers.
Whereas a Queue represents the optimal solution to the problem of how to distribute values, a `PubSub` represents the optimal solution to the problem of how to broadcast them.
Basic Operations
The core operations of a `PubSub` are `PubSub.publish` and `PubSub.subscribe`:
- The `publish` operation sends a message of type `A` to the pubsub. It returns an effect that indicates whether the message was successfully published.
- The `subscribe` operation returns a scoped effect that allows you to subscribe to the pubsub. It automatically unsubscribes when the scope is closed. Within the scope, you gain access to a `Dequeue`, which is essentially a `Queue` for dequeuing messages published to the pubsub.
Let's look at an example to understand how to use a pubsub:
```ts
import { Effect, PubSub, Queue, Console } from "effect"

const program = PubSub.bounded<string>(2).pipe(
  Effect.andThen((pubsub) =>
    Effect.scoped(
      Effect.gen(function* () {
        const dequeue1 = yield* PubSub.subscribe(pubsub)
        const dequeue2 = yield* PubSub.subscribe(pubsub)
        yield* PubSub.publish(pubsub, "Hello from a PubSub!")
        yield* Queue.take(dequeue1).pipe(Effect.andThen(Console.log))
        yield* Queue.take(dequeue2).pipe(Effect.andThen(Console.log))
      })
    )
  )
)

Effect.runPromise(program)
/*
Output:
Hello from a PubSub!
Hello from a PubSub!
*/
```
It's important to note that a subscriber will only receive messages published to the pubsub while it's subscribed. To ensure that a specific message reaches a subscriber, make sure that the subscription has been established before publishing the message.
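The ordering requirement can be illustrated with a minimal sketch. It assumes that a message published while nobody is subscribed is simply not retained; the name `lateSubscriber` and the message strings are made up for illustration.

```ts
import { Effect, PubSub, Queue } from "effect"

// Illustrative name: a subscriber that joins after the first publish
const lateSubscriber = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)
    // Published before anyone has subscribed, so no subscriber sees it
    yield* PubSub.publish(pubsub, "first")
    const dequeue = yield* PubSub.subscribe(pubsub)
    // Published after the subscription exists, so it is delivered
    yield* PubSub.publish(pubsub, "second")
    return yield* Queue.take(dequeue)
  })
)

Effect.runPromise(lateSubscriber).then(console.log)
/*
Output:
second
*/
```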
Creating PubSubs
You can create a pubsub using various constructors provided by the PubSub module:
Bounded PubSub
A bounded pubsub applies back pressure to publishers when it's at capacity, meaning publishers will block if the pubsub is full.
```ts
import { PubSub } from "effect"

const boundedPubSub = PubSub.bounded<string>(2)
```
Back pressure ensures that all subscribers receive all messages while they are subscribed. However, it can lead to slower message delivery if a subscriber is slow.
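As a rough sketch of back pressure in action, the program below fills a bounded pubsub and then forks a third publish. The name `backPressure`, the 10 millisecond pause, and the use of `Fiber.poll` to observe the suspended publisher are choices made for this illustration, not part of the PubSub API itself.

```ts
import { Chunk, Effect, Fiber, Option, PubSub, Queue } from "effect"

const backPressure = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)
    // Fill the pubsub to its capacity of 2
    yield* PubSub.publish(pubsub, "a")
    yield* PubSub.publish(pubsub, "b")
    // A third publish cannot complete yet, so run it in its own fiber
    const publisher = yield* Effect.fork(PubSub.publish(pubsub, "c"))
    yield* Effect.sleep("10 millis")
    // Option.none from Fiber.poll means the fiber has not finished
    const status = yield* Fiber.poll(publisher)
    const pending = Option.isNone(status)
    // Taking a message frees a slot, which lets the publisher proceed
    const first = yield* Queue.take(dequeue)
    const published = yield* Fiber.join(publisher)
    const remaining = yield* Queue.takeAll(dequeue)
    return { pending, first, published, rest: Chunk.toReadonlyArray(remaining) }
  })
)

Effect.runPromise(backPressure).then(console.log)
```

Under these assumptions, `pending` should be `true` while the pubsub is full, and the suspended publish should complete only after the subscriber takes `"a"`.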
Dropping PubSub
A dropping pubsub simply discards values if it's full. The `PubSub.publish` function will return `false` when the pubsub is full.
```ts
import { PubSub } from "effect"

const droppingPubSub = PubSub.dropping<string>(2)
```
In a dropping pubsub, publishers can continue to publish new values, but subscribers are not guaranteed to receive all messages.
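Here is a small sketch of the dropping behavior, assuming one subscriber that has not yet taken anything, so the pubsub fills up after two messages. The name `droppingDemo` and the message values are illustrative.

```ts
import { Chunk, Effect, PubSub, Queue } from "effect"

const droppingDemo = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.dropping<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)
    const a = yield* PubSub.publish(pubsub, "a")
    const b = yield* PubSub.publish(pubsub, "b")
    // The pubsub is full at this point, so this value is discarded
    const c = yield* PubSub.publish(pubsub, "c")
    const taken = yield* Queue.takeAll(dequeue)
    return { results: [a, b, c], received: Chunk.toReadonlyArray(taken) }
  })
)

Effect.runPromise(droppingDemo).then(console.log)
```

The third entry of `results` should be `false`, and `received` should contain only the first two messages.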
Sliding PubSub
A sliding pubsub drops the oldest value when it's full, ensuring that publishing always succeeds immediately.
```ts
import { PubSub } from "effect"

const slidingPubSub = PubSub.sliding<string>(2)
```
A sliding pubsub prevents slow subscribers from impacting the message delivery rate. However, there's still a risk that slow subscribers may miss some messages.
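The same scenario with a sliding pubsub can be sketched as follows, again assuming a single subscriber that has not taken anything before the third publish. The name `slidingDemo` is illustrative.

```ts
import { Chunk, Effect, PubSub, Queue } from "effect"

const slidingDemo = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.sliding<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)
    const a = yield* PubSub.publish(pubsub, "a")
    const b = yield* PubSub.publish(pubsub, "b")
    // The pubsub is full, so the oldest value ("a") is dropped to make room
    const c = yield* PubSub.publish(pubsub, "c")
    const taken = yield* Queue.takeAll(dequeue)
    return { results: [a, b, c], received: Chunk.toReadonlyArray(taken) }
  })
)

Effect.runPromise(slidingDemo).then(console.log)
```

Every publish should report success here, while the slow subscriber ends up seeing only the two most recent messages.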
Unbounded PubSub
An unbounded pubsub can never be full, and publishing always succeeds immediately.
```ts
import { PubSub } from "effect"

const unboundedPubSub = PubSub.unbounded<string>()
```
Unbounded pubsubs guarantee that all subscribers receive all messages without slowing down message delivery. However, they can grow indefinitely if messages are published faster than they are consumed.
Generally, it's recommended to use bounded, dropping, or sliding pubsubs unless you have specific use cases for unbounded pubsubs.
Operators On PubSubs
PubSubs support various operations similar to those available on queues.
Publishing Multiple Values
You can use the `PubSub.publishAll` operator to publish multiple values to the pubsub at once:
```ts
import { Effect, PubSub, Queue, Console } from "effect"

const program = PubSub.bounded<string>(2).pipe(
  Effect.andThen((pubsub) =>
    Effect.scoped(
      Effect.gen(function* () {
        const dequeue = yield* PubSub.subscribe(pubsub)
        yield* PubSub.publishAll(pubsub, ["Message 1", "Message 2"])
        yield* Queue.takeAll(dequeue).pipe(Effect.andThen(Console.log))
      })
    )
  )
)

Effect.runPromise(program)
/*
Output:
{
  _id: "Chunk",
  values: [ "Message 1", "Message 2" ]
}
*/
```
Checking Size
You can determine the capacity and current size of the pubsub using `PubSub.capacity` and `PubSub.size`:
```ts
import { Effect, PubSub, Console } from "effect"

const program = PubSub.bounded<number>(2).pipe(
  Effect.tap((pubsub) => Console.log(`capacity: ${PubSub.capacity(pubsub)}`)),
  Effect.tap((pubsub) =>
    PubSub.size(pubsub).pipe(
      Effect.andThen((size) => Console.log(`size: ${size}`))
    )
  )
)

Effect.runPromise(program)
/*
Output:
capacity: 2
size: 0
*/
```
Note that `capacity` returns a `number` because the capacity is set at pubsub creation and never changes. In contrast, `size` returns an effect that determines the current size of the pubsub, since the number of messages in the pubsub can change over time.
Shutting Down a PubSub
You can shut down a pubsub using `PubSub.shutdown`, check if it's shut down with `PubSub.isShutdown`, or await its shutdown with `PubSub.awaitShutdown`. Shutting down a pubsub also shuts down all associated queues, ensuring the proper propagation of the shutdown signal.
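A minimal sketch of this propagation, assuming a single subscription: after the pubsub is shut down, the subscriber's queue should report that it is shut down as well. The name `shutdownDemo` is illustrative.

```ts
import { Effect, PubSub, Queue } from "effect"

const shutdownDemo = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<string>()
    const dequeue = yield* PubSub.subscribe(pubsub)
    const before = yield* PubSub.isShutdown(pubsub)
    yield* PubSub.shutdown(pubsub)
    // Both the pubsub and the subscriber's queue should now be shut down
    const pubsubDown = yield* PubSub.isShutdown(pubsub)
    const dequeueDown = yield* Queue.isShutdown(dequeue)
    return { before, pubsubDown, dequeueDown }
  })
)

Effect.runPromise(shutdownDemo).then(console.log)
```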
PubSub as an Enqueue
As you can see, the operators on `PubSub` are identical to the ones on Queue, with the exception of `PubSub.publish` and `PubSub.subscribe` replacing `Queue.offer` and `Queue.take`. So if you know how to use a `Queue`, you already know how to use a `PubSub`.
In fact, a `PubSub` can be viewed as a `Queue` that can only be written to:
```ts
interface PubSub<A> extends Queue.Enqueue<A> {}
```
Here, the `Enqueue` type represents a queue that can only be enqueued. Enqueuing to the queue publishes a value to the pubsub, and actions like shutting down the queue also shut down the pubsub.
This versatility allows you to use a `PubSub` wherever you currently use a `Queue` that you only write to.
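For instance, a function written against `Queue.Enqueue` should accept a pubsub unchanged. In the sketch below, `sendGreeting` is a hypothetical helper invented for this example; it knows nothing about pubsubs and only calls `Queue.offer`.

```ts
import { Effect, PubSub, Queue } from "effect"

// Hypothetical helper that only needs the write side of a queue
const sendGreeting = (target: Queue.Enqueue<string>) =>
  Queue.offer(target, "hello")

const enqueueDemo = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)
    // The pubsub is passed where an Enqueue is expected;
    // offering to it publishes the value to subscribers
    yield* sendGreeting(pubsub)
    return yield* Queue.take(dequeue)
  })
)

Effect.runPromise(enqueueDemo).then(console.log)
/*
Output:
hello
*/
```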