Skip to content

Commit dd240a5

Browse files
committed
add grpc
1 parent 9a06d6e commit dd240a5

File tree

7 files changed

+353
-52
lines changed

7 files changed

+353
-52
lines changed

fluent-plugin-otlp.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ Gem::Specification.new do |spec|
3131
spec.add_dependency("excon", "~> 1.2")
3232
spec.add_dependency("fluentd", "~> 1.18")
3333
spec.add_dependency("google-protobuf", "~> 4.30")
34+
spec.add_dependency("grpc", "~> 1.71")
3435
end

lib/fluent/plugin/otlp/service.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# frozen_string_literal: true
2+
3+
require "opentelemetry/proto/collector/logs/v1/logs_service_pb"
4+
require "opentelemetry/proto/collector/logs/v1/logs_service_services_pb"
5+
require "opentelemetry/proto/collector/metrics/v1/metrics_service_pb"
6+
require "opentelemetry/proto/collector/metrics/v1/metrics_service_services_pb"
7+
require "opentelemetry/proto/collector/trace/v1/trace_service_pb"
8+
require "opentelemetry/proto/collector/trace/v1/trace_service_services_pb"
9+
10+
require "grpc"
11+
12+
class Fluent::Plugin::Otlp::Service
13+
class Logs
14+
def initialize(host, creds, **kw)
15+
@stub = Opentelemetry::Proto::Collector::Logs::V1::LogsService::Stub.new(host, creds, **kw)
16+
end
17+
18+
def export(json)
19+
message = Opentelemetry::Proto::Collector::Logs::V1::ExportLogsServiceRequest.decode_json(json)
20+
@stub.export(message)
21+
end
22+
end
23+
24+
class Metrics
25+
def initialize(host, creds, **kw)
26+
@stub = Opentelemetry::Proto::Collector::Logs::V1::MetricsService::Stub.new(host, creds, **kw)
27+
end
28+
29+
def export(json)
30+
message = Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceRequest.decode_json(json)
31+
@stub.export(message)
32+
end
33+
end
34+
35+
class Traces
36+
def initialize(host, creds, **kw)
37+
@stub = Opentelemetry::Proto::Collector::Logs::V1::TraceService::Stub.new(host, creds, **kw)
38+
end
39+
40+
def export(json)
41+
message = Opentelemetry::Proto::Collector::Trace::V1::ExportTraceServiceRequest.decode_json(json)
42+
@stub.export(message)
43+
end
44+
45+
end
46+
end

lib/fluent/plugin/out_otlp.rb

Lines changed: 115 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
require "fluent/plugin/otlp/constant"
44
require "fluent/plugin/otlp/request"
5+
require "fluent/plugin/otlp/service"
56
require "fluent/plugin/output"
67

78
require "excon"
9+
require "grpc"
810
require "json"
911
require "stringio"
1012
require "zlib"
@@ -23,7 +25,7 @@ class RetryableResponse < StandardError; end
2325
config_set_default :chunk_limit_size, 10 * 1024
2426
end
2527

26-
config_section :http, required: false, multi: false, init: true, param_name: :http_config do
28+
config_section :http, required: false, multi: false, init: false, param_name: :http_config do
2729
desc "The endpoint"
2830
config_param :endpoint, :string, default: "http://127.0.0.1:4318"
2931
desc "The proxy for HTTP request"
@@ -38,21 +40,20 @@ class RetryableResponse < StandardError; end
3840
config_param :compress, :enum, list: %i[text gzip], default: :text
3941
end
4042

43+
config_section :grpc, required: false, multi: false, init: false, param_name: :grpc_config do
44+
desc "The endpoint"
45+
config_param :endpoint, :string, default: "127.0.0.1:4317"
46+
end
47+
4148
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
4249
config_argument :protocol, :enum, list: [:tls], default: nil
4350
end
4451

4552
def configure(conf)
4653
super
4754

48-
@tls_settings = {}
49-
if @transport_config.protocol == :tls
50-
@tls_settings[:client_cert] = @transport_config.cert_path
51-
@tls_settings[:client_key] = @transport_config.private_key_path
52-
@tls_settings[:client_key_pass] = @transport_config.private_key_passphrase
53-
@tls_settings[:ssl_min_version] = Otlp::TLS_VERSIONS_MAP[@transport_config.min_version]
54-
@tls_settings[:ssl_max_version] = Otlp::TLS_VERSIONS_MAP[@transport_config.max_version]
55-
end
55+
@http_handler = HttpHandler.new(@http_config, @transport_config, log) if @http_config
56+
@grpc_handler = GrpcHandler.new(@grpc_config, @transport_config, log) if @grpc_config
5657
end
5758

5859
def multi_workers_ready?
@@ -64,67 +65,129 @@ def format(tag, time, record)
6465
end
6566

6667
def write(chunk)
67-
uri, connection = create_connection(chunk)
68-
response = connection.post
68+
if @http_handler
69+
@http_handler.export(chunk)
6970

70-
if response.status != 200
71-
if @http_config.retryable_response_codes&.include?(response.status)
72-
raise RetryableResponse, "got retryable error response from '#{uri}', response code is #{response.status}"
73-
end
74-
if @http_config.error_response_as_unrecoverable
75-
raise Fluent::UnrecoverableError, "got unrecoverable error response from '#{uri}', response code is #{response.status}"
76-
else
77-
log.error "got error response from '#{uri}', response code is #{response.status}"
78-
end
71+
return
72+
end
73+
74+
if @grpc_handler
75+
@grpc_handler.export(chunk)
7976
end
8077
end
8178

8279
private
8380

84-
def http_logs_endpoint
85-
"#{@http_config.endpoint}/v1/logs"
86-
end
81+
class HttpHandler
82+
def initialize(http_config, transport_config, logger)
83+
@http_config = http_config
84+
@transport_config = transport_config
85+
@logger = logger
86+
87+
@tls_settings = {}
88+
if @transport_config.protocol == :tls
89+
@tls_settings[:client_cert] = @transport_config.cert_path
90+
@tls_settings[:client_key] = @transport_config.private_key_path
91+
@tls_settings[:client_key_pass] = @transport_config.private_key_passphrase
92+
@tls_settings[:ssl_min_version] = Otlp::TLS_VERSIONS_MAP[@transport_config.min_version]
93+
@tls_settings[:ssl_max_version] = Otlp::TLS_VERSIONS_MAP[@transport_config.max_version]
94+
end
95+
end
8796

88-
def http_metrics_endpoint
89-
"#{@http_config.endpoint}/v1/metrics"
90-
end
97+
def export(chunk)
98+
uri, connection = create_http_connection(chunk)
99+
response = connection.post
100+
101+
if response.status != 200
102+
if @http_config.retryable_response_codes&.include?(response.status)
103+
raise RetryableResponse, "got retryable error response from '#{uri}', response code is #{response.status}"
104+
end
105+
if @http_config.error_response_as_unrecoverable
106+
raise Fluent::UnrecoverableError, "got unrecoverable error response from '#{uri}', response code is #{response.status}"
107+
else
108+
@logger.error "got error response from '#{uri}', response code is #{response.status}"
109+
end
110+
end
111+
end
112+
113+
private
114+
115+
def http_logs_endpoint
116+
"#{@http_config.endpoint}/v1/logs"
117+
end
118+
119+
def http_metrics_endpoint
120+
"#{@http_config.endpoint}/v1/metrics"
121+
end
122+
123+
def http_traces_endpoint
124+
"#{@http_config.endpoint}/v1/traces"
125+
end
126+
127+
def create_http_connection(chunk)
128+
record = JSON.parse(chunk.read)
129+
msg = record["message"]
130+
131+
begin
132+
case record["type"]
133+
when Otlp::RECORD_TYPE_LOGS
134+
uri = http_logs_endpoint
135+
body = Otlp::Request::Logs.new(msg).encode
136+
when Otlp::RECORD_TYPE_METRICS
137+
uri = http_metrics_endpoint
138+
body = Otlp::Request::Metrics.new(msg).encode
139+
when Otlp::RECORD_TYPE_TRACES
140+
uri = http_traces_endpoint
141+
body = Otlp::Request::Traces.new(msg).encode
142+
end
143+
rescue Google::Protobuf::ParseError => e
144+
# The message format does not comply with the OpenTelemetry protocol.
145+
raise ::Fluent::UnrecoverableError, e.message
146+
end
147+
148+
headers = { Otlp::CONTENT_TYPE => Otlp::CONTENT_TYPE_PROTOBUF }
149+
if @http_config.compress == :gzip
150+
headers[Otlp::CONTENT_ENCODING] = Otlp::CONTENT_ENCODING_GZIP
151+
gz = Zlib::GzipWriter.new(StringIO.new)
152+
gz << body
153+
body = gz.close.string
154+
end
91155

92-
def http_traces_endpoint
93-
"#{@http_config.endpoint}/v1/traces"
156+
Excon.defaults[:ssl_verify_peer] = false if @transport_config.insecure
157+
connection = Excon.new(uri, body: body, headers: headers, proxy: @http_config.proxy, persistent: true, **@tls_settings)
158+
[uri, connection]
159+
end
94160
end
95161

96-
def create_connection(chunk)
97-
record = JSON.parse(chunk.read)
98-
msg = record["message"]
162+
class GrpcHandler
163+
def initialize(grpc_config, transport_config, logger)
164+
@grpc_config = grpc_config
165+
@transport_config = transport_config
166+
@logger = logger
167+
end
168+
169+
def export(chunk)
170+
record = JSON.parse(chunk.read)
171+
msg = record["message"]
172+
173+
credential = :this_channel_is_insecure
99174

100-
begin
101175
case record["type"]
102176
when Otlp::RECORD_TYPE_LOGS
103-
uri = http_logs_endpoint
104-
body = Otlp::Request::Logs.new(msg).encode
177+
service = Otlp::Service::Logs.new(@grpc_config.endpoint, credential)
105178
when Otlp::RECORD_TYPE_METRICS
106-
uri = http_metrics_endpoint
107-
body = Otlp::Request::Metrics.new(msg).encode
179+
service = Otlp::Service::Metrics.new(@grpc_config.endpoint, credential)
108180
when Otlp::RECORD_TYPE_TRACES
109-
uri = http_traces_endpoint
110-
body = Otlp::Request::Traces.new(msg).encode
181+
service = Otlp::Service::Traces.new(@grpc_config.endpoint, credential)
111182
end
112-
rescue Google::Protobuf::ParseError => e
113-
# The message format does not comply with the OpenTelemetry protocol.
114-
raise ::Fluent::UnrecoverableError, e.message
115-
end
116183

117-
headers = { Otlp::CONTENT_TYPE => Otlp::CONTENT_TYPE_PROTOBUF }
118-
if @http_config.compress == :gzip
119-
headers[Otlp::CONTENT_ENCODING] = Otlp::CONTENT_ENCODING_GZIP
120-
gz = Zlib::GzipWriter.new(StringIO.new)
121-
gz << body
122-
body = gz.close.string
184+
begin
185+
service.export(msg)
186+
rescue Google::Protobuf::ParseError => e
187+
# The message format does not comply with the OpenTelemetry protocol.
188+
raise ::Fluent::UnrecoverableError, e.message
189+
end
123190
end
124-
125-
Excon.defaults[:ssl_verify_peer] = false if @transport_config.insecure
126-
connection = Excon.new(uri, body: body, headers: headers, proxy: @http_config.proxy, persistent: true, **@tls_settings)
127-
[uri, connection]
128191
end
129192
end
130193
end

lib/opentelemetry/proto/collector/logs/v1/logs_service_services_pb.rb

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/opentelemetry/proto/collector/metrics/v1/metrics_service_services_pb.rb

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)