@@ -1118,6 +1118,9 @@ def _manage_batch_flow(self, batch: "_Batch") -> None:
1118
1118
self ._send_batch_to_step (new_batch )
1119
1119
else :
1120
1120
self ._request_more_batches_if_needed (step )
1121
+ else :
1122
+ if len (self .dag ) == 1 :
1123
+ self ._request_batch_from_generator (step .name ) # type: ignore
1121
1124
1122
1125
self ._cache ()
1123
1126
@@ -1225,6 +1228,19 @@ def _request_initial_batches(self) -> None:
1225
1228
)
1226
1229
self ._send_batch_to_step (batch )
1227
1230
1231
+ def _request_batch_from_generator (self , step_name : str ) -> None :
1232
+ """Request a new batch to a `GeneratorStep`.
1233
+
1234
+ Args:
1235
+ step_name: the name of the `GeneratorStep` to which a batch has to be requested.
1236
+ """
1237
+ # Get the last batch that the previous step sent to generate the next batch
1238
+ # (next `seq_no`).
1239
+ last_batch = self ._batch_manager .get_last_batch_sent (step_name ) # type: ignore
1240
+ if last_batch is None :
1241
+ return
1242
+ self ._send_batch_to_step (last_batch .next_batch ())
1243
+
1228
1244
def _request_more_batches_if_needed (self , step : "Step" ) -> None :
1229
1245
"""Request more batches to the predecessors steps of `step` if needed.
1230
1246
@@ -1239,17 +1255,7 @@ def _request_more_batches_if_needed(self, step: "Step") -> None:
1239
1255
if previous_step_name not in self .dag .root_steps :
1240
1256
continue
1241
1257
1242
- # Get the last batch that the previous step sent to generate the next batch
1243
- # (next `seq_no`).
1244
- last_batch = self ._batch_manager .get_last_batch_sent (previous_step_name ) # type: ignore
1245
- if last_batch is None :
1246
- continue
1247
-
1248
- self ._logger .debug (
1249
- f"Step '{ step .name } ' input buffer for step '{ previous_step_name } ' is"
1250
- " empty. Requesting new batch..."
1251
- )
1252
- self ._send_batch_to_step (last_batch .next_batch ())
1258
+ self ._request_batch_from_generator (previous_step_name )
1253
1259
1254
1260
def _handle_batch_on_stop (self , batch : "_Batch" ) -> None :
1255
1261
"""Handles a batch that was received from the output queue when the pipeline was
0 commit comments