Skip to content

Commit 34c1c6b

Browse files
committed
Add error handling and retries for using cached connections
1 parent d1a7808 commit 34c1c6b

File tree

1 file changed

+71
-36
lines changed

1 file changed

+71
-36
lines changed

crates/hdfs-native/src/hdfs/block_reader.rs

+71-36
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ use futures::{
66
stream::{self, BoxStream},
77
Stream, StreamExt,
88
};
9-
use log::debug;
9+
use log::{debug, warn};
1010

1111
use crate::{
1212
ec::EcSchema,
1313
hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE},
14-
proto::{common, hdfs},
14+
proto::{
15+
common,
16+
hdfs::{self, BlockOpResponseProto},
17+
},
1518
HdfsError, Result,
1619
};
1720

@@ -32,6 +35,55 @@ pub(crate) fn get_block_stream(
3235
}
3336
}
3437

38+
/// Connects to a DataNode to do a read, attempting to used cached connections.
39+
async fn connect_and_send(
40+
url: &str,
41+
block: &hdfs::ExtendedBlockProto,
42+
token: common::TokenProto,
43+
offset: u64,
44+
len: u64,
45+
) -> Result<(DatanodeConnection, BlockOpResponseProto)> {
46+
let mut remaining_attempts = 2;
47+
while remaining_attempts > 0 {
48+
if let Some(mut conn) = DATANODE_CACHE.get(url) {
49+
let message = hdfs::OpReadBlockProto {
50+
header: conn.build_header(block, Some(token.clone())),
51+
offset,
52+
len,
53+
send_checksums: Some(true),
54+
..Default::default()
55+
};
56+
debug!("Block read op request {:?}", &message);
57+
match conn.send(Op::ReadBlock, &message).await {
58+
Ok(response) => {
59+
debug!("Block read op response {:?}", response);
60+
return Ok((conn, response));
61+
}
62+
Err(e) => {
63+
warn!("Failed to use cached connection: {:?}", e);
64+
}
65+
}
66+
} else {
67+
break;
68+
}
69+
remaining_attempts -= 1;
70+
}
71+
let mut conn = DatanodeConnection::connect(url).await?;
72+
73+
let message = hdfs::OpReadBlockProto {
74+
header: conn.build_header(block, Some(token)),
75+
offset,
76+
len,
77+
send_checksums: Some(true),
78+
..Default::default()
79+
};
80+
81+
debug!("Block read op request {:?}", &message);
82+
let response = conn.send(Op::ReadBlock, &message).await?;
83+
debug!("Block read op response {:?}", response);
84+
Ok((conn, response))
85+
}
86+
3587
struct ReplicatedBlockStream {
3688
block: hdfs::LocatedBlockProto,
3789
offset: usize,
@@ -63,26 +115,18 @@ impl ReplicatedBlockStream {
63115
));
64116
}
65117
}
66-
let datanode = &self.block.locs[self.current_replica].id;
67118

119+
let datanode = &self.block.locs[self.current_replica].id;
68120
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
69-
let mut connection = if let Some(conn) = DATANODE_CACHE.get(&datanode_url) {
70-
conn
71-
} else {
72-
DatanodeConnection::connect(&datanode_url).await?
73-
};
74121

75-
let message = hdfs::OpReadBlockProto {
76-
header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())),
77-
offset: self.offset as u64,
78-
len: self.len as u64,
79-
send_checksums: Some(true),
80-
..Default::default()
81-
};
82-
83-
debug!("Block read op request {:?}", &message);
84-
let response = connection.send(Op::ReadBlock, &message).await?;
85-
debug!("Block read op response {:?}", response);
122+
let (connection, response) = connect_and_send(
123+
&datanode_url,
124+
&self.block.b,
125+
self.block.block_token.clone(),
126+
self.offset as u64,
127+
self.len as u64,
128+
)
129+
.await?;
86130

87131
if response.status() != hdfs::Status::Success {
88132
return Err(HdfsError::DataTransferError(response.message().to_string()));
@@ -346,23 +390,14 @@ impl StripedBlockStream {
346390
}
347391

348392
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
349-
let mut connection = if let Some(conn) = DATANODE_CACHE.get(&datanode_url) {
350-
conn
351-
} else {
352-
DatanodeConnection::connect(&datanode_url).await?
353-
};
354-
355-
let message = hdfs::OpReadBlockProto {
356-
header: connection.build_header(block, Some(token.clone())),
357-
offset: offset as u64,
358-
len: len as u64,
359-
send_checksums: Some(true),
360-
..Default::default()
361-
};
362-
363-
debug!("Block read op request {:?}", &message);
364-
let response = connection.send(Op::ReadBlock, &message).await?;
365-
debug!("Block read op response {:?}", response);
393+
let (mut connection, response) = connect_and_send(
394+
&datanode_url,
395+
block,
396+
token.clone(),
397+
self.offset as u64,
398+
self.len as u64,
399+
)
400+
.await?;
366401

367402
if response.status() != hdfs::Status::Success {
368403
return Err(HdfsError::DataTransferError(response.message().to_string()));

0 commit comments

Comments
 (0)