Added ability to specify timestamp in message #125

Open · wants to merge 8 commits into base: main
Changes from 2 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 6.1.5
- Add ability to specify message timestamp (#124)

## 6.1.4
- Fix a bug where consumer was not correctly setup when `SASL_SSL` option was specified.

21 changes: 17 additions & 4 deletions lib/logstash/outputs/kafka.rb
@@ -98,6 +98,10 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config :max_request_size, :validate => :number, :default => 1048576
# The key for the message
config :message_key, :validate => :string
# The timestamp for the message. This can either be:
# - the name of a field containing a timestamp value
# - a sprintf-style expression that evaluates to a timestamp value
config :message_timestamp, :validate => :string
# the timeout setting for initial metadata request to fetch topic metadata.
config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000
# the max time in milliseconds before a metadata refresh is forced.
@@ -178,11 +182,20 @@ def register
@producer = create_producer
@codec.on_event do |event, data|
begin
if @message_key.nil?
record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data)
else
record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data)
partition = nil
key = event.sprintf(@message_key) unless @message_key.nil?

timestamp = nil
unless @message_timestamp.nil?
if event.include? @message_timestamp
ts = event.get(@message_timestamp)
timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i
Contributor:

LogStash::Timestamp#to_i returns 'seconds since epoch':

[6] pry(#<LogStash::Runner>)> LogStash::Event.new.timestamp.to_i
=> 1505207654

But the Kafka RFC indicates in a few places that the unit for the Long timestamp is milliseconds, not seconds.

I think you need (ts.to_f * 1000).to_i to convert to milliseconds.
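The suggested conversion can be sanity-checked in plain Ruby (a standalone sketch; the epoch value is borrowed from the pry session above):

```ruby
# Kafka's ProducerRecord timestamp is a Long in *milliseconds* since the
# epoch, while Ruby's Time#to_i (and LogStash::Timestamp#to_i) yields
# *seconds*. Scale via to_f before truncating so any sub-second precision
# survives the conversion.
t = Time.at(1505207654)

seconds      = t.to_i               # seconds since epoch
milliseconds = (t.to_f * 1000).to_i # what Kafka expects

puts seconds      # 1505207654
puts milliseconds # 1505207654000
```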

Author:

Note that I'm not calling to_i on a LogStash::Timestamp but on the wrapped Joda DateTime object. I was under the impression that it used to return milliseconds, but agreed, it doesn't anymore. I'll change that.

Contributor:

Yeah, I checked timestamp.time and in Logstash 5.x, at least, it is a ruby Time object:

[6] pry(#<LogStash::Runner>)> date = LogStash::Filters::Date.new("match" => [ "message", "ISO8601" ])
=> <LogStash::Filters::Date match=>["message", "ISO8601"], id=>"date_e40418e5-e0ba-4b0f-a3a5-a04b62920d0e", enable_metric=>true, periodic_flush=>false, target=>"@timestamp", tag_on_failure=>["_dateparsefailure"]>
[7] pry(#<LogStash::Runner>)> e = LogStash::Event.new
=> 2017-09-13T04:11:30.230Z %{host} %{message}
[8] pry(#<LogStash::Runner>)> e.timestamp.time.to_i
=> 1505275890
[9] pry(#<LogStash::Runner>)> e.timestamp.time.class
=> Time

# Now try applying the date filter:
[11] pry(#<LogStash::Runner>)> e.set("message", "2017-01-01T00:00:00.000Z")
=> "2017-01-01T00:00:00.000Z"
[12] pry(#<LogStash::Runner>)> date.filter(e)
=> 2017-01-01T00:00:00.000Z %{host} 2017-01-01T00:00:00.000Z
[13] pry(#<LogStash::Runner>)> e.timestamp.time.to_i
=> 1483228800
[14] pry(#<LogStash::Runner>)> e.timestamp.time.class
=> Time

^^^ Still a Time object.

Internally, a LogStash::Event is a wrapper around the internal org.logstash.Event, and both have a Timestamp class. In JRuby, this timestamp type is provided by JrubyTimestampExtLibrary.java, which has a time method that returns a Ruby Time object.

So, from Ruby, due to our JRuby wrappers, you don't get to see the org.logstash.Timestamp object (which is backed by the Joda DateTime type); you see a Ruby Time type.
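The branching the patch performs on the field value can be sketched outside Logstash like this. FakeTimestamp stands in for LogStash::Timestamp (whose time method, per the discussion above, returns a Ruby Time), and the coerce_to_millis name is invented for illustration:

```ruby
# Stand-in for LogStash::Timestamp: exposes a #time method returning a
# Ruby Time, which is what the JRuby wrapper hands you in practice.
FakeTimestamp = Struct.new(:time)

# Hypothetical helper: accept either a timestamp-like object or a raw
# epoch value and return epoch milliseconds, as Kafka expects.
def coerce_to_millis(value)
  t = value.respond_to?(:time) ? value.time : value
  t.is_a?(Time) ? (t.to_f * 1000).to_i : Integer(t)
end

coerce_to_millis(FakeTimestamp.new(Time.at(1505207654))) # => 1505207654000
coerce_to_millis(1487669184685)                          # => 1487669184685
```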

Author:

True, I made the same exploration and came to the same conclusion.

Still, I'm a bit surprised that one has to do such a transformation in order to get a millisecond value out of a LogStash::Timestamp. Even more surprising is that nowhere else in the whole Logstash codebase is such a transformation needed. And if my previous statement is wrong (which I hope), then why isn't there a utility method somewhere that gives me that value?

Contributor:

I don't know of anywhere else we need to output milliseconds-since-epoch. I checked a few places where I know we use time that isn't ISO8601-formatted:

Author:

Ok, thanks for checking. I was under the impression that getting a milliseconds representation out of a date/timestamp was a common feature in most mainstream languages.

I actually found one place in the Logstash core lib that does the same transformation and could benefit from having that factored out into the TimeStamp class: https://github.com/elastic/logstash/blob/master/logstash-core/lib/logstash/agent.rb#L182

If not feasible, I'll just apply your suggestion above so we can let it be.

else
timestamp = event.sprintf(@message_timestamp).to_i
Contributor:

What is the goal here? I can see this very easily causing mistakes for users. What would an expected sprintf be here?

  • "%{+%s}%{+SSS}" (same result as if you call time.to_f * 1000 basically)
  • Something else?
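For concreteness, the two usage styles under discussion would look roughly like this in a pipeline config (a sketch, not taken from the PR; the kafka_ts field name is made up, and %{+%s}%{+SSS} is the sprintf form suggested above):

```
output {
  kafka {
    topic_id          => "my_topic"
    # Field reference: the field's value is used as the timestamp.
    message_timestamp => "kafka_ts"
    # Sprintf style: seconds-since-epoch concatenated with milliseconds.
    # message_timestamp => "%{+%s}%{+SSS}"
  }
}
```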

Author:

The idea of using sprintf here was described in my original issue (#124); in short, it was helpful in cases where a field containing a value in seconds could also be used as the message timestamp.

Contributor:

I must have missed this comment; sorry for the delays.

If the use case is taking a numeric value as the kafka message timestamp, then I would prefer this be event.get(@message_timestamp) and not event.sprintf.
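The distinction being drawn here can be illustrated with a minimal stand-in for the event API (MockEvent and its crude sprintf are hypothetical; the real LogStash::Event sprintf supports far more than plain field references):

```ruby
# get returns the field's native value; sprintf always renders a String.
# A numeric timestamp therefore survives get unchanged, but has to
# round-trip through String#to_i when fetched via sprintf.
class MockEvent
  def initialize(fields)
    @fields = fields
  end

  def get(ref)
    @fields[ref]
  end

  # Crude sprintf: only resolves %{field} references.
  def sprintf(template)
    template.gsub(/%\{(\w+)\}/) { @fields[Regexp.last_match(1)].to_s }
  end
end

e = MockEvent.new('kafka_ts' => 1487669184685)

e.get('kafka_ts')             # Integer, usable directly as the record timestamp
e.sprintf('%{kafka_ts}').to_i # String first, then coerced back to an Integer
```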

end
end

record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), partition, timestamp, key, data)
@producer.send(record)
rescue LogStash::ShutdownSignal
@logger.debug('Kafka producer got shutdown signal')
2 changes: 1 addition & 1 deletion logstash-output-kafka.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-kafka'
s.version = '6.1.4'
s.version = '6.1.5'
Contributor:

Since this is a new feature and not just a bug fix, I'd be happy to bump this to 6.2.0.

Author:

No prob, makes sense, I will do.

s.licenses = ['Apache License (2.0)']
s.summary = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker'
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
26 changes: 22 additions & 4 deletions spec/unit/outputs/kafka_spec.rb
@@ -6,7 +6,7 @@
describe "outputs/kafka" do
let (:simple_kafka_config) {{'topic_id' => 'test'}}
let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1',
'@timestamp' => LogStash::Timestamp.now}) }
'@timestamp' => LogStash::Timestamp.now, 'other_timestamp' => 1487669184685}) }

context 'when initializing' do
it "should register" do
@@ -34,7 +34,7 @@
it 'should support Event#sprintf placeholders in topic_id' do
topic_field = 'topic_name'
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("my_topic", event.to_s)
.with("my_topic", nil, nil, nil, event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
kafka.register
@@ -43,13 +43,31 @@

it 'should support field referenced message_keys' do
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("test", "172.0.0.1", event.to_s)
.with("test", nil, nil, "172.0.0.1", event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"}))
kafka.register
kafka.receive(event)
end


it 'should support timestamp from specified message field' do
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("test", nil, event.get('@timestamp').time.to_i, nil, event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "@timestamp"}))
kafka.register
kafka.receive(event)
end

it 'should support field referenced message_timestamp' do
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
.with("test", nil, event.get('other_timestamp').to_i, nil, event.to_s)
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_timestamp" => "%{other_timestamp}"}))
kafka.register
kafka.receive(event)
end

it 'should raise config error when truststore location is not set and ssl is enabled' do
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"}))
expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)