A SubscriptionRef<A> is a specialized form of a SynchronizedRef. It allows us to subscribe and receive updates on the current value and any changes made to that value.
You can perform all standard operations on a SubscriptionRef, such as get, set, or modify to interact with the current value.
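As a minimal sketch of these operations, the snippet below uses `SubscriptionRef.set`, `SubscriptionRef.update`, `SubscriptionRef.modify`, and `SubscriptionRef.get` from the `effect` package. The initial value and the arithmetic are illustrative assumptions, not part of the original example set.

```typescript
import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function* () {
  // Illustrative initial value
  const ref = yield* SubscriptionRef.make(1)

  // Replace the current value
  yield* SubscriptionRef.set(ref, 10)

  // Derive the next value from the current one (10 -> 15)
  yield* SubscriptionRef.update(ref, (n) => n + 5)

  // modify returns a result and stores a new value in one step:
  // here it returns the old value (15) and stores its double (30)
  const previous = yield* SubscriptionRef.modify(
    ref,
    (n) => [n, n * 2] as const
  )

  const current = yield* SubscriptionRef.get(ref)
  return { previous, current }
})

Effect.runPromise(program).then(console.log)
// { previous: 15, current: 30 }
```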
The key feature of SubscriptionRef is its changes stream. This stream allows you to observe the current value at the moment of subscription and receive all subsequent changes. Every time the stream is run, it emits the current value and tracks future updates.
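The behavior described above can be sketched as follows: each run of `changes` starts with whatever value the ref holds at that moment. The values `5` and `7` and the names `first` and `second` are assumptions chosen for illustration.

```typescript
import { Chunk, Effect, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)
  yield* SubscriptionRef.set(ref, 5)

  // First run: the stream starts with the current value (5)
  const first = yield* Stream.runCollect(Stream.take(ref.changes, 1))

  yield* SubscriptionRef.set(ref, 7)

  // Second run: a fresh subscription starts with the new current value (7)
  const second = yield* Stream.runCollect(Stream.take(ref.changes, 1))

  return {
    first: Chunk.toReadonlyArray(first),
    second: Chunk.toReadonlyArray(second)
  }
})

Effect.runPromise(program).then(console.log)
// { first: [ 5 ], second: [ 7 ] }
```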
To create a SubscriptionRef, you can use the SubscriptionRef.make constructor, specifying the initial value:
Example (Creating a SubscriptionRef)
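The code for this example is not present in this copy of the page. A minimal sketch, assuming an initial value of `0`, might look like this:

```typescript
import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function* () {
  // Create a SubscriptionRef holding the initial value 0
  const ref = yield* SubscriptionRef.make(0)

  // Read the value back to confirm the initial state
  return yield* SubscriptionRef.get(ref)
})

Effect.runPromise(program).then(console.log)
// 0
```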
SubscriptionRef is particularly useful for modeling shared state when multiple observers need to react to changes. For example, in functional reactive programming, the SubscriptionRef could represent a portion of the application state, and various observers (like UI components) would update in response to state changes.
Example (Server-Client Model with SubscriptionRef)
In the following example, a “server” continually updates a shared value, while multiple “clients” observe the changes:
import { Ref, Effect } from "effect"

// Server function that increments a shared value forever
const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

The server function operates on a regular Ref and does not need to know about SubscriptionRef. Next, a client subscribes to the changes and collects a number of values from the stream:

import { Effect, Stream, Random } from "effect"

// Client function that observes the stream of changes
const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* () {
    // Pick how many values this client will collect
    const n = yield* Random.nextIntBetween(1, 10)
    // Take n values from the stream and collect them into a Chunk
    const chunk = yield* Stream.runCollect(Stream.take(changes, n))
    return chunk
  })
Similarly, the client function only works with a Stream of values and doesn’t concern itself with the source of these values.
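To illustrate that independence, the sketch below feeds a plain counting stream to a client-like function. `collect` is a hypothetical variant of the client that takes a fixed count instead of a random one, and `Stream.iterate` stands in for the `changes` stream; both are assumptions made so the result is predictable.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical client variant: collects the first n values of any number stream
const collect = (changes: Stream.Stream<number>, n: number) =>
  Stream.runCollect(Stream.take(changes, n))

// A plain infinite stream 0, 1, 2, ... with no SubscriptionRef involved
const source = Stream.iterate(0, (n) => n + 1)

const program = collect(source, 3).pipe(
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
// [ 0, 1, 2 ]
```

Because the function only depends on `Stream.Stream<number>`, the same code works unchanged when it is given `ref.changes`.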
To tie everything together, we create the SubscriptionRef, start the server, launch multiple client instances in parallel, and then shut down the server once the clients have finished.
import {
  Ref,
  Effect,
  Stream,
  Random,
  SubscriptionRef,
  Fiber
} from "effect"

// Server function that increments a shared value forever
const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

// Client function that observes the stream of changes
const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* () {
    const n = yield* Random.nextIntBetween(1, 10)
    const chunk = yield* Stream.runCollect(Stream.take(changes, n))
    return chunk
  })

const program = Effect.gen(function* () {
  // Create a SubscriptionRef with an initial value of 0
  const ref = yield* SubscriptionRef.make(0)

  // Fork the server to run concurrently
  const serverFiber = yield* Effect.fork(server(ref))

  // Create 5 clients that subscribe to the changes stream
  const clients = new Array(5).fill(null).map(() => client(ref.changes))

  // Run all clients concurrently and collect their results
  const chunks = yield* Effect.all(clients, { concurrency: "unbounded" })

  // Interrupt the server when the clients are done
  yield* Fiber.interrupt(serverFiber)

  // Output the results collected by each client
  for (const chunk of chunks) {
    console.log(chunk)
  }
})

Effect.runPromise(program)
/*
Example Output:
{ _id: 'Chunk', values: [ 4, 5, 6, 7, 8, 9 ] }
{ _id: 'Chunk', values: [ 4 ] }
{ _id: 'Chunk', values: [ 4, 5, 6, 7, 8, 9 ] }
{ _id: 'Chunk', values: [ 4, 5 ] }
{ _id: 'Chunk', values: [ 4, 5, 6, 7, 8, 9 ] }
*/
This setup ensures that each client observes the current value when it starts and receives all subsequent changes to the value.
Since the changes are represented as streams, you can easily build more complex programs using familiar stream operators. You can transform, filter, or merge these streams with other streams to achieve more sophisticated behavior.
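As a sketch of that idea, the snippet below filters and maps the `changes` stream before collecting from it. The names `writer` and `evens`, the even-number filter, and the label format are illustrative assumptions. The exact numbers observed depend on timing, so no specific output is shown.

```typescript
import { Chunk, Effect, Fiber, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)

  // Background writer that keeps incrementing the value
  const writer = yield* Effect.fork(
    SubscriptionRef.update(ref, (n) => n + 1).pipe(Effect.forever)
  )

  // Derived stream: keep even values only, label them, stop after three
  const evens = ref.changes.pipe(
    Stream.filter((n) => n % 2 === 0),
    Stream.map((n) => `even: ${n}`),
    Stream.take(3)
  )

  const labels = yield* Stream.runCollect(evens)

  // Stop the writer once the derived stream has produced its values
  yield* Fiber.interrupt(writer)

  return Chunk.toReadonlyArray(labels)
})

Effect.runPromise(program).then(console.log)
```

The observer code never touches the ref directly; it only composes stream operators on top of `changes`.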