Added ability to specify timestamp in message #125
logstash-output-kafka.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|

   s.name = 'logstash-output-kafka'
-  s.version = '6.1.4'
+  s.version = '6.1.5'
Since this is a new feature and not just a bug fix, I'd be happy to bump this to 6.2.0.
No prob, makes sense, I will do.
+1
+1
Looks good to me, any chance this can be reviewed by the core team? Sorely needing it (and Kafka support is surprisingly weak across the board in log shipping tools).
+1
The code looks OK, but this needs to be rebased. It also needs docs added in docs/index.asciidoc.
@consulthys I think @talevy's original comment applies here. This should be a minor version bump (not patch). Regarding the docs, having the docs in docs/index.asciidoc is somewhat new for plugins, so let me know if you run into trouble with it.
@jordansissel I've resolved the conflicts, amended the docs and bumped the minor version. Let me know if you want me to do anything else.
unless @message_timestamp.nil?
  if event.include? @message_timestamp
    ts = event.get(@message_timestamp)
    timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i
LogStash::Timestamp#to_i returns 'seconds since epoch':
[6] pry(#<LogStash::Runner>)> LogStash::Event.new.timestamp.to_i
=> 1505207654
But the Kafka RFC indicates in a few places that the unit for the Long timestamp is milliseconds, not seconds.
I think you need (ts.to_f * 1000).to_i to convert to milliseconds.
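The difference between the two units can be illustrated with a minimal plain-Ruby sketch. It uses a plain Time value rather than any Logstash class, and the helper names (epoch_seconds, epoch_millis_float, epoch_millis_exact) are hypothetical, introduced only for illustration. The rational variant is an alternative to the float conversion suggested above, not something the PR itself uses.

```ruby
# Plain-Ruby sketch; no Logstash classes are involved.
# All helper names here are hypothetical.

# Whole seconds since the epoch (what Time#to_i returns).
def epoch_seconds(time)
  time.to_i
end

# Milliseconds since the epoch, following the conversion suggested in the review.
# Float arithmetic may be off by one millisecond for some inputs.
def epoch_millis_float(time)
  (time.to_f * 1000).to_i
end

# Same conversion using exact rational arithmetic (Time#to_r).
def epoch_millis_exact(time)
  (time.to_r * 1000).to_i
end

t = Time.at(1505207654, 230_000) # 230_000 microseconds = 230 ms
puts epoch_seconds(t)
puts epoch_millis_float(t)
puts epoch_millis_exact(t)
```

The seconds value is three orders of magnitude smaller than the millisecond values, which is why passing it to Kafka unchanged would place the message close to the start of the epoch.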
Note that I'm not calling to_i on a LogStash::Timestamp, but on the wrapped Joda DateTime object. I was under the impression that back then it returned milliseconds, but agreed, not anymore. I'll change that.
Yeah, I checked timestamp.time and in Logstash 5.x, at least, it is a Ruby Time object:
[6] pry(#<LogStash::Runner>)> date = LogStash::Filters::Date.new("match" => [ "message", "ISO8601" ])
=> <LogStash::Filters::Date match=>["message", "ISO8601"], id=>"date_e40418e5-e0ba-4b0f-a3a5-a04b62920d0e", enable_metric=>true, periodic_flush=>false, target=>"@timestamp", tag_on_failure=>["_dateparsefailure"]>
[7] pry(#<LogStash::Runner>)> e = LogStash::Event.new
=> 2017-09-13T04:11:30.230Z %{host} %{message}
[8] pry(#<LogStash::Runner>)> e.timestamp.time.to_i
=> 1505275890
[9] pry(#<LogStash::Runner>)> e.timestamp.time.class
=> Time
# Now try applying the date filter:
[11] pry(#<LogStash::Runner>)> e.set("message", "2017-01-01T00:00:00.000Z")
=> "2017-01-01T00:00:00.000Z"
[12] pry(#<LogStash::Runner>)> date.filter(e)
=> 2017-01-01T00:00:00.000Z %{host} 2017-01-01T00:00:00.000Z
[13] pry(#<LogStash::Runner>)> e.timestamp.time.to_i
=> 1483228800
[14] pry(#<LogStash::Runner>)> e.timestamp.time.class
=> Time
^^^ Still a Time object.
Internally, a LogStash::Event is a wrapper of the internal org.logstash.Event, and both have a Timestamp class. In JRuby, this timestamp type is provided by JrubyTimestampExtLibrary.java, which has a Ruby time method that returns a Ruby Time object.
So, from Ruby, due to our JRuby wrappers, you don't get to see the org.logstash.Timestamp object (which is backed by the Joda DateTime type); you see a Ruby Time type.
True, I made the same exploration and came to the same conclusion.
Still, I'm a bit surprised that one has to do such a transformation in order to get a millisecond value out of a LogStash::Timestamp. Even more surprising is the fact that nowhere else in the whole Logstash codebase is something like this needed. And if my previous statement is wrong (which I hope), then why isn't there a utility method somewhere that gets me that value?
I don't know anywhere else we need to output milliseconds-since-epoch. I checked a few places I know we use time that isn't ISO8601 formatted:
Ok, thanks for checking. I was under the impression that getting a milliseconds representation out of a date/timestamp was a common feature in most mainstream languages.
I actually found one place in the Logstash core lib that does the same transformation and could benefit from having that factored out into the Timestamp class: https://github.com/elastic/logstash/blob/master/logstash-core/lib/logstash/agent.rb#L182
If not feasible, I'll just apply your suggestion above so we can let it be.
    ts = event.get(@message_timestamp)
    timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i
  else
    timestamp = event.sprintf(@message_timestamp).to_i
What is the goal here? I can see this very easily causing mistakes for users. What would an expected sprintf be, here?
- "%{+%s}%{+SSS}" (same result as if you call time.to_f * 1000, basically)
- Something else?
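The idea behind a pattern like "%{+%s}%{+SSS}" is that epoch seconds followed by a zero-padded three-digit millisecond field read as epoch milliseconds. A minimal sketch of the same idea uses Ruby's own Time#strftime as an analog; this is not the Logstash sprintf implementation, and the helper name epoch_millis_via_format is hypothetical.

```ruby
# Plain-Ruby analog of concatenating epoch seconds and milliseconds.
# Uses Time#strftime, not Logstash's event.sprintf; the helper name is hypothetical.
def epoch_millis_via_format(time)
  # %s = seconds since the epoch, %L = milliseconds of the second (000..999)
  time.strftime("%s%L").to_i
end

t = Time.at(1505207654, 230_000) # 230_000 microseconds = 230 ms
puts epoch_millis_via_format(t)
```

The string-based route only works because the millisecond field is always three digits wide, which is one reason a format string is an easy place for users to make mistakes.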
The idea of using sprintf here was described in my original issue (#124). In short, it was helpful in cases where a field containing a value in seconds could also be used as the message timestamp.
I must have missed this comment; sorry for the delays.
If the use case is taking a numeric value as the Kafka message timestamp, then I would prefer this be event.get(@message_timestamp) and not event.sprintf.
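A minimal sketch of what reading the field directly could look like, written in plain Ruby so it runs without Logstash. Everything here is an assumption made for illustration: a Hash stands in for the event, FakeTimestamp stands in for LogStash::Timestamp (which exposes its wrapped value via time, as the pry session in this thread shows), the helper name kafka_timestamp_millis is hypothetical, and numeric field values are assumed to already be in milliseconds.

```ruby
# Hypothetical stand-in for LogStash::Timestamp, which exposes a Ruby Time via #time.
FakeTimestamp = Struct.new(:time)

# Hypothetical helper: returns epoch milliseconds for the configured field,
# or nil when no field is configured or the event does not contain it.
# A plain Hash stands in for the Logstash event.
def kafka_timestamp_millis(event, field)
  return nil if field.nil? || !event.key?(field)

  value = event[field]
  if value.is_a?(FakeTimestamp)
    # Convert the wrapped Time to milliseconds with exact rational arithmetic.
    (value.time.to_r * 1000).to_i
  else
    # Assumption: a numeric (or numeric string) value is already in milliseconds.
    Integer(value)
  end
end

event = { "created" => FakeTimestamp.new(Time.at(1483228800)), "millis" => 1483228800000 }
puts kafka_timestamp_millis(event, "created")
puts kafka_timestamp_millis(event, "millis")
```

Reading the field value directly avoids the string round trip through a format pattern, and Integer raises on non-numeric input instead of silently returning 0 the way String#to_i does.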
Waiting for this to be merged.
@consulthys I left some inline comments in review with the main question being if the timestamp used was correct. Based on my reading of the Kafka docs, the timestamp is milliseconds and this PR uses seconds ( After some digging, I think I finally found the answer. In KAFKA-3025 it says this:
No this patch. No elk for ksql
Thanks @jordansissel, it's been months since I developed that so I'll have another look in detail shortly :-)
Confession: I found the Time(Ruby) vs DateTime(Joda) to be confusing. I think we kept
@jordansissel can it be merged?
Not merged?
@jinleileiking Looking at the recent comments, there is still review going on in this issue. If you wish to invest in speeding this up, help with testing, reviewing, or with code is always appreciated.
@jordansissel I commented on your change request but didn't get any feedback. Once you do, I'll make sure to wrap this up. Thanks much!
I want to test it in my env, but it seems not to work with Logstash 2.4.0.
@consulthys @jordansissel
Any update on this change being merged, @consulthys @jordansissel?
Still waiting.
This is a quick fix for #124 in order to be able to add a specific timestamp to messages sunk to Kafka topics.