diff --git a/CHANGELOG.md b/CHANGELOG.md index b48eade..cdd53e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 7.1.0 + - Add ability to specify message timestamp (#124) + ## 7.0.2 - Docs: Fix misleading info about the default codec diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 2af0881..337cb6a 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -85,6 +85,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -244,6 +245,12 @@ The maximum size of a request The key for the message +[id="plugins-{type}s-{plugin}-message_timestamp"] +===== `message_timestamp` +The timestamp for the message. This can either be: +- a field containing a timestamp value +- an sprintf-style field containing a timestamp value + [id="plugins-{type}s-{plugin}-metadata_fetch_timeout_ms"] ===== `metadata_fetch_timeout_ms` diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index 4c4d90a..c035bfb 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -96,6 +96,10 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base config :max_request_size, :validate => :number, :default => 1048576 # The key for the message config :message_key, :validate => :string + # The timestamp for the message. This can either be: + # - a field containing a timestamp value + # - an sprintf-style field containing a timestamp value + config :message_timestamp, :validate => :string # the timeout setting for initial metadata request to fetch topic metadata. config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000 # the max time in milliseconds before a metadata refresh is forced. @@ -173,11 +177,20 @@ def register @producer = create_producer @codec.on_event do |event, data| begin - if @message_key.nil? 
- record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data) - else - record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data) + partition = nil + key = event.sprintf(@message_key) unless @message_key.nil? + + timestamp = nil + unless @message_timestamp.nil? + if event.include? @message_timestamp + ts = event.get(@message_timestamp) + timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i + else + timestamp = event.sprintf(@message_timestamp).to_i + end end + + record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), partition, timestamp, key, data) @producer.send(record) rescue LogStash::ShutdownSignal @logger.debug('Kafka producer got shutdown signal') diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec index 0bf24af..82ac124 100644 --- a/logstash-output-kafka.gemspec +++ b/logstash-output-kafka.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-kafka' - s.version = '7.0.2' + s.version = '7.1.0' s.licenses = ['Apache License (2.0)'] s.summary = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker' s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb index db874d7..3617ee5 100644 --- a/spec/unit/outputs/kafka_spec.rb +++ b/spec/unit/outputs/kafka_spec.rb @@ -6,7 +6,7 @@ describe "outputs/kafka" do let (:simple_kafka_config) {{'topic_id' => 'test'}} let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1', - '@timestamp' => LogStash::Timestamp.now}) } + '@timestamp' => LogStash::Timestamp.now, 'other_timestamp' => 1487669184685}) } context 'when initializing' do it "should register" do @@ -34,7 +34,7 @@ it 'should support Event#sprintf placeholders in topic_id' do topic_field = 'topic_name' expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("my_topic", event.to_s) + .with("my_topic", nil, nil, nil, event.to_s) expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"}) kafka.register @@ -43,13 +43,40 @@ it 'should support field referenced message_keys' do expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("test", "172.0.0.1", event.to_s) + .with("test", nil, nil, "172.0.0.1", event.to_s) expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"})) kafka.register kafka.receive(event) end - + + it 'should support timestamp from main @timestamp field' do + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("test", nil, event.get('@timestamp').time.to_i, nil, event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "@timestamp"})) + kafka.register + kafka.receive(event) + end + + it 'should support 
timestamp from specified message field with long value' do + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "other_timestamp"})) + kafka.register + kafka.receive(event) + end + + it 'should support field referenced by message_timestamp' do + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "%{other_timestamp}"})) + kafka.register + kafka.receive(event) + end + it 'should raise config error when truststore location is not set and ssl is enabled' do kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"})) expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)