@@ -222,25 +222,36 @@ impl ReplicatedBlockStream {
222
222
) )
223
223
}
224
224
225
+ async fn get_next_packet (
226
+ connection : & mut DatanodeConnection ,
227
+ checksum_info : Option < ReadOpChecksumInfoProto > ,
228
+ ) -> Result < ( PacketHeaderProto , Bytes ) > {
229
+ let packet = connection. read_packet ( ) . await ?;
230
+ let header = packet. header ;
231
+ Ok ( ( header, packet. get_data ( & checksum_info) ?) )
232
+ }
233
+
225
234
fn start_packet_listener (
226
235
mut connection : DatanodeConnection ,
227
236
checksum_info : Option < ReadOpChecksumInfoProto > ,
228
237
sender : Sender < Result < ( PacketHeaderProto , Bytes ) > > ,
229
238
) -> JoinHandle < Result < DatanodeConnection > > {
230
239
tokio:: spawn ( async move {
231
240
loop {
232
- let packet = connection. read_packet ( ) . await ?;
233
- let header = packet. header ;
234
- let data = packet. get_data ( & checksum_info) ?;
235
-
236
- // If the packet is empty it means it's the last packet
237
- // so tell the DataNode the read was a success and finish this task
238
- if data. is_empty ( ) {
241
+ let next_packet = Self :: get_next_packet ( & mut connection, checksum_info) . await ;
242
+ if next_packet. as_ref ( ) . is_ok_and ( |( _, data) | data. is_empty ( ) ) {
243
+ // If the packet is empty it means it's the last packet
244
+ // so tell the DataNode the read was a success and finish this task
239
245
connection. send_read_success ( ) . await ?;
240
246
break ;
241
247
}
242
248
243
- sender. send ( Ok ( ( header, data) ) ) . await . unwrap ( ) ;
249
+ if sender. send ( next_packet) . await . is_err ( ) {
250
+ // The block reader was dropped, so just kill the listener
251
+ return Err ( HdfsError :: DataTransferError (
252
+ "Reader was dropped without consuming all data" . to_string ( ) ,
253
+ ) ) ;
254
+ }
244
255
}
245
256
Ok ( connection)
246
257
} )
0 commit comments