Skip to content

Commit a0b96ec

Browse files
committed
Lazy _unpack_records in PartitionRecords
1 parent 36f7a0c commit a0b96ec

File tree

2 files changed

+163
-141
lines changed

2 files changed

+163
-141
lines changed

kafka/consumer/fetcher.py

Lines changed: 117 additions & 109 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
import collections
44
import copy
5+
import itertools
56
import logging
67
import random
78
import sys
@@ -378,88 +379,35 @@ def _append(self, drained, part, max_records, update_offsets):
378379
# note that the position should always be available
379380
# as long as the partition is still assigned
380381
position = self._subscriptions.assignment[tp].position
381-
if part.fetch_offset == position.offset:
382+
if part.next_fetch_offset == position.offset:
382383
part_records = part.take(max_records)
383-
next_offset = part_records[-1].offset + 1
384-
leader_epoch = part_records[-1].leader_epoch
385-
386384
log.debug("Returning fetched records at offset %d for assigned"
387385
" partition %s", position.offset, tp)
388386
drained[tp].extend(part_records)
389-
if update_offsets:
387+
# We want to increment subscription position if (1) we're using consumer.poll(),
388+
# or (2) we didn't return any records (consumer iterator will update position
389+
# when each message is yielded). There may be edge cases where we re-fetch records
390+
# that we'll end up skipping, but for now we'll live with that.
391+
highwater = self._subscriptions.assignment[tp].highwater
392+
if highwater is not None:
393+
self._sensors.records_fetch_lag.record(highwater - part.next_fetch_offset)
394+
if update_offsets or not part_records:
390395
# TODO: save leader_epoch
391396
log.debug("Updating fetch position for assigned partition %s to %s (leader epoch %s)",
392-
tp, next_offset, leader_epoch)
393-
self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', -1)
397+
tp, part.next_fetch_offset, part.leader_epoch)
398+
self._subscriptions.assignment[tp].position = OffsetAndMetadata(part.next_fetch_offset, '', -1)
394399
return len(part_records)
395400

396401
else:
397402
# these records aren't next in line based on the last consumed
398403
# position, ignore them they must be from an obsolete request
399404
log.debug("Ignoring fetched records for %s at offset %s since"
400-
" the current position is %d", tp, part.fetch_offset,
405+
" the current position is %d", tp, part.next_fetch_offset,
401406
position.offset)
402407

403-
part.discard()
408+
part.drain()
404409
return 0
405410

406-
def _unpack_records(self, tp, records):
407-
try:
408-
batch = records.next_batch()
409-
while batch is not None:
410-
411-
# Try DefaultRecordBatch / message log format v2
412-
# base_offset, last_offset_delta, and control batches
413-
try:
414-
batch_offset = batch.base_offset + batch.last_offset_delta
415-
leader_epoch = batch.leader_epoch
416-
# Control batches have a single record indicating whether a transaction
417-
# was aborted or committed.
418-
# When isolation_level is READ_COMMITTED (currently unsupported)
419-
# we should also skip all messages from aborted transactions
420-
# For now we only support READ_UNCOMMITTED and so we ignore the
421-
# abort/commit signal.
422-
if batch.is_control_batch:
423-
batch = records.next_batch()
424-
continue
425-
except AttributeError:
426-
leader_epoch = -1
427-
pass
428-
429-
for record in batch:
430-
key_size = len(record.key) if record.key is not None else -1
431-
value_size = len(record.value) if record.value is not None else -1
432-
key = self._deserialize(
433-
self.config['key_deserializer'],
434-
tp.topic, record.key)
435-
value = self._deserialize(
436-
self.config['value_deserializer'],
437-
tp.topic, record.value)
438-
headers = record.headers
439-
header_size = sum(
440-
len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in
441-
headers) if headers else -1
442-
yield ConsumerRecord(
443-
tp.topic, tp.partition, leader_epoch, record.offset, record.timestamp,
444-
record.timestamp_type, key, value, headers, record.checksum,
445-
key_size, value_size, header_size)
446-
447-
batch = records.next_batch()
448-
449-
# If unpacking raises StopIteration, it is erroneously
450-
# caught by the generator. We want all exceptions to be raised
451-
# back to the user. See Issue 545
452-
except StopIteration:
453-
log.exception('StopIteration raised unpacking messageset')
454-
raise RuntimeError('StopIteration raised unpacking messageset')
455-
456-
def _deserialize(self, f, topic, bytes_):
457-
if not f:
458-
return bytes_
459-
if isinstance(f, Deserializer):
460-
return f.deserialize(topic, bytes_)
461-
return f(bytes_)
462-
463411
def _send_list_offsets_requests(self, timestamps):
464412
"""Fetch offsets for each partition in timestamps dict. This may send
465413
request to multiple nodes, based on who is Leader for partition.
@@ -773,12 +721,9 @@ def _handle_fetch_error(self, node_id, exception):
773721
def _parse_fetched_data(self, completed_fetch):
774722
tp = completed_fetch.topic_partition
775723
fetch_offset = completed_fetch.fetched_offset
776-
num_bytes = 0
777-
records_count = 0
778-
parsed_records = None
779-
780724
error_code, highwater = completed_fetch.partition_data[:2]
781725
error_type = Errors.for_code(error_code)
726+
parsed_records = None
782727

783728
try:
784729
if not self._subscriptions.is_fetchable(tp):
@@ -807,13 +752,12 @@ def _parse_fetched_data(self, completed_fetch):
807752
log.debug("Adding fetched record for partition %s with"
808753
" offset %d to buffered record list", tp,
809754
position.offset)
810-
unpacked = list(self._unpack_records(tp, records))
811-
parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
812-
if unpacked:
813-
last_offset = unpacked[-1].offset
814-
self._sensors.records_fetch_lag.record(highwater - last_offset)
815-
num_bytes = records.valid_bytes()
816-
records_count = len(unpacked)
755+
parsed_records = self.PartitionRecords(fetch_offset, tp, records,
756+
self.config['key_deserializer'],
757+
self.config['value_deserializer'],
758+
self.config['check_crcs'],
759+
completed_fetch.metric_aggregator)
760+
return parsed_records
817761
elif records.size_in_bytes() > 0:
818762
# we did not read a single message from a non-empty
819763
# buffer because that message's size is larger than
@@ -858,52 +802,116 @@ def _parse_fetched_data(self, completed_fetch):
858802
raise error_type('Unexpected error while fetching data')
859803

860804
finally:
861-
completed_fetch.metric_aggregator.record(tp, num_bytes, records_count)
805+
if parsed_records is None:
806+
completed_fetch.metric_aggregator.record(tp, 0, 0)
862807

863-
return parsed_records
808+
return None
809+
810+
def close(self):
811+
if self._next_partition_records is not None:
812+
self._next_partition_records.drain()
864813

865814
class PartitionRecords(object):
866-
def __init__(self, fetch_offset, tp, messages):
815+
def __init__(self, fetch_offset, tp, records, key_deserializer, value_deserializer, check_crcs, metric_aggregator):
867816
self.fetch_offset = fetch_offset
868817
self.topic_partition = tp
869-
self.messages = messages
818+
self.leader_epoch = -1
819+
self.next_fetch_offset = fetch_offset
820+
self.bytes_read = 0
821+
self.records_read = 0
822+
self.metric_aggregator = metric_aggregator
823+
self.check_crcs = check_crcs
824+
self.record_iterator = itertools.dropwhile(
825+
self._maybe_skip_record,
826+
self._unpack_records(tp, records, key_deserializer, value_deserializer))
827+
828+
def _maybe_skip_record(self, record):
870829
# When fetching an offset that is in the middle of a
871830
# compressed batch, we will get all messages in the batch.
872831
# But we want to start 'take' at the fetch_offset
873832
# (or the next highest offset in case the message was compacted)
874-
for i, msg in enumerate(messages):
875-
if msg.offset < fetch_offset:
876-
log.debug("Skipping message offset: %s (expecting %s)",
877-
msg.offset, fetch_offset)
878-
else:
879-
self.message_idx = i
880-
break
881-
833+
if record.offset < self.fetch_offset:
834+
log.debug("Skipping message offset: %s (expecting %s)",
835+
record.offset, self.fetch_offset)
836+
return True
882837
else:
883-
self.message_idx = 0
884-
self.messages = None
838+
return False
885839

886-
# For truthiness evaluation we need to define __len__ or __nonzero__
887-
def __len__(self):
888-
if self.messages is None or self.message_idx >= len(self.messages):
889-
return 0
890-
return len(self.messages) - self.message_idx
840+
# For truthiness evaluation
841+
def __bool__(self):
842+
return self.record_iterator is not None
891843

892-
def discard(self):
893-
self.messages = None
844+
def drain(self):
845+
if self.record_iterator is not None:
846+
self.record_iterator = None
847+
self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
894848

895849
def take(self, n=None):
896-
if not len(self):
897-
return []
898-
if n is None or n > len(self):
899-
n = len(self)
900-
next_idx = self.message_idx + n
901-
res = self.messages[self.message_idx:next_idx]
902-
self.message_idx = next_idx
903-
# fetch_offset should be incremented by 1 to parallel the
904-
# subscription position (also incremented by 1)
905-
self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1)
906-
return res
850+
return list(itertools.islice(self.record_iterator, 0, n))
851+
852+
def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
853+
try:
854+
batch = records.next_batch()
855+
last_batch = None
856+
while batch is not None:
857+
last_batch = batch
858+
859+
# Try DefaultRecordBatch / message log format v2
860+
# base_offset, last_offset_delta, and control batches
861+
if batch.magic == 2:
862+
self.leader_epoch = batch.leader_epoch
863+
# Control batches have a single record indicating whether a transaction
864+
# was aborted or committed.
865+
# When isolation_level is READ_COMMITTED (currently unsupported)
866+
# we should also skip all messages from aborted transactions
867+
# For now we only support READ_UNCOMMITTED and so we ignore the
868+
# abort/commit signal.
869+
if batch.is_control_batch:
870+
self.next_fetch_offset = next(batch).offset + 1
871+
batch = records.next_batch()
872+
continue
873+
874+
for record in batch:
875+
key_size = len(record.key) if record.key is not None else -1
876+
value_size = len(record.value) if record.value is not None else -1
877+
key = self._deserialize(key_deserializer, tp.topic, record.key)
878+
value = self._deserialize(value_deserializer, tp.topic, record.value)
879+
headers = record.headers
880+
header_size = sum(
881+
len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in
882+
headers) if headers else -1
883+
self.records_read += 1
884+
self.bytes_read += record.size_in_bytes
885+
self.next_fetch_offset = record.offset + 1
886+
yield ConsumerRecord(
887+
tp.topic, tp.partition, self.leader_epoch, record.offset, record.timestamp,
888+
record.timestamp_type, key, value, headers, record.checksum,
889+
key_size, value_size, header_size)
890+
891+
batch = records.next_batch()
892+
else:
893+
# Message format v2 preserves the last offset in a batch even if the last record is removed
894+
# through compaction. By using the next offset computed from the last offset in the batch,
895+
# we ensure that the offset of the next fetch will point to the next batch, which avoids
896+
# unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
897+
# fetching the same batch repeatedly).
898+
if last_batch and last_batch.magic == 2:
899+
self.next_fetch_offset = last_batch.base_offset + last_batch.last_offset_delta + 1
900+
self.drain()
901+
902+
# If unpacking raises StopIteration, it is erroneously
903+
# caught by the generator. We want all exceptions to be raised
904+
# back to the user. See Issue 545
905+
except StopIteration:
906+
log.exception('StopIteration raised unpacking messageset')
907+
raise RuntimeError('StopIteration raised unpacking messageset')
908+
909+
def _deserialize(self, f, topic, bytes_):
910+
if not f:
911+
return bytes_
912+
if isinstance(f, Deserializer):
913+
return f.deserialize(topic, bytes_)
914+
return f(bytes_)
907915

908916

909917
class FetchSessionHandler(object):

0 commit comments

Comments (0)