ℹ️ Select 'Choose Exercise', or randomize 'Next Random Exercise' in selected language.

Choose Exercise:
Timer 00:00
WPM --
Score --
Acc --
Correct chars --

Scala Actor Concurrency: Producer-Consumer with Bounded Buffer

Scala

Goal -- WPM

Ready
Exercise Algorithm Area
1package com.example.actors
2
3import akka.actor.{Actor, ActorRef, ActorSystem, Props}
4import akka.pattern.ask
5import akka.util.Timeout
6
7import scala.concurrent.duration._
8import scala.concurrent.{Await, Future}
9import scala.util.{Failure, Success}
10
11// Messages for the Producer
12case class Produce(item: String)
13case class GetBufferStatus
14
15// Messages for the Consumer
16case class Consume
17
18// Messages for the Buffer
19case class AddItem(item: String)
20case object RemoveItem
21case class BufferStatus(size: Int, capacity: Int)
22
23/**
24* The Producer Actor.
25* It sends items to the Buffer Actor.
26*/
27class ProducerActor(buffer: ActorRef, numItems: Int) extends Actor {
28override def receive: Receive = {
29case "start" =>
30println(s"Producer starting to produce $numItems items.")
31for (i <- 1 to numItems) {
32val item = s"item_$i"
33println(s"Producer sending: $item")
34buffer ! Produce(item) // Send to buffer indirectly via message
35}
36println("Producer finished producing.")
37context.stop(self) // Stop producer after finishing
38
39case Produce(item) => // Producer receives Produce message to send to buffer
40buffer ! AddItem(item)
41}
42}
43
44/**
45* The Consumer Actor.
46* It requests items from the Buffer Actor.
47*/
48class ConsumerActor(buffer: ActorRef) extends Actor {
49override def receive: Receive = {
50case "start" =>
51println("Consumer starting to consume.")
52// Consumer will continuously try to consume until buffer is empty or producer stops
53self ! Consume // Initial request to consume
54
55case Consume =>
56implicit val timeout: Timeout = 5.seconds
57// Ask the buffer for an item. This is a non-blocking ask.
58val futureItem: Future[Any] = buffer ? RemoveItem
59
60futureItem.onComplete {
61case Success(item: String) =>
62println(s"Consumer received: $item")
63// Schedule another consume request after a short delay
64context.system.scheduler.scheduleOnce(1.second, self, Consume)
65case Success("BufferEmpty") =>
66println("Buffer is empty. Waiting...")
67// If buffer is empty, wait a bit before trying again
68context.system.scheduler.scheduleOnce(2.seconds, self, Consume)
69case Failure(e) =>
70println(s"Consumer failed to get item: ${e.getMessage}")
71context.stop(self)
72case _ => // Handle unexpected responses
73println("Consumer received unexpected response from buffer.")
74context.stop(self)
75}
76
77case GetBufferStatus =>
78buffer forward GetBufferStatus // Forward the request to the buffer
79}
80}
81
82/**
83* The Buffer Actor.
84* Acts as a bounded buffer between producers and consumers.
85*/
86class BufferActor(capacity: Int) extends Actor {
87private var queue: List[String] = List.empty
88
89override def receive: Receive = {
90case AddItem(item) =>
91if (queue.size < capacity) {
92queue = queue :+ item // Append to the end
93println(s"Buffer added: $item. Current size: ${queue.size}")
94} else {
95println(s"Buffer full. Dropping item: $item")
96// In a real scenario, might send a 'BufferFull' ack or retry
97}
98
99case RemoveItem =>
100if (queue.nonEmpty) {
101val item = queue.head
102queue = queue.tail // Remove from the front
103println(s"Buffer removed: $item. Current size: ${queue.size}")
104sender() ! item // Send the item back to the sender (Consumer)
105} else {
106println("Buffer is empty.")
107sender() ! "BufferEmpty" // Indicate buffer is empty
108}
109
110case GetBufferStatus =>
111sender() ! BufferStatus(queue.size, capacity)
112}
113}
114
115object ProducerConsumerApp {
116
117def main(args: Array[String]): Unit = {
118val system = ActorSystem("ProducerConsumerSystem")
119val bufferCapacity = 5
120val numberOfItemsToProduce = 20
121
122// Create the Buffer Actor
123val bufferActor = system.actorOf(Props(new BufferActor(bufferCapacity)), "buffer")
124
125// Create the Producer Actor
126val producerActor = system.actorOf(Props(new ProducerActor(bufferActor, numberOfItemsToProduce)), "producer")
127
128// Create the Consumer Actor
129val consumerActor = system.actorOf(Props(new ConsumerActor(bufferActor)), "consumer")
130
131// Start the actors
132producerActor ! "start"
133consumerActor ! "start"
134
135// To observe buffer status periodically (optional)
136implicit val timeout: Timeout = 5.seconds
137val statusFuture = system.scheduler.scheduleAtFixedRate(0.seconds, 3.seconds) {
138() =>
139val status = Await.result(bufferActor ? GetBufferStatus, timeout.duration).asInstanceOf[BufferStatus]
140println(s"\n--- Buffer Status: Size = ${status.size}, Capacity = ${status.capacity} ---\n")
141}
142
143// Let the system run for a while to observe interactions
144Thread.sleep(20000) // Run for 20 seconds
145
146// Clean up
147system.terminate()
148println("System terminated.")
149}
150}
Algorithm description viewbox

Scala Actor Concurrency: Producer-Consumer with Bounded Buffer

Algorithm description:

This Scala code demonstrates a classic producer-consumer pattern using Akka Actors. A `ProducerActor` generates items and sends them to a `BufferActor`, which acts as a bounded queue. A `ConsumerActor` then retrieves items from the `BufferActor` for processing. This pattern is fundamental for managing concurrent tasks and decoupling data producers from data consumers in distributed or multi-threaded applications.

Algorithm explanation:

The system consists of three actors: Producer, Consumer, and Buffer. The Producer sends `AddItem` messages to the Buffer. The Buffer maintains a queue with a fixed `capacity`. If the buffer is full, incoming items are dropped (or could be handled differently). The Consumer sends `RemoveItem` messages to the Buffer. If the buffer has items, it dequeues one and sends it back to the Consumer. If empty, it signals "BufferEmpty". The `ask` pattern is used for the Consumer to request items, allowing it to handle responses asynchronously. This actor-based approach ensures thread-safety and managed concurrency without explicit locks. Time complexity for adding/removing items from the buffer (implemented as a List) is O(1) on average for `head`/`tail` and O(N) for `:+` (append), but using a `Queue` would make both O(1). Space complexity is O(C) where C is the buffer capacity.

Pseudocode:

ACTOR ProducerActor(buffer, numItems):
  ON MESSAGE "start":
    FOR i FROM 1 TO numItems:
      item = "item_" + i
      SEND AddItem(item) TO buffer
    END FOR
    STOP self

ACTOR ConsumerActor(buffer):
  ON MESSAGE "start":
    SEND Consume TO self

  ON MESSAGE Consume:
    ASK buffer for RemoveItem
    ON SUCCESS item (String):
      PRINT "Consumer received: " + item
      SCHEDULE Consume TO self AFTER 1 second
    ON SUCCESS "BufferEmpty":
      PRINT "Buffer is empty. Waiting..."
      SCHEDULE Consume TO self AFTER 2 seconds
    ON FAILURE error:
      PRINT "Consumer failed: " + error.getMessage
      STOP self

ACTOR BufferActor(capacity):
  PRIVATE queue = empty List

  ON MESSAGE AddItem(item):
    IF queue size < capacity THEN
      APPEND item to queue
      PRINT "Buffer added: " + item
    ELSE
      PRINT "Buffer full. Dropping item: " + item
    END IF

  ON MESSAGE RemoveItem:
    IF queue is NOT empty THEN
      item = HEAD of queue
      REMOVE HEAD from queue
      PRINT "Buffer removed: " + item
      REPLY item TO sender
    ELSE
      PRINT "Buffer is empty."
      REPLY "BufferEmpty" TO sender
    END IF