1
1
# frozen_string_literal: true
2
2
3
+ require "excon"
3
4
require "fluent/plugin/otlp/constant"
4
5
require "fluent/plugin/otlp/request"
5
6
require "fluent/plugin/output"
@@ -21,8 +22,8 @@ class OtlpOutput < Output
21
22
config_section :http , required : false , multi : false , init : true , param_name : :http_config do
22
23
desc "The endpoint"
23
24
config_param :endpoint , :string , default : "http://127.0.0.1:4318"
24
- desc ' The proxy for HTTP request'
25
- config_param :proxy , :string , default : ENV [ ' HTTP_PROXY' ] || ENV [ ' http_proxy' ]
25
+ desc " The proxy for HTTP request"
26
+ config_param :proxy , :string , default : ENV [ " HTTP_PROXY" ] || ENV [ " http_proxy" ]
26
27
end
27
28
28
29
desc "Compress request body"
@@ -40,8 +41,6 @@ def configure(conf)
40
41
OtlpOutput . const_set ( :HTTP_LOGS_ENDPOINT , "#{ @http_config . endpoint } /v1/logs" . freeze )
41
42
OtlpOutput . const_set ( :HTTP_METRICS_ENDPOINT , "#{ @http_config . endpoint } /v1/metrics" . freeze )
42
43
OtlpOutput . const_set ( :HTTP_TRACES_ENDPOINT , "#{ @http_config . endpoint } /v1/traces" . freeze )
43
-
44
- @http_proxy_uri = URI . parse ( @http_config . proxy ) if @http_config . proxy
45
44
end
46
45
47
46
def multi_workers_ready?
@@ -53,16 +52,16 @@ def format(tag, time, record)
53
52
end
54
53
55
54
def write ( chunk )
56
- uri , req = create_uri_request ( chunk )
57
-
58
- Net :: HTTP . start ( uri . host , uri . port , @http_proxy_uri &. host , @http_proxy_uri &. port , @http_proxy_uri &. user , @http_proxy_uri &. password ) do | http |
59
- http . request ( req )
55
+ uri , connection = create_connection ( chunk )
56
+ response = connection . post
57
+ if response . status != 200
58
+ log . error "got error response from ' #{ uri . to_s } '"
60
59
end
61
60
end
62
61
63
62
private
64
63
65
- def create_uri_request ( chunk )
64
+ def create_connection ( chunk )
66
65
record = JSON . parse ( chunk . read )
67
66
msg = record [ "message" ]
68
67
@@ -77,22 +76,19 @@ def create_uri_request(chunk)
77
76
uri = HTTP_TRACES_ENDPOINT
78
77
body = Otlp ::Request ::Traces . new ( msg ) . encode
79
78
else
80
- raise "Unknown record type: #{ record [ " type" ] } "
79
+ raise "Unknown record type: #{ record [ ' type' ] } "
81
80
end
82
81
83
- uri = URI . parse ( uri )
84
- req = Net ::HTTP ::Post . new ( uri . request_uri )
85
- req [ "Content-Type" ] = Otlp ::CONTENT_TYPE_PROTOBUF
86
-
82
+ headers = { "Content-Type" => Otlp ::CONTENT_TYPE_PROTOBUF }
87
83
if @compress == :gzip
88
- req [ "Content-Encoding" ] = Otlp ::CONTENT_ENCODING_GZIP
84
+ headers [ "Content-Encoding" ] = Otlp ::CONTENT_ENCODING_GZIP
89
85
gz = Zlib ::GzipWriter . new ( StringIO . new )
90
86
gz << body
91
87
body = gz . close . string
92
88
end
93
89
94
- req . body = body
95
- [ uri , req ]
90
+ connection = Excon . new ( uri , body : body , headers : headers , proxy : @http_config . proxy , persistent : true )
91
+ [ uri , connection ]
96
92
end
97
93
end
98
94
end
0 commit comments