1
1
# frozen_string_literal: true
2
2
3
3
require "fluent/plugin/input"
4
- require "fluent/plugin/otlp /constant"
5
- require "fluent/plugin/otlp /request"
6
- require "fluent/plugin/otlp /response"
7
- require "fluent/plugin/otlp /service_handler"
4
+ require "fluent/plugin/opentelemetry /constant"
5
+ require "fluent/plugin/opentelemetry /request"
6
+ require "fluent/plugin/opentelemetry /response"
7
+ require "fluent/plugin/opentelemetry /service_handler"
8
8
require "fluent/plugin_helper/http_server"
9
9
require "fluent/plugin_helper/thread"
10
10
@@ -28,8 +28,8 @@ def headers
28
28
end
29
29
30
30
module Fluent ::Plugin
31
- class OtlpInput < Input
32
- Fluent ::Plugin . register_input ( "otlp " , self )
31
+ class OpentelemetryInput < Input
32
+ Fluent ::Plugin . register_input ( "opentelemetry " , self )
33
33
34
34
helpers :thread , :http_server
35
35
@@ -67,31 +67,31 @@ def start
67
67
68
68
if @http_config
69
69
http_handler = HttpHandler . new
70
- http_server_create_http_server ( :in_otlp_http_server , addr : @http_config . bind , port : @http_config . port , logger : log ) do |serv |
70
+ http_server_create_http_server ( :in_opentelemetry_http_server , addr : @http_config . bind , port : @http_config . port , logger : log ) do |serv |
71
71
serv . post ( "/v1/logs" ) do |req |
72
- http_handler . logs ( req ) { |record | router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_LOGS , message : record } ) }
72
+ http_handler . logs ( req ) { |record | router . emit ( @tag , Fluent ::EventTime . now , { type : Opentelemetry ::RECORD_TYPE_LOGS , message : record } ) }
73
73
end
74
74
serv . post ( "/v1/metrics" ) do |req |
75
- http_handler . metrics ( req ) { |record | router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_METRICS , message : record } ) }
75
+ http_handler . metrics ( req ) { |record | router . emit ( @tag , Fluent ::EventTime . now , { type : Opentelemetry ::RECORD_TYPE_METRICS , message : record } ) }
76
76
end
77
77
serv . post ( "/v1/traces" ) do |req |
78
- http_handler . traces ( req ) { |record | router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_TRACES , message : record } ) }
78
+ http_handler . traces ( req ) { |record | router . emit ( @tag , Fluent ::EventTime . now , { type : Opentelemetry ::RECORD_TYPE_TRACES , message : record } ) }
79
79
end
80
80
end
81
81
end
82
82
83
83
if @grpc_config
84
- thread_create ( :in_otlp_grpc_server ) do
84
+ thread_create ( :in_opentelemetry_grpc_server ) do
85
85
grpc_handler = GrpcHandler . new ( @grpc_config , log )
86
86
grpc_handler . run (
87
87
logs : lambda { |record |
88
- router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_LOGS , message : record } )
88
+ router . emit ( @tag , Fluent ::EventTime . now , { type : Opentelemetry ::RECORD_TYPE_LOGS , message : record } )
89
89
} ,
90
90
metrics : lambda { |record |
91
- router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_METRICS , message : record } )
91
+ router . emit ( @tag , Fluent ::EventTime . now , { type : Opentelemetry ::RECORD_TYPE_METRICS , message : record } )
92
92
} ,
93
93
traces : lambda { |record |
94
- router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_TRACES , message : record } )
94
+ router . emit ( @tag , Fluent ::EventTime . now , { type : Opentelemetry ::RECORD_TYPE_TRACES , message : record } )
95
95
}
96
96
)
97
97
end
@@ -100,15 +100,15 @@ def start
100
100
101
101
class HttpHandler
102
102
def logs ( req , &block )
103
- common ( req , Otlp ::Request ::Logs , Otlp ::Response ::Logs , &block )
103
+ common ( req , Opentelemetry ::Request ::Logs , Opentelemetry ::Response ::Logs , &block )
104
104
end
105
105
106
106
def metrics ( req , &block )
107
- common ( req , Otlp ::Request ::Metrics , Otlp ::Response ::Metrics , &block )
107
+ common ( req , Opentelemetry ::Request ::Metrics , Opentelemetry ::Response ::Metrics , &block )
108
108
end
109
109
110
110
def traces ( req , &block )
111
- common ( req , Otlp ::Request ::Traces , Otlp ::Response ::Traces , &block )
111
+ common ( req , Opentelemetry ::Request ::Traces , Opentelemetry ::Response ::Traces , &block )
112
112
end
113
113
114
114
private
@@ -120,7 +120,7 @@ def common(req, request_class, response_class)
120
120
return response_bad_request ( content_type ) unless valid_content_encoding? ( content_encoding )
121
121
122
122
body = req . body
123
- body = Zlib ::GzipReader . new ( StringIO . new ( body ) ) . read if content_encoding == Otlp ::CONTENT_ENCODING_GZIP
123
+ body = Zlib ::GzipReader . new ( StringIO . new ( body ) ) . read if content_encoding == Opentelemetry ::CONTENT_ENCODING_GZIP
124
124
125
125
begin
126
126
record = request_class . new ( body ) . record
@@ -132,12 +132,12 @@ def common(req, request_class, response_class)
132
132
yield record
133
133
134
134
res = response_class . new
135
- response ( 200 , content_type , res . body ( type : Otlp ::Response . type ( content_type ) ) )
135
+ response ( 200 , content_type , res . body ( type : Opentelemetry ::Response . type ( content_type ) ) )
136
136
end
137
137
138
138
def valid_content_type? ( content_type )
139
139
case content_type
140
- when Otlp ::CONTENT_TYPE_PROTOBUF , Otlp ::CONTENT_TYPE_JSON
140
+ when Opentelemetry ::CONTENT_TYPE_PROTOBUF , Opentelemetry ::CONTENT_TYPE_JSON
141
141
true
142
142
else
143
143
false
@@ -147,15 +147,15 @@ def valid_content_type?(content_type)
147
147
def valid_content_encoding? ( content_encoding )
148
148
return true if content_encoding . nil?
149
149
150
- content_encoding == Otlp ::CONTENT_ENCODING_GZIP
150
+ content_encoding == Opentelemetry ::CONTENT_ENCODING_GZIP
151
151
end
152
152
153
153
def response ( code , content_type , body )
154
- [ code , { Otlp ::CONTENT_TYPE => content_type } , body ]
154
+ [ code , { Opentelemetry ::CONTENT_TYPE => content_type } , body ]
155
155
end
156
156
157
157
def response_unsupported_media_type
158
- response ( 415 , Otlp ::CONTENT_TYPE_PAIN , "415 unsupported media type, supported: [application/json, application/x-protobuf]" )
158
+ response ( 415 , Opentelemetry ::CONTENT_TYPE_PAIN , "415 unsupported media type, supported: [application/json, application/x-protobuf]" )
159
159
end
160
160
161
161
def response_bad_request ( content_type )
@@ -183,24 +183,24 @@ def run(logs:, metrics:, traces:)
183
183
server = GRPC ::RpcServer . new ( interceptors : [ ExceptionInterceptor . new ] )
184
184
server . add_http2_port ( "#{ @grpc_config . bind } :#{ @grpc_config . port } " , :this_port_is_insecure )
185
185
186
- logs_handler = Otlp ::ServiceHandler ::Logs . new
186
+ logs_handler = Opentelemetry ::ServiceHandler ::Logs . new
187
187
logs_handler . callback = lambda { |request |
188
188
logs . call ( request . to_json )
189
- Otlp ::Response ::Logs . build
189
+ Opentelemetry ::Response ::Logs . build
190
190
}
191
191
server . handle ( logs_handler )
192
192
193
- metrics_handler = Otlp ::ServiceHandler ::Metrics . new
193
+ metrics_handler = Opentelemetry ::ServiceHandler ::Metrics . new
194
194
metrics_handler . callback = lambda { |request |
195
195
metrics . call ( request . to_json )
196
- Otlp ::Response ::Metrics . build
196
+ Opentelemetry ::Response ::Metrics . build
197
197
}
198
198
server . handle ( metrics_handler )
199
199
200
- traces_handler = Otlp ::ServiceHandler ::Traces . new
200
+ traces_handler = Opentelemetry ::ServiceHandler ::Traces . new
201
201
traces_handler . callback = lambda { |request |
202
202
traces . call ( request . to_json )
203
- Otlp ::Response ::Traces . build
203
+ Opentelemetry ::Response ::Traces . build
204
204
}
205
205
server . handle ( traces_handler )
206
206
0 commit comments