Skip to content

Commit 395e1af

Browse files
committed
Improved RPC mechanism.
1 parent 86475e4 commit 395e1af

File tree

13 files changed

+349
-209
lines changed

13 files changed

+349
-209
lines changed

async-container-supervisor.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ Gem::Specification.new do |spec|
2424

2525
spec.required_ruby_version = ">= 3.1"
2626

27-
spec.add_dependency "async-container"
27+
spec.add_dependency "async-container", "~> 0.22"
2828
spec.add_dependency "async-service"
2929
spec.add_dependency "io-stream"
30-
spec.add_dependency "memory-leak", "~> 0.2"
30+
spec.add_dependency "memory-leak", "~> 0.3"
3131
end

example/simple/simple.rb

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,23 @@ class SleepService < Async::Service::Generic
66
def setup(container)
77
super
88

9-
container.run(count: 1, restart: true, health_check_timeout: 2) do |instance|
9+
container.run(name: self.class.name, count: 4, restart: true, health_check_timeout: 2) do |instance|
1010
Async do
1111
client = Async::Container::Supervisor::Client.new(instance, @evaluator.supervisor_endpoint)
1212
client.run
1313

14+
start_time = Time.now
15+
1416
instance.ready!
1517

1618
chunks = []
1719
while true
18-
Console.info(self, "Allocating memory...")
19-
# Allocate 10MB of memory every second:
20-
chunks << " " * 1024 * 1024
21-
sleep 0.1
20+
chunks << " " * 1024 * 1024 * rand(10)
21+
sleep 1
2222
instance.ready!
23+
24+
uptime = Time.now - start_time
25+
instance.name = "Sleeping for #{uptime.to_i} seconds..."
2326
end
2427
ensure
2528
Console.info(self, "Exiting...")
@@ -38,6 +41,6 @@ def setup(container)
3841
include Async::Container::Supervisor::Environment
3942

4043
monitors do
41-
[Async::Container::Supervisor::Monitor::MemoryMonitor.new(interval: 1, limit: 1024 * 1024 * 100)]
44+
[Async::Container::Supervisor::MemoryMonitor.new(interval: 1, limit: 1024 * 1024 * 100)]
4245
end
4346
end

gems.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
gemspec
99

10-
gem "async-container", path: "../async-container"
11-
1210
group :maintenance, optional: true do
1311
gem "bake-gem"
1412
gem "bake-modernize"

lib/async/container/supervisor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
require_relative "supervisor/server"
99
require_relative "supervisor/client"
10-
require_relative "supervisor/monitor"
10+
11+
require_relative "supervisor/memory_monitor"
1112

1213
require_relative "supervisor/environment"
1314
require_relative "supervisor/service"

lib/async/container/supervisor/client.rb

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "io/stream"
7-
require_relative "wrapper"
7+
require_relative "connection"
88

99
module Async
1010
module Container
@@ -19,47 +19,95 @@ def initialize(instance, endpoint = Supervisor.endpoint)
1919
@endpoint = endpoint
2020
end
2121

22+
def dispatch(call)
23+
method_name = "do_#{call.message[:do]}"
24+
self.public_send(method_name, call)
25+
end
26+
2227
def connect
23-
unless @wrapper
28+
unless @connection
2429
peer = @endpoint.connect
2530
stream = IO::Stream(peer)
26-
@wrapper = Wrapper.new(stream)
31+
@connection = Connection.new(stream, 0, instance: @instance)
2732

28-
@wrapper.write(action: "register", instance: @instance)
33+
# Register the instance with the server:
34+
Async do
35+
@connection.call(do: :register, state: @instance)
36+
end
2937
end
3038

31-
return @wrapper unless block_given?
39+
return @connection unless block_given?
3240

3341
begin
34-
yield @wrapper
42+
yield @connection
3543
ensure
36-
@wrapper.close
44+
@connection.close
3745
end
3846
end
3947

4048
def close
41-
if wrapper = @wrapper
42-
@wrapper = nil
43-
wrapper.close
49+
if connection = @connection
50+
@connection = nil
51+
connection.close
52+
end
53+
end
54+
55+
private def dump(call)
56+
if path = call[:path]
57+
File.open(path, "w") do |file|
58+
yield file
59+
end
60+
61+
call.finish(path: path)
62+
else
63+
buffer = StringIO.new
64+
yield buffer
65+
66+
call.finish(data: buffer.string)
67+
end
68+
end
69+
70+
def do_scheduler_dump(call)
71+
dump(call) do |file|
72+
Fiber.scheduler.print_hierarchy(file)
4473
end
4574
end
4675

47-
def do_memory_dump(wrapper, message)
48-
Console.info(self, "Memory dump:", message)
49-
path = message[:path]
76+
def do_memory_dump(call)
77+
require "objspace"
5078

51-
File.open(path, "w") do |file|
79+
dump(call) do |file|
5280
ObjectSpace.dump_all(output: file)
5381
end
5482
end
5583

84+
def do_thread_dump(call)
85+
dump(call) do |file|
86+
Thread.list.each do |thread|
87+
file.puts(thread.inspect)
88+
file.puts(thread.backtrace)
89+
end
90+
end
91+
end
92+
93+
def do_garbage_profile_start(call)
94+
GC::Profiler.enable
95+
call.finish(started: true)
96+
end
97+
98+
def do_garbage_profile_stop(call)
99+
GC::Profiler.disable
100+
101+
dump(connection, message) do |file|
102+
file.puts GC::Profiler.result
103+
end
104+
end
105+
56106
def run
57107
Async do |task|
58108
loop do
59-
Console.info(self, "Connecting to supervisor...")
60-
connect do |wrapper|
61-
Console.info(self, "Connected to supervisor.")
62-
wrapper.run(self)
109+
connect do |connection|
110+
connection.run(self)
63111
end
64112
rescue => error
65113
Console.error(self, "Unexpected error while running client!", exception: error)
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "json"
7+
8+
module Async
9+
module Container
10+
module Supervisor
11+
class Connection
12+
class Call
13+
def initialize(connection, id, message)
14+
@connection = connection
15+
@id = id
16+
@message = message
17+
18+
@queue = ::Thread::Queue.new
19+
end
20+
21+
# @attribute [Connection] The connection that initiated the call.
22+
attr :connection
23+
24+
# @attribute [Hash] The message that initiated the call.
25+
attr :message
26+
27+
def [] key
28+
@message[key]
29+
end
30+
31+
def push(**response)
32+
@queue.push(response)
33+
end
34+
35+
def pop(...)
36+
@queue.pop(...)
37+
end
38+
39+
def each(&block)
40+
while response = self.pop
41+
yield response
42+
end
43+
end
44+
45+
def finish(**response)
46+
self.push(id: @id, finished: true, **response)
47+
@queue.close
48+
end
49+
50+
def closed?
51+
@queue.closed?
52+
end
53+
54+
def self.dispatch(connection, target, id, message)
55+
Async do
56+
call = self.new(connection, id, message)
57+
connection.calls[id] = call
58+
59+
target.dispatch(call)
60+
61+
while response = call.pop
62+
connection.write(id: id, **response)
63+
end
64+
ensure
65+
# If the queue is closed, we don't need to send a finished message.
66+
unless call.closed?
67+
connection.write(id: id, finished: true)
68+
end
69+
70+
connection.calls.delete(id)
71+
end
72+
end
73+
74+
def self.call(connection, **message, &block)
75+
id = connection.next_id
76+
call = self.new(connection, id, message)
77+
78+
connection.calls[id] = call
79+
connection.write(id: id, **message)
80+
81+
if block_given?
82+
call.each(&block)
83+
else
84+
return call.pop
85+
end
86+
end
87+
end
88+
89+
def initialize(stream, id, **state)
90+
@stream = stream
91+
@state = state
92+
93+
@calls = {}
94+
95+
@id = id
96+
end
97+
98+
# @attribute [Hash(Integer, Call)] Calls in progress.
99+
attr :calls
100+
101+
# @attribute [Hash(Symbol, Object)] State associated with this connection, for example the process ID, etc.
102+
attr_accessor :state
103+
104+
def next_id
105+
@id += 2
106+
end
107+
108+
def write(**message)
109+
@stream.write(JSON.dump(message) << "\n")
110+
@stream.flush
111+
end
112+
113+
def call(timeout: nil, **message)
114+
id = next_id
115+
calls[id] = ::Thread::Queue.new
116+
117+
write(id: id, **message)
118+
119+
return calls[id].pop(timeout: timeout)
120+
ensure
121+
calls.delete(id)
122+
end
123+
124+
def read
125+
if line = @stream&.gets
126+
JSON.parse(line, symbolize_names: true)
127+
end
128+
end
129+
130+
def each
131+
while message = self.read
132+
yield message
133+
end
134+
end
135+
136+
def call(...)
137+
Call.call(self, ...)
138+
end
139+
140+
def run(target)
141+
self.each do |message|
142+
if id = message[:id]
143+
if call = @calls[id]
144+
# Response to a call:
145+
call.push(**message)
146+
else
147+
# Incoming call:
148+
Call.dispatch(self, target, id, message)
149+
end
150+
else
151+
Console.error(self, "Unknown message:", message)
152+
end
153+
end
154+
end
155+
156+
def close
157+
if stream = @stream
158+
@stream = nil
159+
stream.close
160+
end
161+
end
162+
end
163+
end
164+
end
165+
end

0 commit comments

Comments
 (0)