Skip to content

Commit 869f7a6

Browse files
committed
use thread helper
1 parent a2f93dc commit 869f7a6

File tree

1 file changed

+42
-41
lines changed

1 file changed

+42
-41
lines changed

lib/fluent/plugin/in_otlp.rb

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require "fluent/plugin/otlp/response"
77
require "fluent/plugin/otlp/service_handler"
88
require "fluent/plugin_helper/http_server"
9+
require "fluent/plugin_helper/thread"
910

1011
require "zlib"
1112

@@ -30,7 +31,7 @@ module Fluent::Plugin
3031
class OtlpInput < Input
3132
Fluent::Plugin.register_input("otlp", self)
3233

33-
helpers :http_server
34+
helpers :thread, :http_server
3435

3536
desc "The tag of the event."
3637
config_param :tag, :string
@@ -67,7 +68,7 @@ def start
6768

6869
if @http_config
6970
http_handler = HttpHandler.new
70-
http_server_create_http_server(:in_otlp_http_server_helper, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
71+
http_server_create_http_server(:in_otlp_http_server, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
7172
serv.post("/v1/logs") do |req|
7273
http_handler.logs(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_LOGS, message: record }) }
7374
end
@@ -81,18 +82,20 @@ def start
8182
end
8283

8384
if @grpc_config
84-
grpc_handler = GrpcHandler.new(@grpc_config)
85-
grpc_handler.run(
86-
logs: lambda { |record|
87-
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_LOGS, message: record })
88-
},
89-
metrics: lambda { |record|
90-
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_METRICS, message: record })
91-
},
92-
traces: lambda { |record|
93-
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_TRACES, message: record })
94-
}
95-
)
85+
thread_create(:in_otlp_grpc_server) do
86+
grpc_handler = GrpcHandler.new(@grpc_config)
87+
grpc_handler.run(
88+
logs: lambda { |record|
89+
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_LOGS, message: record })
90+
},
91+
metrics: lambda { |record|
92+
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_METRICS, message: record })
93+
},
94+
traces: lambda { |record|
95+
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_TRACES, message: record })
96+
}
97+
)
98+
end
9699
end
97100
end
98101

@@ -177,33 +180,31 @@ def initialize(grpc_config)
177180
end
178181

179182
def run(logs:, metrics:, traces:)
180-
Thread.new do
181-
server = GRPC::RpcServer.new(interceptors: [ExceptionInterceptor.new])
182-
server.add_http2_port("#{@grpc_config.host}:#{@grpc_config.port}", :this_port_is_insecure)
183-
184-
logs_handler = Otlp::ServiceHandler::Logs.new
185-
logs_handler.callback = lambda { |request|
186-
logs.call(request.to_json)
187-
Otlp::Response::Logs.build
188-
}
189-
server.handle(logs_handler)
190-
191-
metrics_handler = Otlp::ServiceHandler::Metrics.new
192-
metrics_handler.callback = lambda { |request|
193-
metrics.call(request.to_json)
194-
Otlp::Response::Metrics.build
195-
}
196-
server.handle(metrics_handler)
197-
198-
traces_handler = Otlp::ServiceHandler::Traces.new
199-
traces_handler.callback = lambda { |request|
200-
traces.call(request.to_json)
201-
Otlp::Response::Traces.build
202-
}
203-
server.handle(traces_handler)
204-
205-
server.run_till_terminated
206-
end
183+
server = GRPC::RpcServer.new(interceptors: [ExceptionInterceptor.new])
184+
server.add_http2_port("#{@grpc_config.host}:#{@grpc_config.port}", :this_port_is_insecure)
185+
186+
logs_handler = Otlp::ServiceHandler::Logs.new
187+
logs_handler.callback = lambda { |request|
188+
logs.call(request.to_json)
189+
Otlp::Response::Logs.build
190+
}
191+
server.handle(logs_handler)
192+
193+
metrics_handler = Otlp::ServiceHandler::Metrics.new
194+
metrics_handler.callback = lambda { |request|
195+
metrics.call(request.to_json)
196+
Otlp::Response::Metrics.build
197+
}
198+
server.handle(metrics_handler)
199+
200+
traces_handler = Otlp::ServiceHandler::Traces.new
201+
traces_handler.callback = lambda { |request|
202+
traces.call(request.to_json)
203+
Otlp::Response::Traces.build
204+
}
205+
server.handle(traces_handler)
206+
207+
server.run_till_terminated
207208
end
208209
end
209210
end

0 commit comments

Comments
 (0)