Skip to content

Commit 7a0c28a

Browse files
authored
Change replicated reader to eagerly cleanup the DataNode connection (#111)
1 parent 81bddc4 commit 7a0c28a

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

rust/src/hdfs/block_reader.rs

+16-9
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ impl ReplicatedBlockStream {
169169
}
170170

171171
async fn next_packet(&mut self) -> Result<Option<Bytes>> {
172+
// We've finished this read, just return None
173+
if self.len == 0 {
174+
return Ok(None);
175+
}
176+
172177
let (header, data) = loop {
173178
if self.listener.is_none() {
174179
let (connection, checksum_info) = self.select_next_datanode().await?;
@@ -196,12 +201,6 @@ impl ReplicatedBlockStream {
196201
}
197202
};
198203

199-
if self.len == 0 {
200-
let conn = self.listener.take().unwrap().await.unwrap()?;
201-
DATANODE_CACHE.release(conn);
202-
return Ok(None);
203-
}
204-
205204
let packet_offset = if self.offset > header.offset_in_block as usize {
206205
self.offset - header.offset_in_block as usize
207206
} else {
@@ -212,6 +211,12 @@ impl ReplicatedBlockStream {
212211
self.offset += packet_len;
213212
self.len -= packet_len;
214213

214+
// We've consumed the whole read, there should be no more packets and the listener should complete
215+
if self.len == 0 {
216+
let conn = self.listener.take().unwrap().await.unwrap()?;
217+
DATANODE_CACHE.release(conn);
218+
}
219+
215220
Ok(Some(
216221
data.slice(packet_offset..(packet_offset + packet_len)),
217222
))
@@ -227,13 +232,15 @@ impl ReplicatedBlockStream {
227232
let packet = connection.read_packet().await?;
228233
let header = packet.header.clone();
229234
let data = packet.get_data(&checksum_info)?;
230-
let empty_packet = data.is_empty();
231-
sender.send(Ok((header, data))).await.unwrap();
232235

233-
if empty_packet {
236+
// If the packet is empty it means it's the last packet
237+
// so tell the DataNode the read was a success and finish this task
238+
if data.is_empty() {
234239
connection.send_read_success().await?;
235240
break;
236241
}
242+
243+
sender.send(Ok((header, data))).await.unwrap();
237244
}
238245
Ok(connection)
239246
})

0 commit comments

Comments
 (0)