A SubscriptionRef<A> is a specialized form of a SynchronizedRef. It allows us to subscribe and receive updates on the current value and any changes made to that value.
export interface SubscriptionRef<A> extends Synchronized.SynchronizedRef<A> {
  /**
   * A stream containing the current value of the `Ref` as well as all changes
   * to that value.
   */
  readonly changes: Stream<A>
}
You can perform all standard operations on a SubscriptionRef, such as get, set, or modify to interact with the current value.
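As a minimal sketch of these operations, the program below reads, overwrites, updates, and modifies a SubscriptionRef created with SubscriptionRef.make. The name basicOps and the specific values are illustrative assumptions, not part of the original text.

```typescript
import { Effect, SubscriptionRef } from "effect"

// Illustrative program exercising the basic Ref-style operations
const basicOps = Effect.gen(function* () {
  // Start with an initial value of 0
  const ref = yield* SubscriptionRef.make(0)

  // Read the current value
  const initial = yield* SubscriptionRef.get(ref)

  // Overwrite the value
  yield* SubscriptionRef.set(ref, 10)

  // Transform the stored value
  yield* SubscriptionRef.update(ref, (n) => n + 1)

  // modify returns a result and stores a new value in one step:
  // here it returns the old value and stores its double
  const previous = yield* SubscriptionRef.modify(
    ref,
    (n) => [n, n * 2] as const
  )

  const final = yield* SubscriptionRef.get(ref)
  return { initial, previous, final }
})

Effect.runPromise(basicOps).then(console.log)
// { initial: 0, previous: 11, final: 22 }
```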
The key feature of SubscriptionRef is its changes stream. This stream allows you to observe the current value at the moment of subscription and receive all subsequent changes. Every time the stream is run, it emits the current value and tracks future updates.
To create a SubscriptionRef, you can use the SubscriptionRef.make constructor, specifying the initial value:
import { SubscriptionRef } from "effect"

const ref = SubscriptionRef.make(0)
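To illustrate the point that every run of the changes stream begins with the value held at that moment, here is a small sketch. The name currentValueDemo and the values 42 and 43 are assumptions chosen for the example. Each run takes only the first element, so the stream ends as soon as the current value has been emitted.

```typescript
import { Chunk, Effect, Stream, SubscriptionRef } from "effect"

const currentValueDemo = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)

  // Update the value before anyone subscribes
  yield* SubscriptionRef.set(ref, 42)

  // The first run of `changes` starts with the value held right now
  const first = yield* ref.changes.pipe(Stream.take(1), Stream.runCollect)

  yield* SubscriptionRef.update(ref, (n) => n + 1)

  // A later run starts from the updated value instead
  const second = yield* ref.changes.pipe(Stream.take(1), Stream.runCollect)

  return [Chunk.toReadonlyArray(first), Chunk.toReadonlyArray(second)]
})

Effect.runPromise(currentValueDemo).then(console.log)
// [ [ 42 ], [ 43 ] ]
```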
SubscriptionRef is particularly useful for modeling shared state when multiple observers need to react to changes. For example, in functional reactive programming, the SubscriptionRef could represent a portion of the application state, and various observers (like UI components) would update in response to state changes.
Example (Server-Client Model with SubscriptionRef)
In the following example, a “server” continually updates a shared value, while multiple “clients” observe the changes:
import { Ref, Effect } from "effect"

// Server function that increments a shared value forever
const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)
The server function operates on a regular Ref and continuously updates the value. It doesn’t need to know about SubscriptionRef directly.
Next, let’s define a client that subscribes to changes and collects a specified number of values:
import { Ref, Effect, Stream, Random } from "effect"

// Server function that increments a shared value forever
const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

// Client function that observes the stream of changes
const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* () {
    // Pick how many values this client will collect
    const n = yield* Random.nextIntBetween(1, 10)
    // Take that many values from the stream and collect them into a Chunk
    const chunk = yield* Stream.runCollect(Stream.take(changes, n))
    return chunk
  })
Similarly, the client function only works with a Stream of values and doesn’t concern itself with the source of these values.
To tie everything together, we start the server, launch multiple client instances in parallel, and then shut down the server when we’re finished. We also create the SubscriptionRef in this process.
import {
  Ref,
  Effect,
  Stream,
  Random,
  SubscriptionRef,
  Fiber
} from "effect"

// Server function that increments a shared value forever
const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

// Client function that observes the stream of changes
const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* () {
    const n = yield* Random.nextIntBetween(1, 10)
    const chunk = yield* Stream.runCollect(Stream.take(changes, n))
    return chunk
  })

const program = Effect.gen(function* () {
  // Create a SubscriptionRef with an initial value of 0
  const ref = yield* SubscriptionRef.make(0)

  // Fork the server to run concurrently
  const serverFiber = yield* Effect.fork(server(ref))

  // Create 5 clients that subscribe to the changes stream
  const clients = new Array(5).fill(null).map(() => client(ref.changes))

  // Run all clients concurrently and collect their results
  const chunks = yield* Effect.all(clients, { concurrency: "unbounded" })

  // Interrupt the server when the clients are done
  yield* Fiber.interrupt(serverFiber)

  // Output the results collected by each client
  for (const chunk of chunks) {
    console.log(chunk)
  }
})

Effect.runPromise(program)
/*
Example Output:
{ _id: 'Chunk', values: [ 4, 5, 6, 7, 8, 9 ] }
{ _id: 'Chunk', values: [ 4 ] }
{ _id: 'Chunk', values: [ 4, 5, 6, 7, 8, 9 ] }
{ _id: 'Chunk', values: [ 4, 5 ] }
{ _id: 'Chunk', values: [ 4, 5, 6, 7, 8, 9 ] }
*/
This setup ensures that each client observes the current value when it starts and receives all subsequent changes to the value.
Since the changes are represented as streams, you can easily build more complex programs using familiar stream operators. You can transform, filter, or merge these streams with other streams to achieve more sophisticated behavior.
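As one possible illustration of composing the changes stream with ordinary stream operators, the sketch below keeps only even values, maps them to labels, and stops after three. The names increment and evensDemo and the label format are assumptions made for this example. The exact numbers observed depend on scheduling, so no fixed output is shown.

```typescript
import { Chunk, Effect, Fiber, Ref, Stream, SubscriptionRef } from "effect"

// Illustrative updater: increments the shared value forever
const increment = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

const evensDemo = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)

  // Run the updater in the background
  const fiber = yield* Effect.fork(increment(ref))

  // Filter and transform the changes, then collect the first three results
  const labels = yield* ref.changes.pipe(
    Stream.filter((n) => n % 2 === 0),
    Stream.map((n) => `even: ${n}`),
    Stream.take(3),
    Stream.runCollect
  )

  // Stop the updater once enough values have been collected
  yield* Fiber.interrupt(fiber)

  return Chunk.toReadonlyArray(labels)
})

Effect.runPromise(evensDemo).then(console.log)
```

Because the filtering happens on the stream, the updater needs no knowledge of which values the observer cares about.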