Skip to content

Commit 488e61e

Browse files
committed
feat(sse): add new sse_client utils
1 parent c2c647f commit 488e61e

File tree

1 file changed

+66
-0
lines changed
  • packages/forest_admin_datasource_rpc/lib/forest_admin_datasource_rpc/Utils

1 file changed

+66
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
require 'openssl'
2+
require 'json'
3+
require 'time'
4+
require 'ld-eventsource'
5+
6+
module ForestAdminDatasourceRpc
7+
module Utils
8+
class SseClient
9+
def initialize(uri, auth_secret)
10+
@uri = uri
11+
@auth_secret = auth_secret
12+
start
13+
end
14+
15+
def start
16+
timestamp = Time.now.utc.iso8601
17+
signature = generate_signature(timestamp)
18+
19+
headers = {
20+
'Accept' => 'text/event-stream',
21+
'X_TIMESTAMP' => timestamp,
22+
'X_SIGNATURE' => signature
23+
}
24+
25+
ForestAdminRpcAgent::Facades::Container.logger.log('Debug', "Connecting to SSE at #{@uri}.")
26+
27+
@client = SSE::Client.new(@uri, headers: headers) do |client|
28+
Thread.new do
29+
client.on_event do |event|
30+
handle_event(event)
31+
end
32+
rescue StandardError => e
33+
puts "[SSE] Connection closed or failed: #{e.class} - #{e.message}"
34+
close
35+
end
36+
end
37+
end
38+
39+
def close
40+
return if @closed
41+
42+
@closed = true
43+
@client&.close
44+
puts '[SSE] Client closed'
45+
end
46+
47+
private
48+
49+
def handle_event(event)
50+
case event.type.to_s.strip
51+
when 'heartbeat'
52+
puts '[SSE] Heartbeat'
53+
when 'RpcServerStop'
54+
puts '[SSE] Server requested stop'
55+
close
56+
else
57+
puts "[SSE] Unknown event: #{event.type} with payload: #{event.data}"
58+
end
59+
end
60+
61+
def generate_signature(timestamp)
62+
OpenSSL::HMAC.hexdigest('SHA256', @auth_secret, timestamp)
63+
end
64+
end
65+
end
66+
end

0 commit comments

Comments
 (0)