A Queue is a lightweight in-memory queue with built-in back-pressure, enabling asynchronous, purely-functional, and type-safe handling of data.
Basic Operations
A Queue<A> stores values of type A and provides two fundamental operations:
API
Description
Queue.offer
Adds a value of type A to the queue.
Queue.take
Removes and returns the oldest value from the queue.
Example (Adding and Retrieving an Item)
Creating a Queue
Queues can be bounded (with a specified capacity) or unbounded (without a limit). Different types of queues handle new values differently when they reach capacity.
Bounded Queue
A bounded queue applies back-pressure when full, meaning any Queue.offer operation will suspend until there is space.
Example (Creating a Bounded Queue)
Dropping Queue
A dropping queue discards new values if the queue is full.
Example (Creating a Dropping Queue)
Sliding Queue
A sliding queue removes old values to make space for new ones when it reaches capacity.
Example (Creating a Sliding Queue)
Unbounded Queue
An unbounded queue has no capacity limit, allowing unrestricted additions.
Example (Creating an Unbounded Queue)
Adding Items to a Queue
offer
Use Queue.offer to add values to the queue.
Example (Adding a Single Item)
When using a back-pressured queue, Queue.offer suspends if the queue is full. To avoid blocking the main fiber, you can fork the Queue.offer operation.
Example (Handling a Full Queue with Effect.fork)
offerAll
You can also add multiple items at once using Queue.offerAll.
Example (Adding Multiple Items)
Consuming Items from a Queue
take
The Queue.take operation removes and returns the oldest item from the queue. If the queue is empty, Queue.take will suspend and only resume when an item is added. To prevent blocking, you can fork the Queue.take operation into a new fiber.
Example (Waiting for an Item in a Fiber)
poll
To retrieve the queue’s first item without suspending, use Queue.poll. If the queue is empty, Queue.poll returns None; if it has an item, it wraps it in Some.
Example (Polling an Item)
takeUpTo
To retrieve multiple items, use Queue.takeUpTo, which returns up to the specified number of items. If there aren’t enough items, it returns all available items without waiting for more.
Example (Taking Multiple Items)
takeAll
To retrieve all items from the queue at once, use Queue.takeAll. This operation completes immediately, returning an empty collection if the queue is empty.
Example (Taking All Items)
Shutting Down a Queue
shutdown
The Queue.shutdown operation allows you to interrupt all fibers that are currently suspended on offer* or take* operations. This action also empties the queue and makes any future offer* and take* calls terminate immediately.
Example (Interrupting Fibers on Queue Shutdown)
awaitShutdown
The Queue.awaitShutdown operation can be used to run an effect when the queue shuts down. It waits until the queue is closed and resumes immediately if the queue is already shut down.
Example (Waiting for Queue Shutdown)
Offer-only / Take-only Queues
Sometimes, you might want certain parts of your code to only add values to a queue (Enqueue) or only retrieve values from a queue (Dequeue). Effect provides interfaces to enforce these specific capabilities.
Enqueue
All methods for adding values to a queue are defined by the Enqueue interface. This restricts the queue to only offer operations.
Example (Restricting Queue to Offer-only Operations)
Dequeue
Similarly, all methods for retrieving values from a queue are defined by the Dequeue interface, which restricts the queue to only take operations.
Example (Restricting Queue to Take-only Operations)
The Queue type combines both Enqueue and Dequeue, so you can easily pass it to different parts of your code, enforcing only Enqueue or Dequeue behaviors as needed.
Example (Using Offer-only and Take-only Queues Together)