From ab2548ef28417852a6a52b9a96bf1b6529f6e621 Mon Sep 17 00:00:00 2001 From: val <valentin.crettaz@consulthys.com> Date: Tue, 21 Feb 2017 09:25:43 +0100 Subject: [PATCH 1/7] Added ability to specify timestamp in message --- CHANGELOG.md | 3 +++ lib/logstash/outputs/kafka.rb | 20 ++++++++++++++++---- logstash-output-kafka.gemspec | 2 +- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53dd447..8f57f0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 6.1.5 + - Add ability to specify message timestamp (#124) + ## 6.1.4 - Fix a bug where consumer was not correctly setup when `SASL_SSL` option was specified. diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index acf2c19..35fef53 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -98,6 +98,10 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base config :max_request_size, :validate => :number, :default => 1048576 # The key for the message config :message_key, :validate => :string + # The timestamp for the message. This can either be: + # - a field containing a timestamp value + # - an sprintf-style field containing a timestamp value + config :message_timestamp, :validate => :string # the timeout setting for initial metadata request to fetch topic metadata. config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000 # the max time in milliseconds before a metadata refresh is forced. @@ -178,11 +182,19 @@ def register @producer = create_producer @codec.on_event do |event, data| begin - if @message_key.nil? - record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data) - else - record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data) + partition = nil + key = event.sprintf(@message_key) unless @message_key.nil? + + timestamp = nil + unless @message_timestamp.nil? + if event.include? 
@message_timestamp + timestamp = event.get(@message_timestamp).to_i + else + timestamp = event.sprintf(@message_timestamp).to_i + end end + + record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), partition, timestamp, key, data) @producer.send(record) rescue LogStash::ShutdownSignal @logger.debug('Kafka producer got shutdown signal') diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec index 4bce82f..4d0b7a4 100644 --- a/logstash-output-kafka.gemspec +++ b/logstash-output-kafka.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-kafka' - s.version = '6.1.4' + s.version = '6.1.5' s.licenses = ['Apache License (2.0)'] s.summary = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker' s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 1a7d9089651ef92d7ee99b5ed57298219d8c6e98 Mon Sep 17 00:00:00 2001 From: val <valentin.crettaz@consulthys.com> Date: Tue, 21 Feb 2017 10:28:28 +0100 Subject: [PATCH 2/7] Added ability to specify timestamp in message (fixed tests) --- lib/logstash/outputs/kafka.rb | 3 ++- spec/unit/outputs/kafka_spec.rb | 26 ++++++++++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index 35fef53..11e7311 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -188,7 +188,8 @@ def register timestamp = nil unless @message_timestamp.nil? if event.include? @message_timestamp - timestamp = event.get(@message_timestamp).to_i + ts = event.get(@message_timestamp) + timestamp = ts.is_a?(LogStash::Timestamp) ? 
ts.time.to_i : ts.to_i else timestamp = event.sprintf(@message_timestamp).to_i end diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb index db874d7..9e80666 100644 --- a/spec/unit/outputs/kafka_spec.rb +++ b/spec/unit/outputs/kafka_spec.rb @@ -6,7 +6,7 @@ describe "outputs/kafka" do let (:simple_kafka_config) {{'topic_id' => 'test'}} let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1', - '@timestamp' => LogStash::Timestamp.now}) } + '@timestamp' => LogStash::Timestamp.now, 'other_timestamp' => 1487669184685}) } context 'when initializing' do it "should register" do @@ -34,7 +34,7 @@ it 'should support Event#sprintf placeholders in topic_id' do topic_field = 'topic_name' expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("my_topic", event.to_s) + .with("my_topic", nil, nil, nil, event.to_s) expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"}) kafka.register @@ -43,13 +43,31 @@ it 'should support field referenced message_keys' do expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("test", "172.0.0.1", event.to_s) + .with("test", nil, nil, "172.0.0.1", event.to_s) expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"})) kafka.register kafka.receive(event) end - + + it 'should support timestamp from specified message field' do + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("test", nil, event.get('@timestamp').time.to_i, nil, event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "@timestamp"})) + kafka.register + 
kafka.receive(event) + end + + it 'should support field referenced message_timestamp' do + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "%{other_timestamp}"})) + kafka.register + kafka.receive(event) + end + it 'should raise config error when truststore location is not set and ssl is enabled' do kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"})) expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/) From f1186022cb71656f7dfadb0e8a7a07e9703c8f98 Mon Sep 17 00:00:00 2001 From: val <valentin.crettaz@consulthys.com> Date: Tue, 21 Feb 2017 19:59:23 +0100 Subject: [PATCH 3/7] Bump to 6.2.0 --- logstash-output-kafka.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec index 4d0b7a4..ca2fc11 100644 --- a/logstash-output-kafka.gemspec +++ b/logstash-output-kafka.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-kafka' - s.version = '6.1.5' + s.version = '6.2.0' s.licenses = ['Apache License (2.0)'] s.summary = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker' s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" From 85746eda05a6efc43cded845f7470c034df9c5d0 Mon Sep 17 00:00:00 2001 From: val <valentin.crettaz@consulthys.com> Date: Tue, 21 Feb 2017 21:05:56 +0100 Subject: [PATCH 4/7] Added ability to specify timestamp in message (added more tests) --- spec/unit/outputs/kafka_spec.rb | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb index 9e80666..3617ee5 100644 --- a/spec/unit/outputs/kafka_spec.rb +++ b/spec/unit/outputs/kafka_spec.rb @@ -50,7 +50,7 @@ kafka.receive(event) end - it 'should support timestamp from specified message field' do + it 'should support timestamp from main @timestamp field' do expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) .with("test", nil, event.get('@timestamp').time.to_i, nil, event.to_s) expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) @@ -59,7 +59,16 @@ kafka.receive(event) end - it 'should support field referenced message_timestamp' do + it 'should support timestamp from specified message field with long value' do + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "other_timestamp"})) + kafka.register + kafka.receive(event) + end + + it 'should support field referenced by message_timestamp' do expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s) expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) From 4424c51194acb61b99a1b2b4f68e6107745b788d Mon Sep 17 00:00:00 2001 From: Valentin Crettaz <valentin.crettaz@consulthys.com> Date: Tue, 12 Sep 
2017 11:12:17 +0200 Subject: [PATCH 5/7] Added doc for message_timestamp field --- docs/index.asciidoc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 2af0881..337cb6a 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -85,6 +85,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ | <<plugins-{type}s-{plugin}-linger_ms>> |<<number,number>>|No | <<plugins-{type}s-{plugin}-max_request_size>> |<<number,number>>|No | <<plugins-{type}s-{plugin}-message_key>> |<<string,string>>|No +| <<plugins-{type}s-{plugin}-message_timestamp>> |<<string,string>>|No | <<plugins-{type}s-{plugin}-metadata_fetch_timeout_ms>> |<<number,number>>|No | <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<number,number>>|No | <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<number,number>>|No @@ -244,6 +245,12 @@ The maximum size of a request The key for the message +[id="plugins-{type}s-{plugin}-message_timestamp"] +===== `message_timestamp` +The timestamp for the message. This can either be: + * a field containing a timestamp value + * an sprintf-style field containing a timestamp value + [id="plugins-{type}s-{plugin}-metadata_fetch_timeout_ms"] ===== `metadata_fetch_timeout_ms` From 862e6e0686395d5a723e47ca60188b0986c6e425 Mon Sep 17 00:00:00 2001 From: Valentin Crettaz <valentin.crettaz@consulthys.com> Date: Tue, 12 Sep 2017 11:13:22 +0200 Subject: [PATCH 6/7] Minor version bump --- logstash-output-kafka.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec index 452d951..82ac124 100644 --- a/logstash-output-kafka.gemspec +++ b/logstash-output-kafka.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-kafka' - s.version = '7.0.3' + s.version = '7.1.0' s.licenses = ['Apache License (2.0)'] s.summary = 'Output events to a Kafka topic. 
This uses the Kafka Producer API to write messages to a topic on the broker' s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From a08e36d5ab701b8c5a8f4de1b8701aaa249337d7 Mon Sep 17 00:00:00 2001 From: Valentin Crettaz <valentin.crettaz@consulthys.com> Date: Tue, 12 Sep 2017 11:13:57 +0200 Subject: [PATCH 7/7] Minor version bump --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d828d2f..cdd53e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 7.0.3 +## 7.1.0 - Add ability to specify message timestamp (#124) ## 7.0.2