Introduction to Effect's Interruption Model
Handling Fiber Interruption
When developing concurrent applications, there are several cases in which we need to interrupt the execution of other fibers, for example:
- A parent fiber might start some child fibers to perform a task, and later decide that it doesn't need the result of some or all of them.
- Two or more fibers race with each other. The fiber whose result is computed first wins, and all other fibers are no longer needed and should be interrupted.
- In interactive applications, a user may want to stop tasks that are already running, such as by clicking a "stop" button to prevent more files from being downloaded.
- Computations that run longer than expected should be aborted by using timeout operations.
- When an application performs compute-intensive tasks based on user input and the user changes the input, we should cancel the current task and start another one.
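As an illustration of the racing case, here is a minimal sketch using `Effect.race`. The `task` helper, the `events` array, and the durations are hypothetical and exist only for this example; `Effect.onInterrupt` is used to record when the losing task is interrupted.

```ts
import { Duration, Effect } from "effect"

// Records which tasks were interrupted (illustration only).
const events: string[] = []

// Hypothetical helper: a task that sleeps, then succeeds with its name.
const task = (name: string, millis: number) =>
  Effect.sleep(Duration.millis(millis)).pipe(
    Effect.as(name),
    // Runs only if this task is interrupted before it completes.
    Effect.onInterrupt(() =>
      Effect.sync(() => {
        events.push(`${name} interrupted`)
      })
    )
  )

// Race two tasks: the first one to succeed wins, the other is interrupted.
const raced = Effect.race(task("fast", 10), task("slow", 1000))

const main = Effect.runPromise(raced).then((winner) => ({ winner, events }))

main.then(console.log)
```

In this sketch the "slow" task never gets to finish its sleep, because the race interrupts it as soon as "fast" succeeds.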
Polling vs. Asynchronous Interruption
When it comes to interrupting fibers, a naive approach is to allow one fiber to forcefully terminate another. However, this approach is not ideal: if the target fiber is in the middle of modifying shared state, it can leave that state inconsistent. Forceful termination therefore cannot guarantee the internal consistency of shared mutable state.
Instead, there are two popular and valid solutions to tackle this problem:
- Semi-asynchronous Interruption (Polling for Interruption): Imperative languages such as Java often employ polling as a semi-asynchronous signaling mechanism. In this model, a fiber sends an interruption request to another fiber. The target fiber continuously polls the interrupt status and checks whether it has received any interruption requests from other fibers. If an interruption request is detected, the target fiber terminates itself as soon as possible.
With this solution, the fiber itself handles critical sections. So, if a fiber is in the middle of a critical section and receives an interruption request, it ignores the interruption and defers its handling until after the critical section.
However, one drawback of this approach is that if the programmer forgets to poll regularly, the target fiber can become unresponsive, leading to deadlocks. Additionally, polling a global flag is not aligned with the functional paradigm followed by Effect.
- Asynchronous Interruption: In asynchronous interruption, a fiber is allowed to terminate another fiber. The target fiber is not responsible for polling the interrupt status. Instead, the target fiber disables interruptibility during its critical sections. This is a purely functional solution that doesn't require polling global state. Effect adopts this solution for its interruption model, which is a fully asynchronous signaling mechanism.
This mechanism overcomes the drawback of forgetting to poll regularly. It is also fully compatible with the functional paradigm because in a purely functional computation, we can abort the computation at any point, except during critical sections where interruption is disabled.
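To make the polling model concrete, here is a minimal sketch in plain TypeScript that does not use Effect at all. `InterruptFlag` and `processAll` are hypothetical names invented for this illustration. The worker checks the flag only between steps, so each step behaves like a critical section that always runs to completion.

```ts
// Hypothetical interruption flag: a plain mutable value that the worker polls.
class InterruptFlag {
  private requested = false

  request(): void {
    this.requested = true
  }

  isRequested(): boolean {
    return this.requested
  }
}

// Processes items one at a time, polling the flag before each step.
// A request that arrives in the middle of a step is only noticed
// at the next poll point.
function processAll(
  items: ReadonlyArray<number>,
  flag: InterruptFlag,
  onStep: (item: number) => void
): number[] {
  const done: number[] = []
  for (const item of items) {
    if (flag.isRequested()) break // poll point
    onStep(item)
    done.push(item * 2)
  }
  return done
}

const flag = new InterruptFlag()

// Simulate another party requesting interruption while item 2 is processed.
const result = processAll([1, 2, 3, 4], flag, (item) => {
  if (item === 2) flag.request()
})

console.log(result) // [ 2, 4 ]
```

Note that if `processAll` omitted the poll point, the request would never be observed. That is the drawback described above.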
When Does a Fiber Get Interrupted?
There are several ways and situations in which fibers can be interrupted. Let's explore each one and provide examples to illustrate how to reproduce these scenarios.
Calling Effect.interrupt
A fiber can be interrupted by invoking the Effect.interrupt operator on that particular fiber.
Without interruptions
```ts
import { Effect } from "effect"

const program = Effect.gen(function* () {
  yield* Effect.log("start")
  yield* Effect.sleep("2 seconds")
  yield* Effect.log("done")
})

Effect.runPromise(program).catch((error) =>
  console.log(`interrupted: ${error}`)
)
/*
Output:
timestamp=... level=INFO fiber=#0 message=start
timestamp=... level=INFO fiber=#0 message=done
*/
```
With interruptions
```ts
import { Effect } from "effect"

const program = Effect.gen(function* () {
  yield* Effect.log("start")
  yield* Effect.sleep("2 seconds")
  yield* Effect.interrupt
  yield* Effect.log("done")
})

Effect.runPromiseExit(program).then(console.log)
/*
Output:
timestamp=... level=INFO fiber=#0 message=start
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: {
    _id: 'Cause',
    _tag: 'Interrupt',
    fiberId: {
      _id: 'FiberId',
      _tag: 'Runtime',
      id: 0,
      startTimeMillis: ...
    }
  }
}
*/
```
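The example above interrupts the fiber from within. For comparison, here is a minimal sketch of one fiber interrupting another with `Fiber.interrupt`, with the target protecting a critical section through `Effect.uninterruptible`. The `worker` effect, the `log` array, and the durations are assumptions made up for this illustration.

```ts
import { Effect, Exit, Fiber } from "effect"

// Collects messages so the order of events can be inspected afterwards.
const log: string[] = []

const worker = Effect.gen(function* () {
  // Critical section: interruption is deferred until this region completes.
  yield* Effect.uninterruptible(
    Effect.gen(function* () {
      yield* Effect.sync(() => log.push("critical: begin"))
      yield* Effect.sleep("50 millis")
      yield* Effect.sync(() => log.push("critical: end"))
    })
  )
  // Interruptible again from here on.
  yield* Effect.sleep("5 seconds")
  yield* Effect.sync(() => log.push("worker: done"))
})

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(worker)
  // Give the worker time to enter its critical section.
  yield* Effect.sleep("10 millis")
  // Request interruption and wait for the worker to terminate.
  const exit = yield* Fiber.interrupt(fiber)
  return Exit.isInterrupted(exit)
})

const main = Effect.runPromise(program).then((interrupted) => ({
  interrupted,
  log
}))

main.then(console.log)
```

In this sketch the interruption request arrives while the worker is inside the uninterruptible region, so the region runs to completion first. The worker is interrupted before it reaches its final step.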
Interruption of Concurrent Effects
When we combine multiple concurrent effects using functions like Effect.forEach, it's important to note that if one of the effects is interrupted, all the other concurrent effects will also be interrupted. Let's take a look at an example:
```ts
import { Effect } from "effect"

const program = Effect.forEach(
  [1, 2, 3],
  (n) =>
    Effect.gen(function* () {
      yield* Effect.log(`start #${n}`)
      yield* Effect.sleep(`${n} seconds`)
      if (n > 1) {
        yield* Effect.interrupt
      }
      yield* Effect.log(`done #${n}`)
    }),
  { concurrency: "unbounded" }
)

Effect.runPromiseExit(program).then((exit) =>
  console.log(JSON.stringify(exit, null, 2))
)
/*
Output:
timestamp=... level=INFO fiber=#1 message="start #1"
timestamp=... level=INFO fiber=#2 message="start #2"
timestamp=... level=INFO fiber=#3 message="start #3"
timestamp=... level=INFO fiber=#1 message="done #1"
{
  "_id": "Exit",
  "_tag": "Failure",
  "cause": {
    "_id": "Cause",
    "_tag": "Parallel",
    "left": {
      "_id": "Cause",
      "_tag": "Interrupt",
      "fiberId": {
        "_id": "FiberId",
        "_tag": "Runtime",
        "id": 3,
        "startTimeMillis": ...
      }
    },
    "right": {
      "_id": "Cause",
      "_tag": "Sequential",
      "left": {
        "_id": "Cause",
        "_tag": "Empty"
      },
      "right": {
        "_id": "Cause",
        "_tag": "Interrupt",
        "fiberId": {
          "_id": "FiberId",
          "_tag": "Runtime",
          "id": 0,
          "startTimeMillis": ...
        }
      }
    }
  }
}
*/
```
In this example, the array [1, 2, 3] represents three concurrent tasks. We use Effect.forEach to iterate over each element and run a task for it, and Effect.log to log the start and completion of each task.
Looking at the output, we can see that the task with n = 1 starts and completes successfully. However, the task with n = 2 calls Effect.interrupt after its sleep, so it never logs its completion. As a result, the remaining task is interrupted as well, and the program terminates with a Failure exit whose cause contains only Interrupt causes; no error is reported.
This example demonstrates how interruption works with concurrent effects. If one of the concurrent tasks is interrupted, it triggers the interruption of all the other concurrent tasks as well.
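To observe this propagation programmatically rather than by reading the printed cause, one option is sketched below. It assumes `Effect.onInterrupt` for recording which tasks were interrupted and `Exit.isInterrupted` for checking the final result. The `task` helper, the `interruptedTasks` array, and the shortened durations are hypothetical.

```ts
import { Duration, Effect, Exit } from "effect"

// Records the tasks whose interruption handler ran (illustration only).
const interruptedTasks: number[] = []

// Hypothetical task: sleeps for a while; task 2 then interrupts itself.
const task = (n: number) =>
  Effect.gen(function* () {
    yield* Effect.sleep(Duration.millis(n * 20))
    if (n === 2) {
      yield* Effect.interrupt
    }
    return n
  }).pipe(
    // Runs only when this task ends by interruption.
    Effect.onInterrupt(() =>
      Effect.sync(() => {
        interruptedTasks.push(n)
      })
    )
  )

const program = Effect.forEach([1, 2, 3], task, { concurrency: "unbounded" })

const main = Effect.runPromiseExit(program).then((exit) => ({
  // True when the exit is a Failure whose cause contains an interruption.
  interrupted: Exit.isInterrupted(exit),
  interruptedTasks
}))

main.then(console.log)
```

Task 1 finishes before any interruption happens, so its handler never runs, while task 3 is still sleeping when task 2 interrupts itself.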