Skip to content

Commit a441ef3

Browse files
committed
Validate crcs in fetcher
1 parent a0b96ec commit a441ef3

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

kafka/consumer/fetcher.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,11 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
856856
while batch is not None:
857857
last_batch = batch
858858

859+
if self.check_crcs and not batch.validate_crc():
860+
raise Errors.CorruptRecordException(
861+
"Record batch for partition %s at offset %s failed crc check" % (
862+
self.topic_partition, batch.base_offset))
863+
859864
# Try DefaultsRecordBatch / message log format v2
860865
# base_offset, last_offset_delta, and control batches
861866
if batch.magic == 2:
@@ -872,6 +877,10 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
872877
continue
873878

874879
for record in batch:
880+
if self.check_crcs and not record.validate_crc():
881+
raise Errors.CorruptRecordException(
882+
"Record for partition %s at offset %s failed crc check" % (
883+
self.topic_partition, record.offset))
875884
key_size = len(record.key) if record.key is not None else -1
876885
value_size = len(record.value) if record.value is not None else -1
877886
key = self._deserialize(key_deserializer, tp.topic, record.key)

0 commit comments

Comments
 (0)