Skip to content

Commit 14643ca

Browse files
committed
simplify consumer.poll send fetches logic
1 parent a441ef3 commit 14643ca

File tree

1 file changed

+9
-13
lines changed

1 file changed

+9
-13
lines changed

kafka/consumer/group.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -707,22 +707,18 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
707707
# If data is available already, e.g. from a previous network client
708708
# poll() call to commit, then just return it immediately
709709
records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
710+
# Before returning the fetched records, we can send off the
711+
# next round of fetches and avoid block waiting for their
712+
# responses to enable pipelining while the user is handling the
713+
# fetched records.
714+
if not partial:
715+
futures = self._fetcher.send_fetches()
716+
if len(futures):
717+
self._client.poll(timeout_ms=0)
718+
710719
if records:
711-
# Before returning the fetched records, we can send off the
712-
# next round of fetches and avoid block waiting for their
713-
# responses to enable pipelining while the user is handling the
714-
# fetched records.
715-
if not partial:
716-
futures = self._fetcher.send_fetches()
717-
if len(futures):
718-
self._client.poll(timeout_ms=0)
719720
return records
720721

721-
# Send any new fetches (won't resend pending fetches)
722-
futures = self._fetcher.send_fetches()
723-
if len(futures):
724-
self._client.poll(timeout_ms=0)
725-
726722
self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
727723
# after the long poll, we should check whether the group needs to rebalance
728724
# prior to returning data so that the group can stabilize faster

0 commit comments

Comments
 (0)