Commit efcad6f

Move data reading to a new task (#104)
1 parent 20f23c3 commit efcad6f

1 file changed, +76 -35 lines:
crates/hdfs-native/src/hdfs/block_reader.rs

@@ -7,13 +7,17 @@ use futures::{
     Stream, StreamExt,
 };
 use log::{debug, warn};
+use tokio::{
+    sync::mpsc::{self, Receiver, Sender},
+    task::JoinHandle,
+};
 
 use crate::{
     ec::EcSchema,
     hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE},
     proto::{
         common,
-        hdfs::{self, BlockOpResponseProto},
+        hdfs::{self, BlockOpResponseProto, PacketHeaderProto, ReadOpChecksumInfoProto},
     },
     HdfsError, Result,
 };
@@ -99,8 +103,9 @@ struct ReplicatedBlockStream {
     offset: usize,
     len: usize,
 
-    connection: Option<DatanodeConnection>,
-    checksum_info: Option<hdfs::ReadOpChecksumInfoProto>,
+    listener: Option<JoinHandle<Result<DatanodeConnection>>>,
+    sender: Sender<Result<(PacketHeaderProto, Bytes)>>,
+    receiver: Receiver<Result<(PacketHeaderProto, Bytes)>>,
     current_replica: usize,
 }
 
@@ -111,29 +116,33 @@ impl ReplicatedBlockStream {
         offset: usize,
         len: usize,
     ) -> Self {
+        let (sender, receiver) = mpsc::channel(10);
+
         Self {
             protocol,
             block,
             offset,
             len,
-            connection: None,
-            checksum_info: None,
+            listener: None,
+            sender,
+            receiver,
             current_replica: 0,
         }
     }
 
-    async fn select_next_datanode(&mut self) -> Result<()> {
-        if self.connection.is_some() {
-            self.current_replica += 1;
-            if self.current_replica >= self.block.locs.len() {
-                return Err(HdfsError::DataTransferError(
-                    "All DataNodes failed".to_string(),
-                ));
-            }
+    async fn select_next_datanode(
+        &mut self,
+    ) -> Result<(DatanodeConnection, Option<ReadOpChecksumInfoProto>)> {
+        if self.current_replica >= self.block.locs.len() {
+            return Err(HdfsError::DataTransferError(
+                "All DataNodes failed".to_string(),
+            ));
         }
 
         let datanode = &self.block.locs[self.current_replica].id;
 
+        self.current_replica += 1;
+
         let (connection, response) = connect_and_send(
             &self.protocol,
             datanode,
@@ -148,48 +157,80 @@ impl ReplicatedBlockStream {
             return Err(HdfsError::DataTransferError(response.message().to_string()));
         }
 
-        self.connection = Some(connection);
-        self.checksum_info = response.read_op_checksum_info;
-
-        Ok(())
+        Ok((connection, response.read_op_checksum_info))
     }
 
     async fn next_packet(&mut self) -> Result<Option<Bytes>> {
-        if self.connection.is_none() {
-            self.select_next_datanode().await?;
-        }
-
-        if self.len == 0 {
-            let mut conn = self.connection.take().unwrap();
+        let (header, data) = loop {
+            if self.listener.is_none() {
+                let (connection, checksum_info) = self.select_next_datanode().await?;
+                self.listener = Some(Self::start_packet_listener(
+                    connection,
+                    checksum_info,
+                    self.sender.clone(),
+                ));
+            }
 
-            // Read the final empty packet
-            conn.read_packet().await?;
+            match self.receiver.recv().await {
+                Some(Ok(data)) => break data,
+                Some(Err(e)) => {
+                    // Some error communicating with the DataNode, log a warning and then retry on a different DataNode
+                    warn!("Error occurred while reading from DataNode: {:?}", e);
+                    self.listener = None;
+                }
+                None => {
+                    // This means there's a disconnect between the data we are getting back and what we asked for,
+                    // so just raise an error
+                    return Err(HdfsError::DataTransferError(
+                        "Not enough data returned from DataNode".to_string(),
+                    ));
+                }
+            }
+        };
 
-            conn.send_read_success().await?;
+        if self.len == 0 {
+            let conn = self.listener.take().unwrap().await.unwrap()?;
             DATANODE_CACHE.release(conn);
             return Ok(None);
         }
 
-        let conn = self.connection.as_mut().unwrap();
-
-        let packet = conn.read_packet().await?;
-
-        let packet_offset = if self.offset > packet.header.offset_in_block as usize {
-            self.offset - packet.header.offset_in_block as usize
+        let packet_offset = if self.offset > header.offset_in_block as usize {
+            self.offset - header.offset_in_block as usize
         } else {
             0
         };
-        let packet_len = usize::min(packet.header.data_len as usize - packet_offset, self.len);
-        let packet_data = packet.get_data(&self.checksum_info)?;
+        let packet_len = usize::min(header.data_len as usize - packet_offset, self.len);
 
         self.offset += packet_len;
        self.len -= packet_len;
 
         Ok(Some(
-            packet_data.slice(packet_offset..(packet_offset + packet_len)),
+            data.slice(packet_offset..(packet_offset + packet_len)),
         ))
     }
 
+    fn start_packet_listener(
+        mut connection: DatanodeConnection,
+        checksum_info: Option<ReadOpChecksumInfoProto>,
+        sender: Sender<Result<(PacketHeaderProto, Bytes)>>,
+    ) -> JoinHandle<Result<DatanodeConnection>> {
+        tokio::spawn(async move {
+            loop {
+                let packet = connection.read_packet().await?;
+                let header = packet.header.clone();
+                let data = packet.get_data(&checksum_info)?;
+                let empty_packet = data.is_empty();
+                sender.send(Ok((header, data))).await.unwrap();
+
+                if empty_packet {
+                    connection.send_read_success().await?;
+                    break;
+                }
+            }
+            Ok(connection)
+        })
+    }
+
     fn into_stream(self) -> impl Stream<Item = Result<Bytes>> {
         stream::unfold(self, |mut state| async move {
             let next = state.next_packet().await.transpose();
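
The change follows a producer/consumer pattern: a spawned tokio task owns the DatanodeConnection, reads packets as they arrive, and pushes them into a bounded mpsc channel (capacity 10, giving some read-ahead without unbounded buffering), while next_packet simply receives from the channel and restarts the listener on the next replica if an error comes through. The task's JoinHandle yields the connection back once the final empty packet is seen, so it can be released to DATANODE_CACHE. Below is a minimal standalone sketch of the same pattern; Connection, Packet, and start_listener are illustrative placeholders, not the crate's real API.

// Minimal sketch of the reader-task pattern: a spawned task reads packets
// and forwards them over a bounded mpsc channel, returning its connection
// through the JoinHandle when the stream ends.
use tokio::{sync::mpsc, task::JoinHandle};

type Packet = Vec<u8>;

// Placeholder data source standing in for a real DataNode connection.
struct Connection {
    remaining: usize,
}

impl Connection {
    async fn read_packet(&mut self) -> Packet {
        // An empty packet signals end-of-stream, mirroring the HDFS read protocol.
        let size = self.remaining.min(4);
        self.remaining -= size;
        vec![0u8; size]
    }
}

// Producer: runs on its own task so the consumer never waits on socket reads.
fn start_listener(mut conn: Connection, sender: mpsc::Sender<Packet>) -> JoinHandle<Connection> {
    tokio::spawn(async move {
        loop {
            let packet = conn.read_packet().await;
            let done = packet.is_empty();
            // If the receiver is dropped this send fails; the real code also
            // unwraps here, relying on the receiver outliving the task.
            sender.send(packet).await.unwrap();
            if done {
                break;
            }
        }
        // Hand the connection back so the caller can reuse or release it.
        conn
    })
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(10);
    let handle = start_listener(Connection { remaining: 10 }, tx);

    // Consumer: pulls packets off the channel until the empty end-of-stream packet.
    while let Some(packet) = rx.recv().await {
        if packet.is_empty() {
            break;
        }
        println!("got {} bytes", packet.len());
    }

    // Join the task to recover the connection once the stream is finished.
    let conn = handle.await.unwrap();
    assert_eq!(conn.remaining, 0);
}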
