Skip to content
This repository was archived by the owner on Dec 7, 2020. It is now read-only.

Commit 0cd872b

Browse files
eliaslevyjoekiller
authored andcommitted
KafkaProducer: Publishing messages with timestamps. (#52)
* KafkaProducer: Publishing messages with timestamps. Allow passing in a timestamp for messages at publishing time. * Fix broken test.
1 parent 5d1bd78 commit 0cd872b

File tree

3 files changed

+60
-5
lines changed

3 files changed

+60
-5
lines changed

lib/jruby-kafka/kafka-producer.rb

+16-4
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,24 @@ def initialize(opts = {})
4040
java_alias :send_method , :send, [ProducerRecord]
4141
java_alias :send_cb_method, :send, [ProducerRecord, Callback.java_class]
4242

43-
# throws FailedToSendMessageException or if not connected, StandardError.
44-
def send_msg(topic, partition, key, value, &block)
43+
# Send a message to the cluster.
44+
#
45+
# @param [String] topic The topic to send the message to.
46+
# @param [Integer,nil] partition The topic partition to send the message to, or nil to allow
47+
# the configured partitioner class to select the partition.
48+
# @param [String,nil] key The message key, if there is one. Otherwise, nil.
49+
# @param [String] value The message value.
50+
# @param [Integer,nil] timestamp The message timestamp in milliseconds. If nil, the
51+
# producer will assign it the current time.
52+
#
53+
# @raise [FailedToSendMessageException] if it can't send the message
54+
def send_msg(topic, partition, key, value, timestamp=nil, &block)
55+
record = ProducerRecord.new(topic, partition, timestamp, key, value)
56+
4557
if block
46-
send_cb_method ProducerRecord.new(topic, partition, key, value), RubyCallback.new(block)
58+
send_cb_method record, RubyCallback.new(block)
4759
else
48-
send_method ProducerRecord.new(topic, partition, key, value)
60+
send_method record
4961
end
5062
end
5163
end

test/test_kafka-producer.rb

+34-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ def test_01_send_message
1919
assert(future.isDone(), 'expected message to be done')
2020
assert(future.get().topic(), topic)
2121
assert_equal(future.get().partition(), 0)
22-
2322
end
2423

2524
def test_02_send_msg_with_cb
@@ -63,4 +62,38 @@ def test_03_get_sent_msg
6362
end
6463
assert(found.include?('test message'), 'expected to find message: test message')
6564
end
65+
66+
def test_04_send_message_with_ts
67+
topic = 'test_send'
68+
future = send_kafka_producer_msg_ts topic, (Time.now.to_i * 1000)
69+
assert_not_nil(future)
70+
begin
71+
timeout(30) do
72+
until future.isDone() do
73+
next
74+
end
75+
end
76+
end
77+
assert(future.isDone(), 'expected message to be done')
78+
assert(future.get().topic(), topic)
79+
assert_equal(future.get().partition(), 0)
80+
end
81+
82+
def test_05_send_msg_with_ts_and_cb
83+
metadata = exception = nil
84+
future = send_kafka_producer_msg_ts_cb(Time.now.to_i * 1000) { |md,e| metadata = md; exception = e }
85+
assert_not_nil(future)
86+
begin
87+
timeout(30) do
88+
while metadata.nil? && exception.nil? do
89+
next
90+
end
91+
end
92+
end
93+
assert_not_nil(metadata)
94+
assert_instance_of(Java::OrgApacheKafkaClientsProducer::RecordMetadata, metadata)
95+
assert_nil(exception)
96+
assert(future.isDone(), 'expected message to be done')
97+
end
98+
6699
end

test/util/kafka-producer.rb

+10
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,13 @@ def send_kafka_producer_msg_cb(&block)
1515
producer = Kafka::KafkaProducer.new(KAFKA_PRODUCER_OPTIONS)
1616
producer.send_msg('test',nil, nil, 'test message', &block)
1717
end
18+
19+
def send_kafka_producer_msg_ts(topic, timestamp)
20+
producer = Kafka::KafkaProducer.new(KAFKA_PRODUCER_OPTIONS)
21+
producer.send_msg(topic,nil, nil, 'test message', timestamp)
22+
end
23+
24+
def send_kafka_producer_msg_ts_cb(timestamp, &block)
25+
producer = Kafka::KafkaProducer.new(KAFKA_PRODUCER_OPTIONS)
26+
producer.send_msg('test',nil, nil, 'test message', timestamp, &block)
27+
end

0 commit comments

Comments
 (0)