Basic Concurrency
On this page
Effect is a highly concurrent framework powered by fibers. Fibers are lightweight virtual threads with resource-safe cancellation capabilities, enabling many features in Effect.
In this section, you will learn the basics of fibers and get familiar with some of the powerful high-level operators that utilize fibers.
What Are Virtual Threads?
JavaScript is inherently single-threaded, meaning it executes code in a single sequence of instructions. However, modern JavaScript environments use an event loop to manage asynchronous operations, creating the illusion of multitasking. In this context, virtual threads, or fibers, are logical threads simulated by the Effect runtime. They allow concurrent execution without relying on true multi-threading, which is not natively supported in JavaScript.
Fibers
All effects in Effect are executed by fibers. If you didn't create the fiber yourself, it was created by an operation you're using (if it's concurrent) or by the Effect runtime system.
Even if you write "single-threaded" code with no concurrent operations, there will always be at least one fiber: the "main" fiber that executes your effect.
Effect fibers have a well-defined lifecycle based on the effect they are executing.
Every fiber exits with either a failure or success, depending on whether the effect it is executing fails or succeeds.
Effect fibers have unique identities, local state, and a status (such as done, running, or suspended).
The Fiber Data Type
The Fiber data type in Effect represents a "handle" on the execution of an effect.
The Fiber<A, E>
data type has two type parameters:
- A (Success Type): The type of value the fiber may succeed with.
- E (Failure Type): The type of value the fiber may fail with.
Fibers do not have an R
type parameter because they only execute effects that have already had their requirements provided to them.
Forking Effects
One of the fundamental ways to create a fiber is by forking an existing effect. When you fork an effect, it starts executing the effect on a new fiber, giving you a reference to this newly-created fiber.
The following code demonstrates how to create a single fiber using the Effect.fork
function.
This fiber will execute the function fib(10)
independently of the main fiber:
ts
import {Effect } from "effect"constfib = (n : number):Effect .Effect <number> =>Effect .suspend (() => {if (n <= 1) {returnEffect .succeed (n )}returnfib (n - 1).pipe (Effect .zipWith (fib (n - 2), (a ,b ) =>a +b ))})constfib10Fiber =Effect .fork (fib (10))
ts
import {Effect } from "effect"constfib = (n : number):Effect .Effect <number> =>Effect .suspend (() => {if (n <= 1) {returnEffect .succeed (n )}returnfib (n - 1).pipe (Effect .zipWith (fib (n - 2), (a ,b ) =>a +b ))})constfib10Fiber =Effect .fork (fib (10))
Joining Fibers
A common operation with fibers is joining them using the Fiber.join
function. This function returns an Effect
that will succeed or fail based on the outcome of the fiber it joins:
ts
import {Effect ,Fiber } from "effect"constfib = (n : number):Effect .Effect <number> =>Effect .suspend (() => {if (n <= 1) {returnEffect .succeed (n )}returnfib (n - 1).pipe (Effect .zipWith (fib (n - 2), (a ,b ) =>a +b ))})constfib10Fiber =Effect .fork (fib (10))constprogram =Effect .gen (function* () {constfiber = yield*fib10Fiber constn = yield*Fiber .join (fiber )console .log (n )})Effect .runPromise (program ) // 55
ts
import {Effect ,Fiber } from "effect"constfib = (n : number):Effect .Effect <number> =>Effect .suspend (() => {if (n <= 1) {returnEffect .succeed (n )}returnfib (n - 1).pipe (Effect .zipWith (fib (n - 2), (a ,b ) =>a +b ))})constfib10Fiber =Effect .fork (fib (10))constprogram =Effect .gen (function* () {constfiber = yield*fib10Fiber constn = yield*Fiber .join (fiber )console .log (n )})Effect .runPromise (program ) // 55
Awaiting Fibers
Another useful function for fibers is Fiber.await
. This function returns an effect containing an Exit value, which provides detailed information about how the fiber completed.
ts
import {Effect ,Fiber } from "effect"constfib = (n : number):Effect .Effect <number> =>Effect .suspend (() => {if (n <= 1) {returnEffect .succeed (n )}returnfib (n - 1).pipe (Effect .zipWith (fib (n - 2), (a ,b ) =>a +b ))})constfib10Fiber =Effect .fork (fib (10))constprogram =Effect .gen (function* () {constfiber = yield*fib10Fiber constexit = yield*Fiber .await (fiber )console .log (exit )})Effect .runPromise (program ) // { _id: 'Exit', _tag: 'Success', value: 55 }
ts
import {Effect ,Fiber } from "effect"constfib = (n : number):Effect .Effect <number> =>Effect .suspend (() => {if (n <= 1) {returnEffect .succeed (n )}returnfib (n - 1).pipe (Effect .zipWith (fib (n - 2), (a ,b ) =>a +b ))})constfib10Fiber =Effect .fork (fib (10))constprogram =Effect .gen (function* () {constfiber = yield*fib10Fiber constexit = yield*Fiber .await (fiber )console .log (exit )})Effect .runPromise (program ) // { _id: 'Exit', _tag: 'Success', value: 55 }
Interrupting Fibers
If a fiber's result is no longer needed, it can be interrupted, which immediately terminates the fiber and safely releases all resources by running all finalizers.
Similar to Fiber.await
, Fiber.interrupt
returns an Exit` value describing how the fiber completed.
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber = yield*Effect .fork (Effect .forever (Effect .succeed ("Hi!")))constexit = yield*Fiber .interrupt (fiber )console .log (exit )})Effect .runPromise (program )/*Output{_id: 'Exit',_tag: 'Failure',cause: {_id: 'Cause',_tag: 'Interrupt',fiberId: {_id: 'FiberId',_tag: 'Runtime',id: 0,startTimeMillis: 1715787137490}}}*/
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber = yield*Effect .fork (Effect .forever (Effect .succeed ("Hi!")))constexit = yield*Fiber .interrupt (fiber )console .log (exit )})Effect .runPromise (program )/*Output{_id: 'Exit',_tag: 'Failure',cause: {_id: 'Cause',_tag: 'Interrupt',fiberId: {_id: 'FiberId',_tag: 'Runtime',id: 0,startTimeMillis: 1715787137490}}}*/
By design, the effect returned by Fiber.interrupt
does not resume until the fiber has completed, ensuring that your code does not start new fibers until the old one has terminated. This behavior, often called "back-pressuring," can be overridden if needed.
If you do not need back-pressuring, you can fork the interruption itself into a new fiber:
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber = yield*Effect .fork (Effect .forever (Effect .succeed ("Hi!")))const_ = yield*Effect .fork (Fiber .interrupt (fiber ))})
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber = yield*Effect .fork (Effect .forever (Effect .succeed ("Hi!")))const_ = yield*Effect .fork (Fiber .interrupt (fiber ))})
There is also a shorthand for background interruption called Fiber.interruptFork
.
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber = yield*Effect .fork (Effect .forever (Effect .succeed ("Hi!")))const_ = yield*Fiber .interruptFork (fiber )})
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber = yield*Effect .fork (Effect .forever (Effect .succeed ("Hi!")))const_ = yield*Fiber .interruptFork (fiber )})
Note: It is also possible to perform interruptions using the high-level API Effect.interrupt
. For more information, see Effect.interrupt.
Composing Fibers
The Fiber.zip
and Fiber.zipWith
functions allow you to combine two fibers into a single fiber. The resulting fiber produces the results of both input fibers. If either of the input fibers fails, the composed fiber will also fail.
Here's an example using Fiber.zip
:
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber1 = yield*Effect .fork (Effect .succeed ("Hi!"))constfiber2 = yield*Effect .fork (Effect .succeed ("Bye!"))constfiber =Fiber .zip (fiber1 ,fiber2 )consttuple = yield*Fiber .join (fiber )console .log (tuple )})Effect .runPromise (program )/*Output:[ 'Hi!', 'Bye!' ]*/
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber1 = yield*Effect .fork (Effect .succeed ("Hi!"))constfiber2 = yield*Effect .fork (Effect .succeed ("Bye!"))constfiber =Fiber .zip (fiber1 ,fiber2 )consttuple = yield*Fiber .join (fiber )console .log (tuple )})Effect .runPromise (program )/*Output:[ 'Hi!', 'Bye!' ]*/
Another way to compose fibers is with the Fiber.orElse
function. This function allows you to specify an alternative fiber that will be executed if the first fiber fails. If the first fiber succeeds, the composed fiber will return its result. If the first fiber fails, the composed fiber will complete with the result of the second fiber, regardless of whether it succeeds or fails.
Here's an example using Fiber.orElse
:
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber1 = yield*Effect .fork (Effect .fail ("Uh oh!"))constfiber2 = yield*Effect .fork (Effect .succeed ("Hurray!"))constfiber =Fiber .orElse (fiber1 ,fiber2 )constmessage = yield*Fiber .join (fiber )console .log (message )})Effect .runPromise (program )/*Output:Hurray!*/
ts
import {Effect ,Fiber } from "effect"constprogram =Effect .gen (function* () {constfiber1 = yield*Effect .fork (Effect .fail ("Uh oh!"))constfiber2 = yield*Effect .fork (Effect .succeed ("Hurray!"))constfiber =Fiber .orElse (fiber1 ,fiber2 )constmessage = yield*Fiber .join (fiber )console .log (message )})Effect .runPromise (program )/*Output:Hurray!*/
Concurrency Options
Effect provides many functions that accept Concurrency Options to help you identify opportunities to parallelize your code.
For example, the standard Effect.zip
function combines two effects sequentially. However, there is also a concurrent version, Effect.zip({_, _, { concurrent: true })
, which combines two effects concurrently.
In the following example, we use Effect.zip
to run two tasks sequentially. The first task takes 1 second, and the second task takes 2 seconds, resulting in a total duration of approximately 3 seconds:
ts
import {Effect ,Console } from "effect"consttask1 =Effect .delay (Console .log ("task1"), "1 second")consttask2 =Effect .delay (Console .log ("task2"), "2 seconds")constprogram =Effect .zip (task1 ,task2 )Effect .runPromise (Effect .timed (program )).then (([duration ]) =>console .log (String (duration )))/*Output:task1task2Duration(3s 5ms 369875ns)*/
ts
import {Effect ,Console } from "effect"consttask1 =Effect .delay (Console .log ("task1"), "1 second")consttask2 =Effect .delay (Console .log ("task2"), "2 seconds")constprogram =Effect .zip (task1 ,task2 )Effect .runPromise (Effect .timed (program )).then (([duration ]) =>console .log (String (duration )))/*Output:task1task2Duration(3s 5ms 369875ns)*/
In this example, we use the concurrent version of Effect.zip
to run two tasks concurrently. The total duration will be approximately equal to the duration of the longest task, which is 2 seconds:
ts
import {Effect ,Console } from "effect"consttask1 =Effect .delay (Console .log ("task1"), "1 second")consttask2 =Effect .delay (Console .log ("task2"), "2 seconds")constprogram =Effect .zip (task1 ,task2 , {concurrent : true })Effect .runPromise (Effect .timed (program )).then (([duration ]) =>console .log (String (duration )))/*Output:task1task2Duration(2s 8ms 179666ns)*/
ts
import {Effect ,Console } from "effect"consttask1 =Effect .delay (Console .log ("task1"), "1 second")consttask2 =Effect .delay (Console .log ("task2"), "2 seconds")constprogram =Effect .zip (task1 ,task2 , {concurrent : true })Effect .runPromise (Effect .timed (program )).then (([duration ]) =>console .log (String (duration )))/*Output:task1task2Duration(2s 8ms 179666ns)*/
Racing
The Effect.race
function lets you race multiple effects concurrently and returns the result of the first one that successfully completes. Here's an example:
ts
import {Effect } from "effect"consttask1 =Effect .delay (Effect .fail ("task1"), "1 second")consttask2 =Effect .delay (Effect .succeed ("task2"), "2 seconds")constprogram =Effect .race (task1 ,task2 )Effect .runPromise (program ).then (console .log )/*Output:task2*/
ts
import {Effect } from "effect"consttask1 =Effect .delay (Effect .fail ("task1"), "1 second")consttask2 =Effect .delay (Effect .succeed ("task2"), "2 seconds")constprogram =Effect .race (task1 ,task2 )Effect .runPromise (program ).then (console .log )/*Output:task2*/
In this example, task1
is set to fail after 1 second, while task2
is set to succeed after 2 seconds. The Effect.race
function runs both tasks concurrently, and since task2
is the first to succeed, its result is returned.
If you need to handle the first effect to complete, whether it succeeds or fails, you can use the Effect.either
function. This function wraps the result in an Either type, allowing you to see if the outcome was a success (Right
) or a failure (Left
):
ts
import {Effect } from "effect"consttask1 =Effect .delay (Effect .fail ("task1"), "1 second")consttask2 =Effect .delay (Effect .succeed ("task2"), "2 seconds")constprogram =Effect .race (Effect .either (task1 ),Effect .either (task2 ))Effect .runPromise (program ).then (console .log )/*Output:{ _id: 'Either', _tag: 'Left', left: 'task1' }*/
ts
import {Effect } from "effect"consttask1 =Effect .delay (Effect .fail ("task1"), "1 second")consttask2 =Effect .delay (Effect .succeed ("task2"), "2 seconds")constprogram =Effect .race (Effect .either (task1 ),Effect .either (task2 ))Effect .runPromise (program ).then (console .log )/*Output:{ _id: 'Either', _tag: 'Left', left: 'task1' }*/
In this example, task1
fails after 1 second, and task2
succeeds after 2 seconds. By using Effect.either
, the program returns the result of task1
, showing that it was a failure (Left
).
Timeout
When working with asynchronous tasks, it's often important to ensure that they complete within a reasonable time.
Effect provides a convenient way to enforce time limits on effects using the Effect.timeout
function.
This function returns a new effect that will fail with a TimeoutException
if the original effect does not complete within the specified duration.
Here's an example demonstrating how to use Effect.timeout
:
ts
import {Effect } from "effect"consttask =Effect .delay (Effect .succeed ("task1"), "10 seconds")constprogram =Effect .timeout (task , "2 seconds")Effect .runPromise (program )/*throws:TimeoutException*/
ts
import {Effect } from "effect"consttask =Effect .delay (Effect .succeed ("task1"), "10 seconds")constprogram =Effect .timeout (task , "2 seconds")Effect .runPromise (program )/*throws:TimeoutException*/
In this example, task
is an effect that succeeds after 10 seconds. By wrapping task
with Effect.timeout
and specifying a timeout of 2 seconds, the resulting program will fail with a TimeoutException
because the task takes longer than the allowed time.
If an effect times out, the effect
library automatically interrupts it to prevent it from continuing to execute in the background. This interruption ensures efficient use of resources by stopping unnecessary work.