A PubSub serves as an asynchronous message hub, allowing publishers to send messages that can be received by all current subscribers.
Unlike a Queue, where each value is delivered to only one consumer, a PubSub broadcasts each published message to all subscribers. This makes PubSub ideal for scenarios requiring message broadcasting rather than load distribution.
Basic Operations
A PubSub<A> stores messages of type A and provides two fundamental operations:
| API | Description |
| --- | --- |
| PubSub.publish | Sends a message of type A to the PubSub, returning an effect indicating if the message was successfully published. |
| PubSub.subscribe | Creates a scoped effect that allows subscription to the PubSub, automatically unsubscribing when the scope ends. Subscribers receive messages through a Dequeue which holds published messages. |
Example (Publishing a Message to Multiple Subscribers)
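The example announced by the caption above can be sketched as follows. This is a minimal illustration, assuming a bounded PubSub with capacity 2 and two subscribers created before the message is published; the variable names are illustrative only.

```typescript
import { Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)

    // Subscribe twice before publishing; each subscription is a Dequeue
    const dequeue1 = yield* PubSub.subscribe(pubsub)
    const dequeue2 = yield* PubSub.subscribe(pubsub)

    // Publish a single message to the PubSub
    yield* PubSub.publish(pubsub, "Hello from a PubSub!")

    // Every subscriber receives its own copy of the message
    const received1 = yield* Queue.take(dequeue1)
    const received2 = yield* Queue.take(dequeue2)
    return [received1, received2]
  })
)

Effect.runPromise(program).then(console.log)
```

Both subscriptions are released when the surrounding scope closes, so no explicit unsubscribe call is needed.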
Creating a PubSub
Bounded PubSub
A bounded PubSub applies back pressure to publishers when it reaches capacity, suspending additional publishing until space becomes available.
Back pressure ensures that all subscribers receive all messages while they are subscribed. However, it can lead to slower message delivery if a subscriber is slow.
The PubSub retains each message until every subscriber has taken it.

import { PubSub } from "effect"

// Creates a bounded PubSub with a capacity of 2.
// For best performance, use capacities that are powers of two.
const boundedPubSub = PubSub.bounded<string>(2)
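To make the back pressure behavior concrete, here is a rough sketch, assuming a single subscriber and a capacity of 2. The third publish runs in a forked fiber because it cannot complete until the subscriber takes a message. The 50 millisecond pause is an arbitrary choice that gives the forked fiber time to start, and the variable names are illustrative.

```typescript
import { Chunk, Effect, Fiber, Option, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<number>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    // Fill the PubSub to capacity
    yield* PubSub.publish(pubsub, 1)
    yield* PubSub.publish(pubsub, 2)

    // A third publish has no free slot, so run it in a separate fiber
    const publisher = yield* Effect.fork(PubSub.publish(pubsub, 3))
    yield* Effect.sleep("50 millis")

    // Fiber.poll yields None while the fiber has not finished
    const suspended = Option.isNone(yield* Fiber.poll(publisher))

    // Taking one message frees a slot and lets the publisher finish
    const first = yield* Queue.take(dequeue)
    const published = yield* Fiber.join(publisher)
    const rest = Chunk.toReadonlyArray(yield* Queue.takeAll(dequeue))

    return { suspended, first, published, rest }
  })
)

Effect.runPromise(program).then(console.log)
```

No message is lost in this sketch: the publisher simply waits until the subscriber has made room.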
Dropping PubSub
A dropping PubSub discards new values when full. The PubSub.publish operation returns false if the message is dropped.
In a dropping pubsub, publishers can continue to publish new values, but subscribers are not guaranteed to receive all messages.

import { PubSub } from "effect"

// Creates a dropping PubSub with a capacity of 2.
// For best performance, use capacities that are powers of two.
const droppingPubSub = PubSub.dropping<string>(2)
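The dropping behavior can be observed through the boolean returned by PubSub.publish. The following is a small sketch, assuming one subscriber that does not take anything until three messages have been published to a PubSub of capacity 2; the message values are placeholders.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.dropping<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    const first = yield* PubSub.publish(pubsub, "a")
    const second = yield* PubSub.publish(pubsub, "b")
    // The PubSub is full at this point, so this message is dropped
    const third = yield* PubSub.publish(pubsub, "c")

    // Collect whatever the subscriber actually received
    const received = Chunk.toReadonlyArray(yield* Queue.takeAll(dequeue))
    return { first, second, third, received }
  })
)

Effect.runPromise(program).then(console.log)
```

Checking the returned boolean is the only way a publisher learns that a message was dropped.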
Sliding PubSub
A sliding PubSub removes the oldest message to make space for new ones, ensuring that publishing never blocks.
A sliding pubsub prevents slow subscribers from impacting the message delivery rate. However, there’s still a risk that slow subscribers may miss some messages.

import { PubSub } from "effect"

// Creates a sliding PubSub with a capacity of 2.
// For best performance, use capacities that are powers of two.
const slidingPubSub = PubSub.sliding<string>(2)
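As a rough illustration of the sliding strategy, the sketch below publishes three messages to a PubSub of capacity 2 while a single subscriber is idle. It assumes the subscriber takes nothing until all three publishes have completed; the message values are placeholders.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.sliding<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publish(pubsub, "a")
    yield* PubSub.publish(pubsub, "b")
    // The PubSub is full, so the oldest message ("a") is removed to make room
    const third = yield* PubSub.publish(pubsub, "c")

    // The slow subscriber only sees the messages that were retained
    const received = Chunk.toReadonlyArray(yield* Queue.takeAll(dequeue))
    return { third, received }
  })
)

Effect.runPromise(program).then(console.log)
```

Compared with the dropping strategy, the publisher always succeeds here, and it is the oldest message rather than the newest one that is lost.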
Unbounded PubSub
An unbounded PubSub has no capacity limit, so publishing always succeeds immediately.
Unbounded pubsubs guarantee that all subscribers receive all messages without slowing down message delivery. However, they can grow indefinitely if messages are published faster than they are consumed.
Generally, it’s recommended to use bounded, dropping, or sliding pubsubs unless you have specific use cases for unbounded pubsubs.
Example
import { PubSub } from "effect"

// Creates an unbounded PubSub with unlimited capacity
const unboundedPubSub = PubSub.unbounded<string>()
To publish several messages at once, use PubSub.publishAll, which returns an effect indicating whether the messages were published.

import { Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)
    yield* PubSub.publishAll(pubsub, ["Message 1", "Message 2"])
    console.log(yield* Queue.takeAll(dequeue))
  })
)

Effect.runFork(program)
You can check the capacity and current size of a pubsub using PubSub.capacity and PubSub.size, respectively.
Note that PubSub.capacity returns a number because the capacity is set at pubsub creation and never changes.
In contrast, PubSub.size returns an effect that determines the current size of the pubsub since the number of messages in the pubsub can change over time.

import { Effect, PubSub } from "effect"

const program = Effect.gen(function* () {
  const pubsub = yield* PubSub.bounded<number>(2)
  // capacity is a plain number, fixed when the pubsub is created
  console.log(`capacity: ${PubSub.capacity(pubsub)}`)
  // size is an effect, because the number of messages changes over time
  console.log(`size: ${yield* PubSub.size(pubsub)}`)
})

Effect.runFork(program)
To shut down a pubsub, use PubSub.shutdown. You can also verify if it has been shut down with PubSub.isShutdown, or wait for the shutdown to complete with PubSub.awaitShutdown. Shutting down a pubsub also terminates all associated queues, ensuring that the shutdown signal is effectively communicated.
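The shutdown operations described above can be sketched as follows. This is a minimal example with no subscribers, intended only to show the calls involved; the variable names are illustrative.

```typescript
import { Effect, PubSub } from "effect"

const program = Effect.gen(function* () {
  const pubsub = yield* PubSub.bounded<string>(2)

  // A freshly created pubsub is not shut down
  const before = yield* PubSub.isShutdown(pubsub)

  yield* PubSub.shutdown(pubsub)

  // Completes once the pubsub has been shut down
  yield* PubSub.awaitShutdown(pubsub)
  const after = yield* PubSub.isShutdown(pubsub)

  return { before, after }
})

Effect.runPromise(program).then(console.log)
```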
PubSub as an Enqueue
PubSub operators mirror those of Queue with the main difference being that PubSub.publish and PubSub.subscribe are used in place of Queue.offer and Queue.take. If you’re already familiar with using a Queue, you’ll find PubSub straightforward.
Essentially, a PubSub can be seen as an Enqueue that only allows writes:
import type { Queue } from "effect"

interface PubSub<A> extends Queue.Enqueue<A> {}
Here, the Enqueue type refers to a queue that only accepts enqueues (or writes). Any value enqueued here is published to the pubsub, and operations like shutdown will also affect the pubsub.
This design makes PubSub highly flexible, letting you use it anywhere you need an Enqueue that only accepts published values.
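As an illustration of this point, the sketch below passes a PubSub to a function that only knows about Queue.Enqueue. The helper sendGreeting is hypothetical and exists only for this example; it assumes that Queue.offer on a PubSub publishes the value, as described above.

```typescript
import { Effect, PubSub, Queue } from "effect"

// Hypothetical helper: works with anything that accepts writes
const sendGreeting = (target: Queue.Enqueue<string>) =>
  Queue.offer(target, "hello")

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    // The PubSub is passed where an Enqueue is expected
    const accepted = yield* sendGreeting(pubsub)

    // The offered value reaches the subscriber like any published message
    const message = yield* Queue.take(dequeue)
    return { accepted, message }
  })
)

Effect.runPromise(program).then(console.log)
```

Code written against Enqueue therefore does not need to know whether it is feeding a single consumer or broadcasting to many subscribers.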