Skip to content

Commit cf2c49e

Browse files
authored
Support DataNode connection reuse (#80)
1 parent 4b16e3a commit cf2c49e

File tree

8 files changed

+183
-72
lines changed

8 files changed

+183
-72
lines changed

Cargo.lock

+4-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,9 @@ resolver = "2"
77

88
[workspace.dependencies]
99
bytes = "1"
10+
chrono = "0.4"
1011
futures = "0.3"
11-
tokio = "1"
12+
tokio = "1"
13+
14+
[profile.bench]
15+
debug = true

crates/hdfs-native-object-store/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ license = "Apache-2.0"
1313
[dependencies]
1414
async-trait = { version = "0.1" }
1515
bytes = { workspace = true }
16-
chrono = { version = "0.4" }
16+
chrono = { workspace = true }
1717
futures = { workspace = true }
1818
hdfs-native = { path = "../hdfs-native", version = "0.7" }
1919
object_store = { version = "0.9", features = ["cloud"] }

crates/hdfs-native/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@ license = "Apache-2.0"
1313
[dependencies]
1414
base64 = "0.21"
1515
bytes = { workspace = true }
16+
chrono = { workspace = true }
1617
crc = "3.1.0-beta.1"
1718
futures = { workspace = true }
1819
g2p = "1"
1920
gsasl-sys = { version = "0.2", default-features = false, optional = true }
2021
libc = "0.2"
21-
libgssapi = { version = "0.6", default-features = false, optional = true }
22+
libgssapi = { version = "0.7", default-features = false, optional = true }
2223
log = "0.4"
2324
num-traits = "0.2"
25+
once_cell = "1.19.0"
2426
prost = "0.12"
2527
prost-types = "0.12"
2628
roxmltree = "0.18"

crates/hdfs-native/src/hdfs/block_reader.rs

+87-39
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ use futures::{
66
stream::{self, BoxStream},
77
Stream, StreamExt,
88
};
9-
use log::debug;
9+
use log::{debug, warn};
1010

1111
use crate::{
1212
ec::EcSchema,
13-
hdfs::connection::{DatanodeConnection, Op},
14-
proto::{common, hdfs},
13+
hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE},
14+
proto::{
15+
common,
16+
hdfs::{self, BlockOpResponseProto},
17+
},
1518
HdfsError, Result,
1619
};
1720

@@ -32,6 +35,55 @@ pub(crate) fn get_block_stream(
3235
}
3336
}
3437

38+
/// Connects to a DataNode to do a read, attempting to use cached connections.
39+
async fn connect_and_send(
40+
url: &str,
41+
block: &hdfs::ExtendedBlockProto,
42+
token: common::TokenProto,
43+
offset: u64,
44+
len: u64,
45+
) -> Result<(DatanodeConnection, BlockOpResponseProto)> {
46+
let mut remaining_attempts = 2;
47+
while remaining_attempts > 0 {
48+
if let Some(mut conn) = DATANODE_CACHE.get(url) {
49+
let message = hdfs::OpReadBlockProto {
50+
header: conn.build_header(block, Some(token.clone())),
51+
offset,
52+
len,
53+
send_checksums: Some(true),
54+
..Default::default()
55+
};
56+
debug!("Block read op request {:?}", &message);
57+
match conn.send(Op::ReadBlock, &message).await {
58+
Ok(response) => {
59+
debug!("Block read op response {:?}", response);
60+
return Ok((conn, response));
61+
}
62+
Err(e) => {
63+
warn!("Failed to use cached connection: {:?}", e);
64+
}
65+
}
66+
} else {
67+
break;
68+
}
69+
remaining_attempts -= 1;
70+
}
71+
let mut conn = DatanodeConnection::connect(url).await?;
72+
73+
let message = hdfs::OpReadBlockProto {
74+
header: conn.build_header(block, Some(token)),
75+
offset,
76+
len,
77+
send_checksums: Some(true),
78+
..Default::default()
79+
};
80+
81+
debug!("Block read op request {:?}", &message);
82+
let response = conn.send(Op::ReadBlock, &message).await?;
83+
debug!("Block read op response {:?}", response);
84+
Ok((conn, response))
85+
}
86+
3587
struct ReplicatedBlockStream {
3688
block: hdfs::LocatedBlockProto,
3789
offset: usize,
@@ -63,24 +115,18 @@ impl ReplicatedBlockStream {
63115
));
64116
}
65117
}
66-
let datanode = &self.block.locs[self.current_replica].id;
67-
let mut connection =
68-
DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
69-
.await?;
70-
71-
let message = hdfs::OpReadBlockProto {
72-
header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())),
73-
offset: self.offset as u64,
74-
len: self.len as u64,
75-
send_checksums: Some(true),
76-
..Default::default()
77-
};
78118

79-
debug!("Block read op request {:?}", &message);
119+
let datanode = &self.block.locs[self.current_replica].id;
120+
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
80121

81-
connection.send(Op::ReadBlock, &message).await?;
82-
let response = connection.read_block_op_response().await?;
83-
debug!("Block read op response {:?}", response);
122+
let (connection, response) = connect_and_send(
123+
&datanode_url,
124+
&self.block.b,
125+
self.block.block_token.clone(),
126+
self.offset as u64,
127+
self.len as u64,
128+
)
129+
.await?;
84130

85131
if response.status() != hdfs::Status::Success {
86132
return Err(HdfsError::DataTransferError(response.message().to_string()));
@@ -96,13 +142,20 @@ impl ReplicatedBlockStream {
96142
if self.connection.is_none() {
97143
self.select_next_datanode().await?;
98144
}
99-
let conn = self.connection.as_mut().unwrap();
100145

101146
if self.len == 0 {
147+
let mut conn = self.connection.take().unwrap();
148+
149+
// Read the final empty packet
150+
conn.read_packet().await?;
151+
102152
conn.send_read_success().await?;
153+
DATANODE_CACHE.release(conn);
103154
return Ok(None);
104155
}
105156

157+
let conn = self.connection.as_mut().unwrap();
158+
106159
let packet = conn.read_packet().await?;
107160

108161
let packet_offset = if self.offset > packet.header.offset_in_block as usize {
@@ -336,29 +389,22 @@ impl StripedBlockStream {
336389
return Ok(());
337390
}
338391

339-
let mut conn =
340-
DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
341-
.await?;
342-
343-
let message = hdfs::OpReadBlockProto {
344-
header: conn.build_header(block, Some(token.clone())),
345-
offset: offset as u64,
346-
len: len as u64,
347-
send_checksums: Some(true),
348-
..Default::default()
349-
};
350-
debug!("Block read op request {:?}", &message);
351-
352-
conn.send(Op::ReadBlock, &message).await?;
353-
let response = conn.read_block_op_response().await?;
354-
debug!("Block read op response {:?}", response);
392+
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
393+
let (mut connection, response) = connect_and_send(
394+
&datanode_url,
395+
block,
396+
token.clone(),
397+
offset as u64,
398+
len as u64,
399+
)
400+
.await?;
355401

356402
if response.status() != hdfs::Status::Success {
357403
return Err(HdfsError::DataTransferError(response.message().to_string()));
358404
}
359405

360406
// First handle the offset into the first packet
361-
let mut packet = conn.read_packet().await?;
407+
let mut packet = connection.read_packet().await?;
362408
let packet_offset = offset - packet.header.offset_in_block as usize;
363409
let data_len = packet.header.data_len as usize - packet_offset;
364410
let data_to_read = usize::min(data_len, len);
@@ -368,7 +414,7 @@ impl StripedBlockStream {
368414
buf.put(packet_data.slice(packet_offset..(packet_offset + data_to_read)));
369415

370416
while data_left > 0 {
371-
packet = conn.read_packet().await?;
417+
packet = connection.read_packet().await?;
372418
// TODO: Error checking
373419
let data_to_read = usize::min(data_left, packet.header.data_len as usize);
374420
buf.put(
@@ -380,7 +426,9 @@ impl StripedBlockStream {
380426
}
381427

382428
// There should be one last empty packet after we are done
383-
conn.read_packet().await?;
429+
connection.read_packet().await?;
430+
connection.send_read_success().await?;
431+
DATANODE_CACHE.release(connection);
384432

385433
Ok(())
386434
}

crates/hdfs-native/src/hdfs/block_writer.rs

+8-10
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@ use tokio::{sync::mpsc, task::JoinHandle};
77

88
use crate::{
99
ec::{gf256::Coder, EcSchema},
10-
hdfs::connection::{DatanodeConnection, Op},
10+
hdfs::connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Op, Packet},
1111
proto::hdfs,
1212
HdfsError, Result,
1313
};
1414

15-
use super::connection::{DatanodeReader, DatanodeWriter, Packet};
16-
1715
const HEART_BEAT_SEQNO: i64 = -1;
1816
const UNKNOWN_SEQNO: i64 = -2;
1917

@@ -87,7 +85,7 @@ pub(crate) struct ReplicatedBlockWriter {
8785
// acknowledgements. Set to Ok(()) when the last acknowledgement is received.
8886
ack_listener_handle: JoinHandle<Result<()>>,
8987
// Tracks the state of packet sender. Set to Err if any error occurs during writing packets,
90-
packet_sender_handle: JoinHandle<Result<DatanodeWriter>>,
88+
packet_sender_handle: JoinHandle<Result<()>>,
9189
// Tracks the heartbeat task so we can abort it when we close
9290
heartbeat_handle: JoinHandle<()>,
9391

@@ -136,9 +134,7 @@ impl ReplicatedBlockWriter {
136134
};
137135

138136
debug!("Block write request: {:?}", &message);
139-
140-
connection.send(Op::WriteBlock, &message).await?;
141-
let response = connection.read_block_op_response().await?;
137+
let response = connection.send(Op::WriteBlock, &message).await?;
142138
debug!("Block write response: {:?}", response);
143139

144140
let (reader, writer) = connection.split();
@@ -301,7 +297,9 @@ impl ReplicatedBlockWriter {
301297
HdfsError::DataTransferError(
302298
"Ack status channel closed while waiting for final ack".to_string(),
303299
)
304-
})?
300+
})??;
301+
302+
Ok(())
305303
}
306304

307305
fn listen_for_acks(
@@ -353,7 +351,7 @@ impl ReplicatedBlockWriter {
353351
fn start_packet_sender(
354352
mut writer: DatanodeWriter,
355353
mut packet_receiver: mpsc::Receiver<Packet>,
356-
) -> JoinHandle<Result<DatanodeWriter>> {
354+
) -> JoinHandle<Result<()>> {
357355
tokio::spawn(async move {
358356
while let Some(mut packet) = packet_receiver.recv().await {
359357
writer.write_packet(&mut packet).await?;
@@ -362,7 +360,7 @@ impl ReplicatedBlockWriter {
362360
break;
363361
}
364362
}
365-
Ok(writer)
363+
Ok(())
366364
})
367365
}
368366

0 commit comments

Comments
 (0)