
Added ability to specify timestamp in message #125

Open · wants to merge 8 commits into main
Conversation

consulthys

This is a quick fix for #124 that makes it possible to set a specific timestamp on messages sent to Kafka topics.

@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-kafka'
-s.version = '6.1.4'
+s.version = '6.1.5'
Contributor

Since this is a new feature and not just a bug fix, I'd be happy to bump this to 6.2.0.

Author

No problem, that makes sense; I'll do that.

@charsyam

+1

@thinker0

+1

@henridf

henridf commented Sep 4, 2017

Looks good to me. Any chance this can be reviewed by the core team? We sorely need it (and Kafka support is surprisingly weak across the board in log-shipping tools).

@jinleileiking

+1

@jordansissel
Contributor

The code looks OK, but this needs to be rebased.

It also needs docs added in docs/index.asciidoc.

@jordansissel
Contributor

@consulthys I think @talevy's original comment applies here. This should be a minor version bump (not patch).

Regarding the docs, having them in docs/index.asciidoc is somewhat new for plugins, so let me know if you run into trouble with it.

@consulthys
Author

@jordansissel I've resolved the conflicts, amended the docs and bumped the minor version. Let me know if you want me to do anything else.

unless @message_timestamp.nil?
if event.include? @message_timestamp
ts = event.get(@message_timestamp)
timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i
Contributor

LogStash::Timestamp#to_i returns 'seconds since epoch':

[6] pry(#<LogStash::Runner>)> LogStash::Event.new.timestamp.to_i
=> 1505207654

But the Kafka docs indicate in a few places that the unit for the Long timestamp is milliseconds, not seconds.

I think you need (ts.to_f * 1000).to_i to convert to milliseconds.
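
A minimal sketch of the suggested conversion, written against the hunk above (treating a non-Timestamp value as epoch seconds is an assumption):

ts = event.get(@message_timestamp)
timestamp = if ts.is_a?(LogStash::Timestamp)
              (ts.time.to_f * 1000).to_i # Time#to_f keeps the fractional seconds, so this yields epoch milliseconds
            else
              (ts.to_f * 1000).to_i      # assume a plain numeric value expressed in seconds
            end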

Author

Note that I'm not calling to_i on a LogStash::Timestamp but on the wrapped Joda DateTime object. I was under the impression that it used to return milliseconds, but agreed, that's no longer the case. I'll change that.

Contributor

Yeah, I checked timestamp.time, and in Logstash 5.x at least it is a Ruby Time object:

[6] pry(#<LogStash::Runner>)> date = LogStash::Filters::Date.new("match" => [ "message", "ISO8601" ])
=> <LogStash::Filters::Date match=>["message", "ISO8601"], id=>"date_e40418e5-e0ba-4b0f-a3a5-a04b62920d0e", enable_metric=>true, periodic_flush=>false, target=>"@timestamp", tag_on_failure=>["_dateparsefailure"]>
[7] pry(#<LogStash::Runner>)> e = LogStash::Event.new
=> 2017-09-13T04:11:30.230Z %{host} %{message}
[8] pry(#<LogStash::Runner>)> e.timestamp.time.to_i
=> 1505275890
[9] pry(#<LogStash::Runner>)> e.timestamp.time.class
=> Time

# Now try applying the date filter:
[11] pry(#<LogStash::Runner>)> e.set("message", "2017-01-01T00:00:00.000Z")
=> "2017-01-01T00:00:00.000Z"
[12] pry(#<LogStash::Runner>)> date.filter(e)
=> 2017-01-01T00:00:00.000Z %{host} 2017-01-01T00:00:00.000Z
[13] pry(#<LogStash::Runner>)> e.timestamp.time.to_i
=> 1483228800
[14] pry(#<LogStash::Runner>)> e.timestamp.time.class
=> Time

^^^ Still a Time object.

Internally, a LogStash::Event is a wrapper around the internal org.logstash.Event, and both have a Timestamp class. In JRuby, this timestamp type is provided by JrubyTimestampExtLibrary.java, which has a Ruby time method that returns a Ruby Time object.

So, from Ruby, because of our JRuby wrappers, you never see the org.logstash.Timestamp object (which is backed by a Joda DateTime); you see a Ruby Time.
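
In other words, a minimal sketch of the types described above (not pry output):

event = LogStash::Event.new
ts    = event.timestamp   # a LogStash::Timestamp (the JRuby wrapper), not org.logstash.Timestamp
ts.time                   # a Ruby Time, provided by JrubyTimestampExtLibrary.java
ts.to_i                   # seconds since the epoch, as shown earlier in this thread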

Author

True, I went through the same exploration and came to the same conclusion.

Still, I'm a bit surprised that one has to do such a transformation to get a millisecond value out of a LogStash::Timestamp. Even more surprising is that nowhere else in the whole Logstash codebase is something like this needed. And if that statement is wrong (which I hope it is), why isn't there a utility method somewhere that returns that value?

Contributor

I don't know of anywhere else we need to output milliseconds since epoch. I checked a few places where I know we use time that isn't ISO8601-formatted:

Author

Ok, thanks for checking. I was under the impression that getting a milliseconds representation out of a date/timestamp was a common feature in most mainstream languages.

I actually found one place in the Logstash core lib that does the same transformation and could benefit from having it factored out into the Timestamp class: https://github.com/elastic/logstash/blob/master/logstash-core/lib/logstash/agent.rb#L182

If that's not feasible, I'll just apply your suggestion above and leave it at that.
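
For illustration, the kind of helper being suggested might look roughly like this (to_ms is a hypothetical name; no such method exists in logstash-core today):

# Hypothetical sketch only, reopening the class for discussion purposes.
class LogStash::Timestamp
  def to_ms
    (time.to_f * 1000).to_i # epoch milliseconds derived from the wrapped Ruby Time
  end
end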

ts = event.get(@message_timestamp)
timestamp = ts.is_a?(LogStash::Timestamp) ? ts.time.to_i : ts.to_i
else
timestamp = event.sprintf(@message_timestamp).to_i
Contributor

What is the goal here? I can see this very easily causing mistakes for users. What would a typical sprintf pattern be here?

  • "%{+%s}%{+SSS}" (same result as if you call time.to_f * 1000 basically)
  • Something else?

Author

The idea of using sprintf here was described in my original issue (#124); in short, it was helpful in cases where a field containing a value in seconds could also be used as the message timestamp.

Contributor

I must have missed this comment; sorry for the delays.

If the use case is taking a numeric value as the Kafka message timestamp, then I would prefer this use event.get(@message_timestamp) rather than event.sprintf.
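
To illustrate the difference (the field name kafka_ts is hypothetical and assumed to hold epoch seconds):

event.sprintf("%{kafka_ts}").to_i # interpolates the field into a string, then parses it back into an integer
event.get("kafka_ts")             # fetches the raw value directly, with no string round-trip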

@jinleileiking

Waiting for this to be merged.

@jordansissel
Contributor

@consulthys I left some inline comments in review, with the main question being whether the timestamp used is correct. Based on my reading of the Kafka docs, the timestamp is milliseconds, while this PR uses seconds (Time#to_i gives seconds since epoch). It's also unclear to me from the Kafka docs what origin the producer message timestamp has (Unix epoch? process start time? system boot time? something else?).

After some digging, I think I finally found the answer. In KAFKA-3025 it says this:

Timestamp is the number of milliseconds since Unix Epoch
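
For context, the Kafka Java producer accepts that millisecond timestamp through a ProducerRecord constructor; a sketch of how it could be passed from JRuby (variable names are hypothetical, and the partition is left nil):

# Sketch only: ProducerRecord(topic, partition, timestamp, key, value), where
# timestamp is a Long in milliseconds since the Unix epoch.
record = org.apache.kafka.clients.producer.ProducerRecord.new(topic, nil, timestamp_ms, event_key, payload)
producer.send(record) # KafkaProducer#send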

@jinleileiking

Without this patch, there's no ELK for KSQL.

@consulthys
Author

Thanks @jordansissel, it's been months since I worked on this, so I'll take another detailed look shortly :-)

@jordansissel
Contributor

Confession: I found the Time (Ruby) vs DateTime (Joda) situation confusing myself. I think we kept Ruby's Time for backwards compatibility, so that event.timestamp would keep a predictable and backwards-compatible interface while we were building the new Java internals.

@jinleileiking

@jordansissel can this be merged?

@jinleileiking

Not merged?

@jinleileiking

@jordansissel

@jordansissel
Contributor

@jinleileiking Looking at the recent comments, there is still review going on in this issue. If you wish to invest in speeding this up, help with testing, reviewing, or code is always appreciated.

@consulthys
Author

@jordansissel I responded to your change request but haven't received any feedback yet. Once you reply, I'll make sure to wrap this up. Thanks much!

@jinleileiking

I want to test it in my environment, but it doesn't seem to work with Logstash 2.4.0.

@guyboertje

@consulthys
It looks like another rebase is needed.

@jordansissel
Are Valentin's replies to your questions satisfactory?

@jsvd added the adoptme label on Jan 15, 2018
@neerav-salaria-guavus

Any update on getting this change merged, @consulthys @jordansissel?

@daggerjames

Still waiting.
