Skip to content

Commit 0020bac

Browse files
committed
Improved RPC mechanism.
1 parent 9b75597 commit 0020bac

File tree

14 files changed

+347
-219
lines changed

14 files changed

+347
-219
lines changed

async-container-supervisor.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ Gem::Specification.new do |spec|
2424

2525
spec.required_ruby_version = ">= 3.1"
2626

27-
spec.add_dependency "async-container"
27+
spec.add_dependency "async-container", "~> 0.22"
2828
spec.add_dependency "async-service"
2929
spec.add_dependency "io-stream"
30-
spec.add_dependency "memory-leak", "~> 0.2"
30+
spec.add_dependency "memory-leak", "~> 0.3"
3131
end

example/simple/simple.rb

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,23 @@ class SleepService < Async::Service::Generic
66
def setup(container)
77
super
88

9-
container.run(count: 1, restart: true, health_check_timeout: 2) do |instance|
9+
container.run(name: self.class.name, count: 8, restart: true, health_check_timeout: 2) do |instance|
1010
Async do
1111
client = Async::Container::Supervisor::Client.new(instance, @evaluator.supervisor_endpoint)
1212
client.run
1313

14+
start_time = Time.now
15+
1416
instance.ready!
1517

1618
chunks = []
1719
while true
18-
Console.info(self, "Allocating memory...")
19-
# Allocate 10MB of memory every second:
20-
chunks << " " * 1024 * 1024
21-
sleep 0.1
20+
chunks << " " * 1024 * 1024 * rand(10)
21+
sleep 1
2222
instance.ready!
23+
24+
uptime = Time.now - start_time
25+
instance.name = "Sleeping for #{uptime.to_i} seconds..."
2326
end
2427
ensure
2528
Console.info(self, "Exiting...")
@@ -38,6 +41,6 @@ def setup(container)
3841
include Async::Container::Supervisor::Environment
3942

4043
monitors do
41-
[Async::Container::Supervisor::Monitor::MemoryMonitor.new(interval: 1, limit: 1024 * 1024 * 100)]
44+
[Async::Container::Supervisor::MemoryMonitor.new(interval: 1, limit: 1024 * 1024 * 800)]
4245
end
4346
end

gems.locked

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
1-
PATH
2-
remote: ../async-container
3-
specs:
4-
async-container (0.21.0)
5-
async (~> 2.22)
6-
71
PATH
82
remote: .
93
specs:
104
async-container-supervisor (0.0.0)
11-
async-container
5+
async-container (~> 0.22)
126
async-service
137
io-stream
14-
memory-leak (~> 0.2)
8+
memory-leak (~> 0.3)
159

1610
GEM
1711
remote: https://rubygems.org/
@@ -23,6 +17,8 @@ GEM
2317
io-event (~> 1.9)
2418
metrics (~> 0.12)
2519
traces (~> 0.15)
20+
async-container (0.22.0)
21+
async (~> 2.22)
2622
async-http (0.87.0)
2723
async (>= 2.10.2)
2824
async-pool (~> 0.9)
@@ -104,7 +100,7 @@ GEM
104100
net-smtp
105101
mapping (1.1.1)
106102
markly (0.12.1)
107-
memory-leak (0.2.0)
103+
memory-leak (0.4.0)
108104
metrics (0.12.1)
109105
mime-types (3.6.0)
110106
logger
@@ -206,7 +202,6 @@ PLATFORMS
206202
ruby
207203

208204
DEPENDENCIES
209-
async-container!
210205
async-container-supervisor!
211206
bake-gem
212207
bake-modernize

gems.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
gemspec
99

10-
gem "async-container", path: "../async-container"
11-
1210
group :maintenance, optional: true do
1311
gem "bake-gem"
1412
gem "bake-modernize"

lib/async/container/supervisor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
require_relative "supervisor/server"
99
require_relative "supervisor/client"
10-
require_relative "supervisor/monitor"
10+
11+
require_relative "supervisor/memory_monitor"
1112

1213
require_relative "supervisor/environment"
1314
require_relative "supervisor/service"

lib/async/container/supervisor/client.rb

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "io/stream"
7-
require_relative "wrapper"
7+
require_relative "connection"
88

99
module Async
1010
module Container
@@ -20,46 +20,86 @@ def initialize(instance, endpoint = Supervisor.endpoint)
2020
end
2121

2222
def connect
23-
unless @wrapper
23+
unless @connection
2424
peer = @endpoint.connect
2525
stream = IO::Stream(peer)
26-
@wrapper = Wrapper.new(stream)
26+
@connection = Connection.new(stream, 0, instance: @instance)
2727

28-
@wrapper.write(action: "register", instance: @instance)
28+
# Register the instance with the server:
29+
@connection.call(do: :register, state: @instance)
2930
end
3031

31-
return @wrapper unless block_given?
32+
return @connection unless block_given?
3233

3334
begin
34-
yield @wrapper
35+
yield @connection
3536
ensure
36-
@wrapper.close
37+
@connection.close
3738
end
3839
end
3940

4041
def close
41-
if wrapper = @wrapper
42-
@wrapper = nil
43-
wrapper.close
42+
if connection = @connection
43+
@connection = nil
44+
connection.close
4445
end
4546
end
4647

47-
def do_memory_dump(wrapper, message)
48-
Console.info(self, "Memory dump:", message)
49-
path = message[:path]
50-
51-
File.open(path, "w") do |file|
48+
private def dump(call)
49+
if path = message[:path]
50+
File.open(path, "w") do |file|
51+
yield file
52+
end
53+
54+
call.finish(path: path)
55+
else
56+
buffer = StringIO.new
57+
yield buffer
58+
59+
call.finish(data: buffer.string)
60+
end
61+
end
62+
63+
def do_scheduler_dump(call)
64+
dump(call) do |file|
65+
Fiber.scheduler.print_hierarchy(file)
66+
end
67+
end
68+
69+
def do_memory_dump(call)
70+
dump(call) do |file|
5271
ObjectSpace.dump_all(output: file)
5372
end
5473
end
5574

75+
def do_thread_dump(call)
76+
dump(call) do |file|
77+
Thread.list.each do |thread|
78+
file.puts(thread.inspect)
79+
file.puts(thread.backtrace)
80+
end
81+
end
82+
end
83+
84+
def do_garbage_profile_start(connection, message)
85+
Console.info(self, "Garbage profile start:", message)
86+
GC::Profiler.enable
87+
end
88+
89+
def do_garbage_profile_stop(connection, message)
90+
Console.info(self, "Garbage profile stop:", message)
91+
GC::Profiler.disable
92+
93+
dump(connection, message) do |file|
94+
file.puts GC::Profiler.result
95+
end
96+
end
97+
5698
def run
5799
Async do |task|
58100
loop do
59-
Console.info(self, "Connecting to supervisor...")
60-
connect do |wrapper|
61-
Console.info(self, "Connected to supervisor.")
62-
wrapper.run(self)
101+
connect do |connection|
102+
connection.run(self)
63103
end
64104
rescue => error
65105
Console.error(self, "Unexpected error while running client!", exception: error)
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "json"
7+
8+
module Async
9+
module Container
10+
module Supervisor
11+
class Connection
12+
class Call
13+
def initialize(connection, id, message)
14+
@connection = connection
15+
@id = id
16+
@message = message
17+
18+
@queue = ::Thread::Queue.new
19+
end
20+
21+
# @attribute [Connection] The connection that initiated the call.
22+
attr :connection
23+
24+
# @attribute [Hash] The message that initiated the call.
25+
attr :message
26+
27+
def push(**response)
28+
@queue.push(response)
29+
end
30+
31+
def pop(...)
32+
@queue.pop(...)
33+
end
34+
35+
def each(&block)
36+
while response = self.pop
37+
yield response
38+
end
39+
end
40+
41+
def finish(**response)
42+
self.push(id: @id, finished: true, **response)
43+
@queue.close
44+
end
45+
46+
def closed?
47+
@queue.closed?
48+
end
49+
50+
def self.dispatch(connection, target, id, message)
51+
Async do
52+
call = self.new(connection, id, message)
53+
connection.calls[id] = call
54+
55+
target.dispatch(call)
56+
57+
while response = call.pop
58+
connection.write(id: id, **response)
59+
end
60+
ensure
61+
# If the queue is closed, we don't need to send a finished message.
62+
unless call.closed?
63+
connection.write(id: id, finished: true)
64+
end
65+
66+
connection.calls.delete(id)
67+
end
68+
end
69+
70+
def self.call(connection, **message, &block)
71+
id = connection.next_id
72+
call = self.new(connection, id, message)
73+
74+
connection.calls[id] = call
75+
connection.write(id: id, **message)
76+
77+
if block_given?
78+
call.each(&block)
79+
else
80+
return call.pop
81+
end
82+
end
83+
end
84+
85+
def initialize(stream, id, **state)
86+
@stream = stream
87+
@state = state
88+
89+
@calls = {}
90+
91+
@id = 0
92+
end
93+
94+
# @attribute [Hash(Integer, Call)] Calls in progress.
95+
attr :calls
96+
97+
# @attribute [Hash(Symbol, Object)] State associated with this connection, for example the process ID, etc.
98+
attr_accessor :state
99+
100+
def next_id
101+
@id += 2
102+
end
103+
104+
def write(**message)
105+
@stream.write(JSON.dump(message) << "\n")
106+
@stream.flush
107+
end
108+
109+
def call(**message)
110+
id = next_id
111+
calls[id] = ::Thread::Queue.new
112+
113+
write(id: id, **message)
114+
115+
return calls[id].pop
116+
ensure
117+
calls.delete(id)
118+
end
119+
120+
def read
121+
if line = @stream&.gets
122+
JSON.parse(line, symbolize_names: true)
123+
end
124+
end
125+
126+
def each
127+
while message = self.read
128+
yield message
129+
end
130+
end
131+
132+
def call(...)
133+
Call.call(self, ...)
134+
end
135+
136+
def run(target)
137+
self.each do |message|
138+
if id = message[:id]
139+
if call = @calls[id]
140+
# Response to a call:
141+
call.push(message)
142+
else
143+
# Incoming call:
144+
Call.dispatch(self, target, id, message)
145+
end
146+
else
147+
Console.error(self, "Unknown message:", message)
148+
end
149+
end
150+
end
151+
152+
def close
153+
if stream = @stream
154+
@stream = nil
155+
stream.close
156+
end
157+
end
158+
end
159+
end
160+
end
161+
end

0 commit comments

Comments
 (0)