-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added ability to specify timestamp in message #125
base: main
Are you sure you want to change the base?
Changes from 2 commits
ab2548e
1a7d908
f118602
85746ed
9b0c36f
4424c51
862e6e0
a08e36d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,6 +98,10 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base | |
config :max_request_size, :validate => :number, :default => 1048576 | ||
# The key for the message | ||
config :message_key, :validate => :string | ||
# The timestamp for the message. This can either be: | ||
# - a field containing a timestamp value | ||
# - an sprintf-style field containing a timestamp value | ||
config :message_timestamp, :validate => :string | ||
# the timeout setting for initial metadata request to fetch topic metadata. | ||
config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000 | ||
# the max time in milliseconds before a metadata refresh is forced. | ||
|
@@ -178,11 +182,20 @@ def register | |
@producer = create_producer | ||
@codec.on_event do |event, data| | ||
begin | ||
if @message_key.nil? | ||
record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data) | ||
else | ||
record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data) | ||
partition = nil | ||
key = event.sprintf(@message_key) unless @message_key.nil? | ||
|
||
timestamp = nil | ||
unless @message_timestamp.nil? | ||
if event.include? @message_timestamp | ||
ts = event.get(@message_timestamp) | ||
timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i | ||
else | ||
timestamp = event.sprintf(@message_timestamp).to_i | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the goal here? I can see this very easily causing mistakes for users. What would an expected sprintf be, here?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea of using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I must have missed this comment; sorry for the delays. If the use case is taking a numeric value as the kafka message timestamp, then I would prefer this be |
||
end | ||
end | ||
|
||
record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), partition, timestamp, key, data) | ||
@producer.send(record) | ||
rescue LogStash::ShutdownSignal | ||
@logger.debug('Kafka producer got shutdown signal') | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
Gem::Specification.new do |s| | ||
|
||
s.name = 'logstash-output-kafka' | ||
s.version = '6.1.4' | ||
s.version = '6.1.5' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since this is a new feature and not just a bug fix. I'd be happy to bump this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No prob, makes sense, I will do. |
||
s.licenses = ['Apache License (2.0)'] | ||
s.summary = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker' | ||
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LogStash::Timestamp#to_i
returns 'seconds since epoch':But the Kafka RFC indicates in a few places that the unit for the
Long timestamp
is milliseconds, not seconds.I think you need
(ts.to_f * 1000).to_i
to convert to milliseconds.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that I'm not calling
to_i
on aLogstash::Timestamp
, but on the wrapped JodaDateTime
object and I was under the impression that back then it returned milliseconds, but agreed, not anymore. I'll change that.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I checked
timestamp.time
and in Logstash 5.x, at least, it is a ruby Time object:Internally, a
LogStash::Event
is a wrapper of the internal org.logstash.Event, and both have a Timestamp class. In Jruby, this timestamp type is provided by JrubyTimestampExtLibrary.java which has a rubytime
method that returns a ruby Time object.So, from Ruby, due to our Jruby wrappers, you don't get to see the
org.logstash.Timestamp
object (which is backed by Joda DateTime type), you see a RubyTime
type.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I made the same exploration and came to the same conclusion.
Still, I'm a bit surprised that one has to do such a transformation in order to get a millisecond value out of a
LogStash::Timestamp
. Even more surprising is the fact that nowhere else in the whole Logstash codebase something like this is needed. And if my previous statement is wrong (which I hope), then why isn't there a utility method somewhere that gets me that value?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know anywhere else we need to output milliseconds-since-epoch. I checked a few places I know we use time that isn't ISO8601 formatted:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks for checking. I was under the impression that getting a milliseconds representation out of a date/timestamp was a common feature in most mainstream languages.
I actually found one place in the Logstash core lib that does the same transformation and could benefit from having that factored out into the TimeStamp class: https://github.com/elastic/logstash/blob/master/logstash-core/lib/logstash/agent.rb#L182
If not feasible, I'll just apply your suggestion above so we can let it be.