Skip to content

Commit 7f74623

Browse files
committed
Tweak offset to voice the last messages only
1 parent f6b0040 commit 7f74623

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

porte.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,13 @@ def create_consumer(kafka_servers, topic, group_id, username, password):
5050
consumer = Consumer(conf)
5151

5252
# Subscribe to the topic
53-
consumer.subscribe(topic)
54-
53+
def my_assign (consumer, partitions):
54+
for p in partitions:
55+
p.offset = OFFSET_END
56+
print('assign', partitions)
57+
consumer.assign(partitions)
58+
# Subscribe to the topic
59+
consumer.subscribe(topic, on_assign=my_assign)
5560
return consumer
5661

5762
def consume_messages(consumer, building, welcome, goodbye):

0 commit comments

Comments
 (0)