From ab2548ef28417852a6a52b9a96bf1b6529f6e621 Mon Sep 17 00:00:00 2001
From: val <valentin.crettaz@consulthys.com>
Date: Tue, 21 Feb 2017 09:25:43 +0100
Subject: [PATCH 1/7] Added ability to specify timestamp in message

---
 CHANGELOG.md                  |  3 +++
 lib/logstash/outputs/kafka.rb | 20 ++++++++++++++++----
 logstash-output-kafka.gemspec |  2 +-
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 53dd447..8f57f0a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 6.1.5
+  - Add ability to specify message timestamp (#124)
+
 ## 6.1.4
   - Fix a bug where consumer was not correctly setup when `SASL_SSL` option was specified.
 
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index acf2c19..35fef53 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -98,6 +98,10 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config :max_request_size, :validate => :number, :default => 1048576
   # The key for the message
   config :message_key, :validate => :string
+  # The timestamp for the message. This can either be:
+  #  - the name of a field containing a timestamp value
+  #  - an sprintf-style expression that resolves to a timestamp value
+  config :message_timestamp, :validate => :string
   # the timeout setting for initial metadata request to fetch topic metadata.
   config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000
   # the max time in milliseconds before a metadata refresh is forced.
@@ -178,11 +182,19 @@ def register
     @producer = create_producer
     @codec.on_event do |event, data|
       begin
-        if @message_key.nil?
-          record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data)
-        else
-          record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data)
+        partition = nil
+        key = event.sprintf(@message_key) unless @message_key.nil?
+
+        timestamp = nil
+        unless @message_timestamp.nil?
+          if event.include? @message_timestamp
+            timestamp = event.get(@message_timestamp).to_i
+          else
+            timestamp = event.sprintf(@message_timestamp).to_i
+          end
         end
+
+        record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), partition, timestamp, key, data)
         @producer.send(record)
       rescue LogStash::ShutdownSignal
         @logger.debug('Kafka producer got shutdown signal')
diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec
index 4bce82f..4d0b7a4 100644
--- a/logstash-output-kafka.gemspec
+++ b/logstash-output-kafka.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-output-kafka'
-  s.version         = '6.1.4'
+  s.version         = '6.1.5'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker'
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

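For readers skimming the diff, the timestamp resolution introduced in this patch can be sketched standalone. This is an illustrative sketch only, not the plugin code: FakeEvent is a hypothetical stand-in for LogStash::Event that mimics just the three methods the patch relies on (include?, get and sprintf), and the field names are made up.

    # Illustrative sketch of the message_timestamp resolution introduced above.
    # FakeEvent is a hypothetical stand-in for LogStash::Event, offering only
    # the three methods the patch uses: include?, get and sprintf.
    class FakeEvent
      def initialize(fields)
        @fields = fields
      end

      def include?(field)
        @fields.key?(field)
      end

      def get(field)
        @fields[field]
      end

      # Very simplified %{field} interpolation, enough for this demo.
      def sprintf(template)
        template.gsub(/%\{(\w+)\}/) { @fields[Regexp.last_match(1)].to_s }
      end
    end

    def resolve_timestamp(event, message_timestamp)
      return nil if message_timestamp.nil?
      if event.include?(message_timestamp)
        event.get(message_timestamp).to_i      # option names a field directly
      else
        event.sprintf(message_timestamp).to_i  # option is an sprintf expression
      end
    end

    event = FakeEvent.new('other_timestamp' => 1487669184685)
    puts resolve_timestamp(event, 'other_timestamp')     # => 1487669184685
    puts resolve_timestamp(event, '%{other_timestamp}')  # => 1487669184685
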
From 1a7d9089651ef92d7ee99b5ed57298219d8c6e98 Mon Sep 17 00:00:00 2001
From: val <valentin.crettaz@consulthys.com>
Date: Tue, 21 Feb 2017 10:28:28 +0100
Subject: [PATCH 2/7] Added ability to specify timestamp in message (fixed
 tests)

---
 lib/logstash/outputs/kafka.rb   |  3 ++-
 spec/unit/outputs/kafka_spec.rb | 26 ++++++++++++++++++++++----
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 35fef53..11e7311 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -188,7 +188,8 @@ def register
         timestamp = nil
         unless @message_timestamp.nil?
           if event.include? @message_timestamp
-            timestamp = event.get(@message_timestamp).to_i
+            ts = event.get(@message_timestamp)
+            timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i
           else
             timestamp = event.sprintf(@message_timestamp).to_i
           end
diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb
index db874d7..9e80666 100644
--- a/spec/unit/outputs/kafka_spec.rb
+++ b/spec/unit/outputs/kafka_spec.rb
@@ -6,7 +6,7 @@
 describe "outputs/kafka" do
   let (:simple_kafka_config) {{'topic_id' => 'test'}}
   let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1',
-                                      '@timestamp' => LogStash::Timestamp.now}) }
+                                      '@timestamp' => LogStash::Timestamp.now, 'other_timestamp' => 1487669184685}) }
 
   context 'when initializing' do
     it "should register" do
@@ -34,7 +34,7 @@
     it 'should support Event#sprintf placeholders in topic_id' do
       topic_field = 'topic_name'
       expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
-        .with("my_topic", event.to_s)
+        .with("my_topic", nil, nil, nil, event.to_s)
       expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
       kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
       kafka.register
@@ -43,13 +43,31 @@
 
     it 'should support field referenced message_keys' do
       expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
-        .with("test", "172.0.0.1", event.to_s)
+        .with("test", nil, nil, "172.0.0.1", event.to_s)
       expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
       kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"}))
       kafka.register
       kafka.receive(event)
     end
-    
+
+    it 'should support timestamp from specified message field' do
+      expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
+        .with("test", nil, event.get('@timestamp').time.to_i, nil, event.to_s)
+      expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+      kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "@timestamp"}))
+      kafka.register
+      kafka.receive(event)
+    end
+
+    it 'should support field referenced message_timestamp' do
+      expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
+        .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s)
+      expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+      kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "%{other_timestamp}"}))
+      kafka.register
+      kafka.receive(event)
+    end
+
     it 'should raise config error when truststore location is not set and ssl is enabled' do
       kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"}))
       expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)

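The LogStash::Timestamp special case added in this patch can likewise be sketched in isolation. FakeLogstashTimestamp is a hypothetical stand-in exposing only the #time accessor the patch uses; the point is that a Timestamp value is converted through its underlying Time object, while any other field value is coerced with to_i as before.

    # Sketch of the LogStash::Timestamp handling added in patch 2.
    # FakeLogstashTimestamp is a hypothetical stand-in exposing only #time,
    # the accessor the patch relies on.
    class FakeLogstashTimestamp
      attr_reader :time

      def initialize(time)
        @time = time
      end
    end

    def coerce_timestamp(value)
      value.is_a?(FakeLogstashTimestamp) ? value.time.to_i : value.to_i
    end

    puts coerce_timestamp(FakeLogstashTimestamp.new(Time.utc(2017, 2, 21, 9, 25, 43)))
    # => 1487669143 (epoch seconds of the wrapped Time)
    puts coerce_timestamp(1487669184685)
    # => 1487669184685 (numeric field values pass through unchanged)
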
From f1186022cb71656f7dfadb0e8a7a07e9703c8f98 Mon Sep 17 00:00:00 2001
From: val <valentin.crettaz@consulthys.com>
Date: Tue, 21 Feb 2017 19:59:23 +0100
Subject: [PATCH 3/7] Bump to 6.2.0

---
 logstash-output-kafka.gemspec | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec
index 4d0b7a4..ca2fc11 100644
--- a/logstash-output-kafka.gemspec
+++ b/logstash-output-kafka.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-output-kafka'
-  s.version         = '6.1.5'
+  s.version         = '6.2.0'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker'
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

From 85746eda05a6efc43cded845f7470c034df9c5d0 Mon Sep 17 00:00:00 2001
From: val <valentin.crettaz@consulthys.com>
Date: Tue, 21 Feb 2017 21:05:56 +0100
Subject: [PATCH 4/7] Added ability to specify timestamp in message (added more
 tests)

---
 spec/unit/outputs/kafka_spec.rb | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb
index 9e80666..3617ee5 100644
--- a/spec/unit/outputs/kafka_spec.rb
+++ b/spec/unit/outputs/kafka_spec.rb
@@ -50,7 +50,7 @@
       kafka.receive(event)
     end
 
-    it 'should support timestamp from specified message field' do
+    it 'should support timestamp from main @timestamp field' do
       expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
         .with("test", nil, event.get('@timestamp').time.to_i, nil, event.to_s)
       expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
@@ -59,7 +59,16 @@
       kafka.receive(event)
     end
 
-    it 'should support field referenced message_timestamp' do
+    it 'should support timestamp from specified message field with long value' do
+      expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
+        .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s)
+      expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+      kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "other_timestamp"}))
+      kafka.register
+      kafka.receive(event)
+    end
+
+    it 'should support field referenced by message_timestamp' do
       expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
         .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s)
       expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)

From 4424c51194acb61b99a1b2b4f68e6107745b788d Mon Sep 17 00:00:00 2001
From: Valentin Crettaz <valentin.crettaz@consulthys.com>
Date: Tue, 12 Sep 2017 11:12:17 +0200
Subject: [PATCH 5/7] Added doc for message_timestamp field

---
 docs/index.asciidoc | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 2af0881..337cb6a 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -85,6 +85,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
 | <<plugins-{type}s-{plugin}-linger_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-max_request_size>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-message_key>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-message_timestamp>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-metadata_fetch_timeout_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<number,number>>|No
@@ -244,6 +245,12 @@ The maximum size of a request
 
 The key for the message
 
+[id="plugins-{type}s-{plugin}-message_timestamp"]
+===== `message_timestamp` 
+
+The timestamp for the message. This can either be the name of a field containing
+a timestamp value, or an sprintf-style expression that resolves to a timestamp value.
+
 [id="plugins-{type}s-{plugin}-metadata_fetch_timeout_ms"]
 ===== `metadata_fetch_timeout_ms` 
 

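As a usage note for the documentation added above, the two accepted forms of the option can be shown as the kind of settings hash the plugin's spec passes to LogStash::Outputs::Kafka.new; 'logs' and 'event_time' are made-up names used purely for illustration.

    # Two hypothetical ways of setting the documented option, expressed as the
    # kind of settings hash the plugin's spec builds for the kafka output.
    by_field_name = { 'topic_id' => 'logs', 'message_timestamp' => 'event_time' }
    by_sprintf    = { 'topic_id' => 'logs', 'message_timestamp' => '%{event_time}' }

    # With an 'event_time' field present on the event, the first form reads the
    # field value directly and the second interpolates it via Event#sprintf;
    # either way the result becomes the ProducerRecord timestamp.
    p by_field_name, by_sprintf
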
From 862e6e0686395d5a723e47ca60188b0986c6e425 Mon Sep 17 00:00:00 2001
From: Valentin Crettaz <valentin.crettaz@consulthys.com>
Date: Tue, 12 Sep 2017 11:13:22 +0200
Subject: [PATCH 6/7] Minor version bump

---
 logstash-output-kafka.gemspec | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec
index 452d951..82ac124 100644
--- a/logstash-output-kafka.gemspec
+++ b/logstash-output-kafka.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-output-kafka'
-  s.version         = '7.0.3'
+  s.version         = '7.1.0'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker'
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

From a08e36d5ab701b8c5a8f4de1b8701aaa249337d7 Mon Sep 17 00:00:00 2001
From: Valentin Crettaz <valentin.crettaz@consulthys.com>
Date: Tue, 12 Sep 2017 11:13:57 +0200
Subject: [PATCH 7/7] Minor version bump

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d828d2f..cdd53e0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-## 7.0.3
+## 7.1.0
   - Add ability to specify message timestamp (#124)
 
 ## 7.0.2