Skip to content

Commit 30ebf9a

Browse files
committed
add grpc support
1 parent 59357a6 commit 30ebf9a

File tree

10 files changed

+176
-28
lines changed

10 files changed

+176
-28
lines changed

.rubocop_todo.yml

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,34 @@
11
# This configuration was generated by
22
# `rubocop --auto-gen-config`
3-
# on 2025-04-10 06:21:04 UTC using RuboCop version 1.75.2.
3+
# on 2025-04-15 08:39:58 UTC using RuboCop version 1.75.2.
44
# The point is for the user to remove these configuration records
55
# one by one as the offenses are removed from the code base.
66
# Note that changes in the inspected code, or installation of new
77
# versions of RuboCop, may require this file to be generated again.
88

9+
# Offense count: 2
10+
# This cop supports safe autocorrection (--autocorrect).
11+
# Configuration parameters: Max, AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, AllowedPatterns, SplitStrings.
12+
# URISchemes: http, https
13+
Layout/LineLength:
14+
Exclude:
15+
- 'lib/fluent/plugin/in_otlp.rb'
16+
917
# Offense count: 1
1018
# This cop supports safe autocorrection (--autocorrect).
11-
Lint/RedundantStringCoercion:
19+
# Configuration parameters: AutoCorrect, ContextCreatingMethods, MethodCreatingMethods.
20+
Lint/UselessAccessModifier:
1221
Exclude:
1322
- 'lib/fluent/plugin/out_otlp.rb'
1423

15-
# Offense count: 11
24+
# Offense count: 3
25+
# Configuration parameters: MinNameLength, AllowNamesEndingInNumbers, AllowedNames, ForbiddenNames.
26+
# AllowedNames: as, at, by, cc, db, id, if, in, io, ip, of, on, os, pp, to
27+
Naming/MethodParameterName:
28+
Exclude:
29+
- 'lib/fluent/plugin/otlp/service_stub.rb'
30+
31+
# Offense count: 21
1632
# Configuration parameters: AllowedConstants.
1733
Style/Documentation:
1834
Exclude:
@@ -21,9 +37,11 @@ Style/Documentation:
2137
- 'lib/fluent/plugin/in_otlp.rb'
2238
- 'lib/fluent/plugin/otlp/request.rb'
2339
- 'lib/fluent/plugin/otlp/response.rb'
40+
- 'lib/fluent/plugin/otlp/service_handler.rb'
41+
- 'lib/fluent/plugin/otlp/service_stub.rb'
2442
- 'lib/fluent/plugin/out_otlp.rb'
2543

26-
# Offense count: 3
44+
# Offense count: 4
2745
# This cop supports safe autocorrection (--autocorrect).
2846
# Configuration parameters: MinBodyLength, AllowConsecutiveConditionals.
2947
Style/GuardClause:
@@ -40,3 +58,25 @@ Style/IfUnlessModifier:
4058
Style/MixinUsage:
4159
Exclude:
4260
- 'test/helper.rb'
61+
62+
# Offense count: 1
63+
# This cop supports safe autocorrection (--autocorrect).
64+
# Configuration parameters: Strict, AllowedNumbers, AllowedPatterns.
65+
Style/NumericLiterals:
66+
MinDigits: 6
67+
68+
# Offense count: 1
69+
# This cop supports unsafe autocorrection (--autocorrect-all).
70+
# Configuration parameters: ConvertCodeThatCanStartToReturnNil, AllowedMethods, MaxChainLength.
71+
# AllowedMethods: present?, blank?, presence, try, try!
72+
Style/SafeNavigation:
73+
Exclude:
74+
- 'lib/fluent/plugin/out_otlp.rb'
75+
76+
# Offense count: 3
77+
# This cop supports safe autocorrection (--autocorrect).
78+
# Configuration parameters: ExactNameMatch, AllowPredicates, AllowDSLWriters, IgnoreClassMethods, AllowedMethods.
79+
# AllowedMethods: to_ary, to_a, to_c, to_enum, to_h, to_hash, to_i, to_int, to_io, to_open, to_path, to_proc, to_r, to_regexp, to_str, to_s, to_sym
80+
Style/TrivialAccessors:
81+
Exclude:
82+
- 'lib/fluent/plugin/otlp/service_handler.rb'

lib/fluent/plugin/in_otlp.rb

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require "fluent/plugin/otlp/constant"
55
require "fluent/plugin/otlp/request"
66
require "fluent/plugin/otlp/response"
7+
require "fluent/plugin/otlp/service_handler"
78
require "fluent/plugin_helper/http_server"
89

910
require "zlib"
@@ -107,21 +108,76 @@ def response_bad_request(content_type)
107108
end
108109
end
109110

111+
class GrpcHandler
112+
class ExceptionInterceptor < GRPC::ServerInterceptor
113+
def request_response(request:, call:, method:)
114+
# call actual service
115+
yield
116+
rescue StandardError => e
117+
puts "[#{method}] Error: #{e.message}"
118+
raise
119+
end
120+
end
121+
122+
def run(logs:, metrics:, traces:)
123+
Thread.new do
124+
server = GRPC::RpcServer.new(interceptors: [ExceptionInterceptor.new])
125+
server.add_http2_port("0.0.0.0:4317", :this_port_is_insecure)
126+
127+
logs_handler = Otlp::ServiceHandler::Logs.new
128+
logs_handler.callback = lambda { |request|
129+
logs.call(request.to_json)
130+
Otlp::Response::Logs.build
131+
}
132+
server.handle(logs_handler)
133+
134+
metrics_handler = Otlp::ServiceHandler::Metrics.new
135+
metrics_handler.callback = lambda { |request|
136+
metrics.call(request.to_json)
137+
Otlp::Response::Metrics.build
138+
}
139+
server.handle(metrics_handler)
140+
141+
traces_handler = Otlp::ServiceHandler::Traces.new
142+
traces_handler.callback = lambda { |request|
143+
traces.call(request.to_json)
144+
Otlp::Response::Traces.build
145+
}
146+
server.handle(traces_handler)
147+
148+
server.run_till_terminated
149+
end
150+
end
151+
end
152+
110153
def start
111154
super
112155

113-
handler = HttpHandler.new
156+
http_handler = HttpHandler.new
114157
http_server_create_http_server(:in_otel_http_server_helper, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
115158
serv.post("/v1/logs") do |req|
116-
handler.logs(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_LOGS, message: record }) }
159+
http_handler.logs(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_LOGS, message: record }) }
117160
end
118161
serv.post("/v1/metrics") do |req|
119-
handler.metrics(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_METRICS, message: record }) }
162+
http_handler.metrics(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_METRICS, message: record }) }
120163
end
121164
serv.post("/v1/traces") do |req|
122-
handler.traces(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_TRACES, message: record }) }
165+
http_handler.traces(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_TRACES, message: record }) }
123166
end
124167
end
168+
169+
grpc_handler = GrpcHandler.new
170+
grpc_handler.run(
171+
logs: lambda { |record|
172+
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_LOGS, message: record })
173+
},
174+
metrics: lambda { |record|
175+
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_METRICS, message: record })
176+
},
177+
traces: lambda { |record|
178+
router.emit(@tag, Fluent::EventTime.now, { type: Otlp::RECORD_TYPE_TRACES, message: record })
179+
}
180+
)
125181
end
126182
end
127183
end

lib/fluent/plugin/otlp/response.rb

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,56 +20,68 @@ def self.type(content_type)
2020
end
2121

2222
class Logs
23-
def initialize(rejected: 0, error: "")
24-
@response = Opentelemetry::Proto::Collector::Logs::V1::ExportLogsServiceResponse.new(
23+
def self.build(rejected: 0, error: "")
24+
Opentelemetry::Proto::Collector::Logs::V1::ExportLogsServiceResponse.new(
2525
partial_success: Opentelemetry::Proto::Collector::Logs::V1::ExportLogsPartialSuccess.new(
2626
rejected_log_records: rejected,
2727
error_message: error
2828
)
2929
)
3030
end
3131

32+
def initialize(rejected: 0, error: "")
33+
@response = Logs.build(rejected: rejected, error: error)
34+
end
35+
3236
def body(type:)
3337
if type == :protobuf
34-
Opentelemetry::Proto::Collector::Logs::V1::ExportLogsServiceResponse.encode(@response)
38+
@response.class.encode(@response)
3539
else
3640
@response.to_json
3741
end
3842
end
3943
end
4044

4145
class Metrics
42-
def initialize(rejected: 0, error: "")
43-
@response = Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceResponse.new(
46+
def self.build(rejected: 0, error: "")
47+
Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceResponse.new(
4448
partial_success: Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsPartialSuccess.new(
4549
rejected_data_points: rejected,
4650
error_message: error
4751
)
4852
)
4953
end
5054

55+
def initialize(rejected: 0, error: "")
56+
@response = Metrics.build(rejected: rejected, error: error)
57+
end
58+
5159
def body(type:)
5260
if type == :protobuf
53-
Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceResponse.encode(@response)
61+
@response.class.encode(@response)
5462
else
5563
@response.to_json
5664
end
5765
end
5866
end
5967

6068
class Traces
61-
def initialize(rejected: 0, error: "")
62-
@response = Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceResponse.new(
69+
def self.build(rejected: 0, error: "")
70+
Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceResponse.new(
6371
partial_success: Opentelemetry::Proto::Collector::Trace::V1::ExportTracePartialSuccess.new(
6472
rejected_spans: rejected,
6573
error_message: error
6674
)
6775
)
6876
end
6977

78+
def initialize(rejected: 0, error: "")
79+
@response = Traces.build(rejected: rejected, error: error)
80+
end
81+
7082
def body(type:)
7183
if type == :protobuf
72-
Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceResponse.encode(@response)
84+
@response.class.encode(@response)
7385
else
7486
@response.to_json
7587
end
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# frozen_string_literal: true
2+
3+
require "fluent/plugin/otlp/response"
4+
5+
require "opentelemetry/proto/collector/logs/v1/logs_service_services_pb"
6+
require "opentelemetry/proto/collector/metrics/v1/metrics_service_services_pb"
7+
require "opentelemetry/proto/collector/trace/v1/trace_service_services_pb"
8+
9+
require "grpc"
10+
11+
class Fluent::Plugin::Otlp::ServiceHandler
12+
class Logs < Opentelemetry::Proto::Collector::Logs::V1::LogsService::Service
13+
def callback=(block)
14+
@callback = block
15+
end
16+
17+
def export(req)
18+
@callback.call(req)
19+
end
20+
end
21+
22+
class Metrics < Opentelemetry::Proto::Collector::Metrics::V1::MetricsService::Service
23+
def callback=(block)
24+
@callback = block
25+
end
26+
27+
def export(req, _call)
28+
@callback.call(req)
29+
end
30+
end
31+
32+
class Traces < Opentelemetry::Proto::Collector::Trace::V1::TraceService::Service
33+
def callback=(block)
34+
@callback = block
35+
end
36+
37+
def export(req)
38+
@callback.call(req)
39+
end
40+
end
41+
end

lib/fluent/plugin/otlp/service.rb renamed to lib/fluent/plugin/otlp/service_stub.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
require "grpc"
1111

12-
class Fluent::Plugin::Otlp::Service
12+
class Fluent::Plugin::Otlp::ServiceStub
1313
class Logs
1414
def initialize(host, creds, **kw)
1515
@stub = Opentelemetry::Proto::Collector::Logs::V1::LogsService::Stub.new(host, creds, **kw)
@@ -41,6 +41,5 @@ def export(json)
4141
message = Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.decode_json(json)
4242
@stub.export(message)
4343
end
44-
4544
end
46-
end
45+
end

lib/fluent/plugin/out_otlp.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
require "fluent/plugin/otlp/constant"
44
require "fluent/plugin/otlp/request"
5-
require "fluent/plugin/otlp/service"
5+
require "fluent/plugin/otlp/service_stub"
66
require "fluent/plugin/output"
77

88
require "excon"
@@ -174,11 +174,11 @@ def export(chunk)
174174

175175
case record["type"]
176176
when Otlp::RECORD_TYPE_LOGS
177-
service = Otlp::Service::Logs.new(@grpc_config.endpoint, credential)
177+
service = Otlp::ServiceStub::Logs.new(@grpc_config.endpoint, credential)
178178
when Otlp::RECORD_TYPE_METRICS
179-
service = Otlp::Service::Metrics.new(@grpc_config.endpoint, credential)
179+
service = Otlp::ServiceStub::Metrics.new(@grpc_config.endpoint, credential)
180180
when Otlp::RECORD_TYPE_TRACES
181-
service = Otlp::Service::Traces.new(@grpc_config.endpoint, credential)
181+
service = Otlp::ServiceStub::Traces.new(@grpc_config.endpoint, credential)
182182
end
183183

184184
begin

lib/opentelemetry/proto/collector/logs/v1/logs_service_services_pb.rb

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/opentelemetry/proto/collector/metrics/v1/metrics_service_services_pb.rb

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/opentelemetry/proto/collector/profiles/v1development/profiles_service_services_pb.rb

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/opentelemetry/proto/collector/trace/v1/trace_service_services_pb.rb

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)