Queue

A Queue is a lightweight in-memory queue built on Effect with composable and transparent back-pressure. It is fully asynchronous (no locks or blocking), purely functional, and type-safe.

Basic Operations

A Queue<A> stores values of type A and provides two fundamental operations:

  • Queue.offer: Adds a value of type A to the queue.
  • Queue.take: Removes and returns the oldest value from the queue.

Here's an example demonstrating these basic operations:

ts
import { Effect, Queue } from "effect"
 
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  yield* Queue.offer(queue, 1) // Add 1 to the queue
  const value = yield* Queue.take(queue) // Retrieve and remove the oldest value
  return value
})
 
Effect.runPromise(program).then(console.log) // Output: 1

Creating a Queue

A Queue can have bounded (limited capacity) or unbounded storage. Depending on your requirements, you can choose from various strategies to handle new values when the queue reaches its capacity.

Bounded Queue

A bounded queue applies back-pressure when it is full: any attempt to add more items is suspended until space becomes available.

ts
import { Queue } from "effect"
 
// Creating a bounded queue with a capacity of 100
const boundedQueue = Queue.bounded<number>(100)

Dropping Queue

A dropping queue discards new items when it is full instead of waiting for space to become available.

ts
import { Queue } from "effect"
 
// Creating a dropping queue with a capacity of 100
const droppingQueue = Queue.dropping<number>(100)
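
As a minimal sketch of this behavior, the program below offers three values to a dropping queue. The capacity of 2 and the names droppingDemo, first, second, and third are illustrative choices, not part of the original page. It relies on Queue.offer resolving to a boolean that indicates whether the value was enqueued.

```typescript
import { Chunk, Effect, Queue } from "effect"

// Illustrative sketch: a small capacity makes the drop easy to observe
const droppingDemo = Effect.gen(function* () {
  const queue = yield* Queue.dropping<number>(2)
  const first = yield* Queue.offer(queue, 1) // accepted
  const second = yield* Queue.offer(queue, 2) // accepted, queue is now full
  const third = yield* Queue.offer(queue, 3) // dropped without suspending
  const remaining = yield* Queue.takeAll(queue)
  return {
    accepted: [first, second, third],
    remaining: Chunk.toReadonlyArray(remaining)
  }
})

Effect.runPromise(droppingDemo).then(console.log)
```

Under these assumptions, the third offer should resolve to false and the queue should still hold 1 and 2.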

Sliding Queue

A sliding queue removes the oldest items when it is full to make room for new ones.

ts
import { Queue } from "effect"
 
// Creating a sliding queue with a capacity of 100
const slidingQueue = Queue.sliding<number>(100)
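
To make the eviction visible, here is a small sketch that offers three values to a sliding queue. The capacity of 2 and the name slidingDemo are assumptions made for illustration.

```typescript
import { Chunk, Effect, Queue } from "effect"

// Illustrative sketch: a small capacity makes the eviction easy to observe
const slidingDemo = Effect.gen(function* () {
  const queue = yield* Queue.sliding<number>(2)
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2) // queue is now full
  yield* Queue.offer(queue, 3) // evicts 1, the oldest item
  const remaining = yield* Queue.takeAll(queue)
  return Chunk.toReadonlyArray(remaining)
})

Effect.runPromise(slidingDemo).then(console.log)
```

With this setup, the queue should end up holding 2 and 3, since the oldest value was removed to make room for the newest.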

Unbounded Queue

An unbounded queue has no capacity limit, so offers never suspend.

ts
import { Queue } from "effect"
 
// Creating an unbounded queue
const unboundedQueue = Queue.unbounded<number>()
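
As a rough illustration, the sketch below adds a batch of values to an unbounded queue in one step and then reads the queue size. The batch size of 1000 and the name unboundedDemo are arbitrary choices for this example.

```typescript
import { Effect, Queue } from "effect"

// Illustrative sketch: the batch size is arbitrary
const unboundedDemo = Effect.gen(function* () {
  const queue = yield* Queue.unbounded<number>()
  // Build the values with the built-in Array (Effect's Array module is not imported here)
  const items = Array.from({ length: 1000 }, (_, i) => i)
  yield* Queue.offerAll(queue, items) // completes without waiting for a consumer
  return yield* Queue.size(queue)
})

Effect.runPromise(unboundedDemo).then(console.log)
```

Because nothing limits growth, an unbounded queue can consume memory without bound if producers outpace consumers, which is the trade-off against the bounded variants above.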

Adding Items to a Queue

To add a value to the queue, you can use the Queue.offer operation:

ts
import { Effect, Queue } from "effect"
 
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  yield* Queue.offer(queue, 1) // put 1 in the queue
})

If you're using a back-pressured queue and it's full, the offer operation will suspend until space becomes available. In such cases, you can use Effect.fork to run the offer in a separate fiber so that the current fiber is not suspended.

ts
import { Effect, Queue, Fiber } from "effect"
 
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(1)
  yield* Queue.offer(queue, 1)
  const fiber = yield* Effect.fork(Queue.offer(queue, 2)) // will be suspended because the queue is full
  yield* Queue.take(queue)
  yield* Fiber.join(fiber)
})

You can also add multiple values at once using Queue.offerAll:

ts
import { Effect, Queue, Array } from "effect"
 
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  const items = Array.range(1, 10)
  yield* Queue.offerAll(queue, items)
  return yield* Queue.size(queue)
})
 
Effect.runPromise(program).then(console.log) // Output: 10

Consuming Items from a Queue

The Queue.take operation removes the oldest item from the queue and returns it. If the queue is empty, it will suspend and resume only when an item is added to the queue. You can also use Effect.fork to wait for the value in a different execution context (fiber).

ts
import { Effect, Queue, Fiber } from "effect"
 
const oldestItem = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string>(100)
  const fiber = yield* Effect.fork(Queue.take(queue)) // will be suspended because the queue is empty
  yield* Queue.offer(queue, "something")
  const value = yield* Fiber.join(fiber)
  return value
})
 
Effect.runPromise(oldestItem).then(console.log) // Output: something

You can retrieve the first item without suspending by using Queue.poll. If the queue is empty, you'll get None; otherwise, the oldest item is removed and returned wrapped in Some.

ts
import { Effect, Queue } from "effect"
 
const polled = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  yield* Queue.offer(queue, 10)
  yield* Queue.offer(queue, 20)
  const head = yield* Queue.poll(queue)
  return head
})
 
Effect.runPromise(polled).then(console.log)
/*
Output:
{
  _id: "Option",
  _tag: "Some",
  value: 10
}
*/
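
The empty-queue case can be sketched as follows. The name pollEmpty and the returned messages are hypothetical. Option.match is used here to handle both outcomes explicitly.

```typescript
import { Effect, Option, Queue } from "effect"

// Illustrative sketch: polling a queue that has never received a value
const pollEmpty = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  const head = yield* Queue.poll(queue) // returns immediately, even though the queue is empty
  return Option.match(head, {
    onNone: () => "queue is empty",
    onSome: (value) => `got ${value}`
  })
})

Effect.runPromise(pollEmpty).then(console.log)
```

Since nothing was offered, the onNone branch should be taken. Unlike Queue.take, the program does not wait for a producer.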

You can retrieve multiple items at once using Queue.takeUpTo. If the queue doesn't have enough items to return, it will return all the available items without waiting for more offers.

ts
import { Effect, Queue } from "effect"
 
const polled = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  yield* Queue.offer(queue, 10)
  yield* Queue.offer(queue, 20)
  yield* Queue.offer(queue, 30)
  const chunk = yield* Queue.takeUpTo(queue, 2)
  return chunk
})
 
Effect.runPromise(polled).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 10, 20 ]
}
*/
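
The case where fewer items are available than requested can be sketched like this. The name takeUpToDemo and the requested maximum of 5 are illustrative assumptions.

```typescript
import { Chunk, Effect, Queue } from "effect"

// Illustrative sketch: ask for up to 5 items when only 2 are available
const takeUpToDemo = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  yield* Queue.offer(queue, 10)
  yield* Queue.offer(queue, 20)
  const chunk = yield* Queue.takeUpTo(queue, 5) // does not wait for 3 more offers
  return Chunk.toReadonlyArray(chunk)
})

Effect.runPromise(takeUpToDemo).then(console.log)
```

Here the result should contain only 10 and 20, the two items that were in the queue at the time of the call.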

Similarly, you can retrieve all items at once using Queue.takeAll. It returns immediately, providing an empty collection if the queue is empty.

ts
import { Effect, Queue } from "effect"
 
const polled = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  yield* Queue.offer(queue, 10)
  yield* Queue.offer(queue, 20)
  yield* Queue.offer(queue, 30)
  const chunk = yield* Queue.takeAll(queue)
  return chunk
})
 
Effect.runPromise(polled).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 10, 20, 30 ]
}
*/
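
For the empty-queue case, a minimal sketch (the name takeAllEmpty is hypothetical) could check the returned Chunk with Chunk.isEmpty:

```typescript
import { Chunk, Effect, Queue } from "effect"

// Illustrative sketch: takeAll on a queue that has never received a value
const takeAllEmpty = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  const chunk = yield* Queue.takeAll(queue) // returns immediately with an empty Chunk
  return Chunk.isEmpty(chunk)
})

Effect.runPromise(takeAllEmpty).then(console.log)
```

This makes Queue.takeAll a reasonable fit for draining whatever has accumulated so far without risking a suspended fiber.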

Shutting Down a Queue

With Queue.shutdown, you can interrupt all fibers that are suspended on offer* or take*. It also empties the queue and causes all future offer* and take* calls to terminate immediately.

ts
import { Effect, Queue, Fiber } from "effect"
 
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(3)
  const fiber = yield* Effect.fork(Queue.take(queue))
  yield* Queue.shutdown(queue) // will interrupt fiber
  yield* Fiber.join(fiber) // will terminate
})
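
The effect on later calls can be observed with a sketch like the one below. The name shutdownDemo is hypothetical. Effect.exit is used so that the outcome of the offer is captured as an Exit value rather than propagated, and Queue.isShutdown reports the queue's state.

```typescript
import { Effect, Exit, Queue } from "effect"

// Illustrative sketch: offering to a queue after it has been shut down
const shutdownDemo = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(3)
  yield* Queue.offer(queue, 1)
  yield* Queue.shutdown(queue)
  const isShutdown = yield* Queue.isShutdown(queue)
  // Capture the result of the offer instead of letting it end the program
  const offerExit = yield* Effect.exit(Queue.offer(queue, 2))
  return { isShutdown, offerInterrupted: Exit.isInterrupted(offerExit) }
})

Effect.runPromise(shutdownDemo).then(console.log)
```

Under these assumptions, both fields should be true: the queue reports that it is shut down, and the later offer ends with an interruption instead of enqueuing the value.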

You can use Queue.awaitShutdown to execute an effect when the queue is shut down. This function waits until the queue is shut down, and if it's already shut down, it resumes immediately.

ts
import { Effect, Queue, Fiber, Console } from "effect"
 
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(3)
  const fiber = yield* Effect.fork(
    Queue.awaitShutdown(queue).pipe(
      Effect.andThen(Console.log("shutting down"))
    )
  )
  yield* Queue.shutdown(queue)
  yield* Fiber.join(fiber)
})
 
Effect.runPromise(program) // Output: shutting down

Offer-only / Take-only Queues

In some situations, you may want specific parts of your code to have restricted capabilities, such as only offering values (Enqueue) or only taking values (Dequeue) from a queue. Effect provides a straightforward way to achieve this.

All operations related to offering values are defined by the Enqueue interface. Here's an example of how to use it:

ts
import { Queue } from "effect"
 
const send = (offerOnlyQueue: Queue.Enqueue<number>, value: number) => {
  // This enqueue can only be used to offer values

  // @ts-expect-error
  Queue.take(offerOnlyQueue)

  // Ok
  return Queue.offer(offerOnlyQueue, value)
}

Similarly, all operations related to taking values are defined by the Dequeue interface. Here's an example:

ts
import { Queue } from "effect"
 
const receive = (takeOnlyQueue: Queue.Dequeue<number>) => {
  // This dequeue can only be used to take values

  // @ts-expect-error
  Queue.offer(takeOnlyQueue, 1)

  // Ok
  return Queue.take(takeOnlyQueue)
}

The Queue type extends both Enqueue and Dequeue, allowing you to conveniently pass it to different parts of your code where you want to enforce either Enqueue or Dequeue behavior:

ts
import { Effect, Queue } from "effect"
 
const send = (offerOnlyQueue: Queue.Enqueue<number>, value: number) => {
  return Queue.offer(offerOnlyQueue, value)
}

const receive = (takeOnlyQueue: Queue.Dequeue<number>) => {
  return Queue.take(takeOnlyQueue)
}

const program = Effect.gen(function* () {
  const queue = yield* Queue.unbounded<number>()

  // Offer values to the queue
  yield* send(queue, 1)
  yield* send(queue, 2)

  // Take values from the queue
  console.log(yield* receive(queue))
  console.log(yield* receive(queue))
})
 
Effect.runPromise(program)
/*
Output:
1
2
*/