Fiber Pool Manager with Bounded Concurrency

Crystal

class FiberPool
  getter max_concurrency
  getter active_fibers

  # NOTE: `Condition` does not appear to be part of Crystal's standard library.
  # This listing assumes a fiber-aware condition variable class that exposes
  # `wait(mutex)` and `signal`.
  def initialize(@max_concurrency : Int32)
    @active_fibers = 0
    @mutex = Mutex.new
    @condition = Condition.new
  end

  # Blocks the calling fiber until a slot is free, then runs the task in a new fiber.
  def schedule(task : Proc(Nil))
    @mutex.synchronize do
      while @active_fibers >= @max_concurrency
        @condition.wait(@mutex)
      end
      @active_fibers += 1
    end

    spawn do
      begin
        task.call
      ensure
        # Release the slot even if the task raises.
        @mutex.synchronize do
          @active_fibers -= 1
          @condition.signal
        end
      end
    end
  end

  # Blocks the calling fiber until no scheduled task is still running.
  def wait_for_completion
    @mutex.synchronize do
      while @active_fibers > 0
        @condition.wait(@mutex)
      end
    end
  end
end

def process_tasks(pool : FiberPool, tasks : Array(Proc(Nil)))
  tasks.each do |task|
    pool.schedule(task)
  end
  pool.wait_for_completion
end

# Example Usage:
# pool = FiberPool.new(5)
# tasks = [
#   ->{ sleep 1.second; puts "Task 1 done" },
#   ->{ sleep 2.seconds; puts "Task 2 done" },
#   ->{ sleep 0.5.seconds; puts "Task 3 done" },
#   ->{ sleep 1.5.seconds; puts "Task 4 done" },
#   ->{ sleep 1.second; puts "Task 5 done" },
#   ->{ sleep 2.5.seconds; puts "Task 6 done" },
#   ->{ sleep 0.8.seconds; puts "Task 7 done" },
# ]
# process_tasks(pool, tasks)

Algorithm description:

This code implements a Fiber pool manager in Crystal to control the maximum number of concurrently executing fibers. It uses a mutex and a condition variable to ensure thread-safe access to the active fiber count and to signal when a slot becomes available. This is useful for managing resource-intensive operations, like network requests or heavy computations, without overwhelming the system.
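
The listing relies on a `Condition` class with `wait(mutex)` and `signal`. That class does not appear to be part of Crystal's standard library, so one possible stand-in is sketched here. The class body is an assumption and not the original author's implementation. It builds each waiter on a buffered `Channel`, and it assumes callers hold the mutex around both `wait` and `signal`, as the pool does.

```crystal
# Hypothetical stand-in for the `Condition` class used by the listing.
# Assumption: `wait` and `signal` are always called with `mutex` held,
# so the waiter list is only touched under the lock.
class Condition
  def initialize
    @waiters = [] of Channel(Nil)
  end

  # Releases the mutex, suspends the calling fiber until signalled,
  # then re-acquires the mutex before returning.
  def wait(mutex : Mutex) : Nil
    wakeup = Channel(Nil).new(1) # capacity 1, so a signal is never lost
    @waiters << wakeup
    mutex.unlock
    wakeup.receive
    mutex.lock
  end

  # Wakes the longest-waiting fiber, if there is one.
  def signal : Nil
    if wakeup = @waiters.shift?
      wakeup.send(nil)
    end
  end

  # Wakes every waiting fiber.
  def broadcast : Nil
    while wakeup = @waiters.shift?
      wakeup.send(nil)
    end
  end
end
```

Because each waiter's channel has capacity 1, `signal` never blocks, and a signal sent between `mutex.unlock` and `wakeup.receive` stays buffered until the waiter picks it up.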

Algorithm explanation:

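The FiberPool class caps the number of fibers that run at the same time. The `schedule` method ensures that no more than `max_concurrency` tasks run simultaneously: if the limit is reached, the calling fiber blocks inside `schedule`, waiting on a condition variable until a running task completes. When a task finishes, its fiber decrements the count and signals the condition variable, allowing one waiting caller to proceed. The `wait_for_completion` method blocks until all scheduled tasks have finished. Because `signal` wakes only one waiter, the code assumes that a single fiber calls `schedule` and then `wait_for_completion`, as `process_tasks` does; with several waiting fibers, a signal could be consumed by a waiter whose condition is still false. This pattern ensures bounded concurrency, preventing resource exhaustion. Scheduling a task does O(1) bookkeeping, but the call can block for an unbounded time while the pool is full. The pool itself uses O(1) space, plus one fiber stack per running task (at most `max_concurrency`) and O(N) for the list of N tasks.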
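
The same bounded-concurrency behaviour can also be sketched with a buffered `Channel` acting as a counting semaphore, which avoids the mutex and condition variable entirely. This is an alternative technique, not the method used in the listing, and the class name `ChannelPool` is hypothetical.

```crystal
# Hypothetical alternative: a buffered Channel used as a counting semaphore.
# Each value sitting in the buffer represents one occupied slot.
class ChannelPool
  def initialize(@max_concurrency : Int32)
    @slots = Channel(Nil).new(@max_concurrency)
  end

  # Blocks while all slots are taken, then runs the task in a new fiber.
  def schedule(task : Proc(Nil)) : Nil
    @slots.send(nil) # acquire a slot; blocks when the buffer is full
    spawn do
      begin
        task.call
      ensure
        @slots.receive # release the slot even if the task raises
      end
    end
  end

  # Acquires every slot, which can only succeed once no task holds one,
  # then releases them so the pool can be reused.
  def wait_for_completion : Nil
    @max_concurrency.times { @slots.send(nil) }
    @max_concurrency.times { @slots.receive }
  end
end
```

As with the original pool, this sketch assumes that one fiber does the scheduling and then waits; calling `schedule` from other fibers while `wait_for_completion` is in progress would compete for the same slots.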

Pseudocode:

Class FiberPool:
  Initialize(max_concurrency):
    Set active_fibers = 0
    Initialize mutex and condition variable

  Schedule(task):
    Acquire mutex
    While active_fibers >= max_concurrency:
      Wait on condition variable
    Increment active_fibers
    Release mutex

    Spawn a new fiber:
      Execute task
      Ensure:
        Acquire mutex
        Decrement active_fibers
        Signal condition variable
        Release mutex

  Wait_for_completion():
    Acquire mutex
    While active_fibers > 0:
      Wait on condition variable
    Release mutex

Function process_tasks(pool, tasks):
  For each task in tasks:
    pool.schedule(task)
  pool.wait_for_completion()