1package com.example.actors
2
3import akka.actor.{Actor, ActorRef, ActorSystem, Props}
4import akka.pattern.ask
5import akka.util.Timeout
6
7import scala.concurrent.duration._
8import scala.concurrent.{Await, Future}
9import scala.util.{Failure, Success}
10
11
12case class Produce(item: String)
13case class GetBufferStatus
14
15
16case class Consume
17
18
19case class AddItem(item: String)
20case object RemoveItem
21case class BufferStatus(size: Int, capacity: Int)
22
23
24
25
26
27class ProducerActor(buffer: ActorRef, numItems: Int) extends Actor {
28override def receive: Receive = {
29case "start" =>
30println(s"Producer starting to produce $numItems items.")
31for (i <- 1 to numItems) {
32val item = s"item_$i"
33println(s"Producer sending: $item")
34buffer ! Produce(item)
35}
36println("Producer finished producing.")
37context.stop(self)
38
39case Produce(item) =>
40buffer ! AddItem(item)
41}
42}
43
44
45
46
47
48class ConsumerActor(buffer: ActorRef) extends Actor {
49override def receive: Receive = {
50case "start" =>
51println("Consumer starting to consume.")
52
53self ! Consume
54
55case Consume =>
56implicit val timeout: Timeout = 5.seconds
57
58val futureItem: Future[Any] = buffer ? RemoveItem
59
60futureItem.onComplete {
61case Success(item: String) =>
62println(s"Consumer received: $item")
63
64context.system.scheduler.scheduleOnce(1.second, self, Consume)
65case Success("BufferEmpty") =>
66println("Buffer is empty. Waiting...")
67
68context.system.scheduler.scheduleOnce(2.seconds, self, Consume)
69case Failure(e) =>
70println(s"Consumer failed to get item: ${e.getMessage}")
71context.stop(self)
72case _ =>
73println("Consumer received unexpected response from buffer.")
74context.stop(self)
75}
76
77case GetBufferStatus =>
78buffer forward GetBufferStatus
79}
80}
81
82
83
84
85
86class BufferActor(capacity: Int) extends Actor {
87private var queue: List[String] = List.empty
88
89override def receive: Receive = {
90case AddItem(item) =>
91if (queue.size < capacity) {
92queue = queue :+ item
93println(s"Buffer added: $item. Current size: ${queue.size}")
94} else {
95println(s"Buffer full. Dropping item: $item")
96
97}
98
99case RemoveItem =>
100if (queue.nonEmpty) {
101val item = queue.head
102queue = queue.tail
103println(s"Buffer removed: $item. Current size: ${queue.size}")
104sender() ! item
105} else {
106println("Buffer is empty.")
107sender() ! "BufferEmpty"
108}
109
110case GetBufferStatus =>
111sender() ! BufferStatus(queue.size, capacity)
112}
113}
114
115object ProducerConsumerApp {
116
117def main(args: Array[String]): Unit = {
118val system = ActorSystem("ProducerConsumerSystem")
119val bufferCapacity = 5
120val numberOfItemsToProduce = 20
121
122
123val bufferActor = system.actorOf(Props(new BufferActor(bufferCapacity)), "buffer")
124
125
126val producerActor = system.actorOf(Props(new ProducerActor(bufferActor, numberOfItemsToProduce)), "producer")
127
128
129val consumerActor = system.actorOf(Props(new ConsumerActor(bufferActor)), "consumer")
130
131
132producerActor ! "start"
133consumerActor ! "start"
134
135
136implicit val timeout: Timeout = 5.seconds
137val statusFuture = system.scheduler.scheduleAtFixedRate(0.seconds, 3.seconds) {
138() =>
139val status = Await.result(bufferActor ? GetBufferStatus, timeout.duration).asInstanceOf[BufferStatus]
140println(s"\n--- Buffer Status: Size = ${status.size}, Capacity = ${status.capacity} ---\n")
141}
142
143
144Thread.sleep(20000)
145
146
147system.terminate()
148println("System terminated.")
149}
150}