From 36edd35e3f09a57ce5bf1d954da27b625efcf09d Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 19 Feb 2024 20:02:40 -0500 Subject: [PATCH 1/7] Implement global cache for datanode connections --- Cargo.lock | 47 +++++++++--- Cargo.toml | 5 +- crates/hdfs-native/Cargo.toml | 3 +- crates/hdfs-native/src/hdfs/block_reader.rs | 32 +++++---- crates/hdfs-native/src/hdfs/block_writer.rs | 31 ++++---- crates/hdfs-native/src/hdfs/connection.rs | 79 ++++++++++++++++----- crates/hdfs-native/src/security/gssapi.rs | 2 +- 7 files changed, 143 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d53ee1..baa6c42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -128,6 +128,29 @@ dependencies = [ "which", ] +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.4.1", + "cexpr", + "clang-sys", + "itertools 0.12.0", + "lazy_static", + "lazycell", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.46", + "which", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -278,9 +301,9 @@ checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "crc" -version = "3.0.1" +version = "3.1.0-beta.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" +checksum = "a02c6ee9f6aa3049d991777025b73fbecc46696f3b3946c4eae4a0fbedec65ab" dependencies = [ "crc-catalog", ] @@ -456,7 +479,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f38e500596a428817fd4fd8a9a21da32f4edb3250e87886039670b12ea02f5d" dependencies = [ - "bindgen", + "bindgen 0.64.0", "cc", "lazy_static", "libc", @@ -658,6 +681,7 @@ dependencies = [ "libgssapi", "log", "num-traits", + "once_cell", "prost", "prost-build", "prost-types", @@ -933,9 +957,9 @@ checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libgssapi" -version = "0.6.4" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dcfb7f77cbefc242a46ea667491c4f1129712f563cd368623d3f1b261a90e5f" +checksum = "c22f0430969e524b2177498ca3eeed48faca6f6c80f1b098d27ecbec84222f3a" dependencies = [ "bitflags 2.4.1", "bytes", @@ -945,11 +969,12 @@ dependencies = [ [[package]] name = "libgssapi-sys" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efdcdd31923aa6280d41ff2636fd93a18cc60fe25983b24887d1a8d24478cbfb" +checksum = "b57d9a71c774ec53b1b9119dbbcc589b5209831f0ddc0ff4210640051f545372" dependencies = [ - "bindgen", + "bindgen 0.69.4", + "pkg-config", ] [[package]] @@ -1179,6 +1204,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + [[package]] name = "plotters" version = "0.3.5" diff --git a/Cargo.toml b/Cargo.toml index 82802d2..d00a189 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,4 +8,7 @@ resolver = "2" [workspace.dependencies] bytes = "1" futures = "0.3" -tokio = "1" \ No newline at end of file +tokio = "1" + 
+[profile.bench] +debug = true \ No newline at end of file diff --git a/crates/hdfs-native/Cargo.toml b/crates/hdfs-native/Cargo.toml index f13240c..bd524b0 100644 --- a/crates/hdfs-native/Cargo.toml +++ b/crates/hdfs-native/Cargo.toml @@ -18,9 +18,10 @@ futures = { workspace = true } g2p = "1" gsasl-sys = { version = "0.2", default-features = false, optional = true } libc = "0.2" -libgssapi = { version = "0.6", default-features = false, optional = true } +libgssapi = { version = "0.7", default-features = false, optional = true } log = "0.4" num-traits = "0.2" +once_cell = "1.19.0" prost = "0.12" prost-types = "0.12" roxmltree = "0.18" diff --git a/crates/hdfs-native/src/hdfs/block_reader.rs b/crates/hdfs-native/src/hdfs/block_reader.rs index 8f2729d..5d825b3 100644 --- a/crates/hdfs-native/src/hdfs/block_reader.rs +++ b/crates/hdfs-native/src/hdfs/block_reader.rs @@ -10,7 +10,7 @@ use log::debug; use crate::{ ec::EcSchema, - hdfs::connection::{DatanodeConnection, Op}, + hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE}, proto::{common, hdfs}, HdfsError, Result, }; @@ -64,9 +64,9 @@ impl ReplicatedBlockStream { } } let datanode = &self.block.locs[self.current_replica].id; - let mut connection = - DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port)) - .await?; + let mut connection = DATANODE_CACHE + .get(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port)) + .await?; let message = hdfs::OpReadBlockProto { header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())), @@ -77,9 +77,7 @@ impl ReplicatedBlockStream { }; debug!("Block read op request {:?}", &message); - - connection.send(Op::ReadBlock, &message).await?; - let response = connection.read_block_op_response().await?; + let response = connection.send(Op::ReadBlock, &message).await?; debug!("Block read op response {:?}", response); if response.status() != hdfs::Status::Success { @@ -96,13 +94,20 @@ impl ReplicatedBlockStream { if self.connection.is_none() { self.select_next_datanode().await?; } - let conn = self.connection.as_mut().unwrap(); if self.len == 0 { + let mut conn = self.connection.take().unwrap(); + + // Read the final empty packet + conn.read_packet().await?; + conn.send_read_success().await?; + DATANODE_CACHE.release(conn); return Ok(None); } + let conn = self.connection.as_mut().unwrap(); + let packet = conn.read_packet().await?; let packet_offset = if self.offset > packet.header.offset_in_block as usize { @@ -336,9 +341,9 @@ impl StripedBlockStream { return Ok(()); } - let mut conn = - DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port)) - .await?; + let mut conn = DATANODE_CACHE + .get(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port)) + .await?; let message = hdfs::OpReadBlockProto { header: conn.build_header(block, Some(token.clone())), @@ -347,10 +352,9 @@ impl StripedBlockStream { send_checksums: Some(true), ..Default::default() }; - debug!("Block read op request {:?}", &message); - conn.send(Op::ReadBlock, &message).await?; - let response = conn.read_block_op_response().await?; + debug!("Block read op request {:?}", &message); + let response = conn.send(Op::ReadBlock, &message).await?; debug!("Block read op response {:?}", response); if response.status() != hdfs::Status::Success { diff --git a/crates/hdfs-native/src/hdfs/block_writer.rs b/crates/hdfs-native/src/hdfs/block_writer.rs index 7317d48..1feab16 100644 --- a/crates/hdfs-native/src/hdfs/block_writer.rs +++ b/crates/hdfs-native/src/hdfs/block_writer.rs 
@@ -7,12 +7,12 @@ use tokio::{sync::mpsc, task::JoinHandle};
 
 use crate::{
     ec::{gf256::Coder, EcSchema},
-    hdfs::connection::{DatanodeConnection, Op},
+    hdfs::connection::{Op, DATANODE_CACHE},
     proto::hdfs,
     HdfsError, Result,
 };
 
-use super::connection::{DatanodeReader, DatanodeWriter, Packet};
+use super::connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Packet};
 
 const HEART_BEAT_SEQNO: i64 = -1;
 const UNKNOWN_SEQNO: i64 = -2;
@@ -85,7 +85,7 @@ pub(crate) struct ReplicatedBlockWriter {
     // Tracks the state of acknowledgements. Set to an Err if any error occurs while receiving
     // acknowledgements. Set to Ok(()) when the last acknowledgement is received.
-    ack_listener_handle: JoinHandle<Result<()>>,
+    ack_listener_handle: JoinHandle<Result<DatanodeReader>>,
     // Tracks the state of the packet sender. Set to an Err if any error occurs while writing packets.
     packet_sender_handle: JoinHandle<Result<DatanodeWriter>>,
     // Tracks the heartbeat task so we can abort it when we close
@@ -102,9 +102,9 @@ impl ReplicatedBlockWriter {
         server_defaults: hdfs::FsServerDefaultsProto,
     ) -> Result<Self> {
         let datanode = &block.locs[0].id;
-        let mut connection =
-            DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
-                .await?;
+        let mut connection = DATANODE_CACHE
+            .get(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
+            .await?;
 
         let checksum = hdfs::ChecksumProto {
             r#type: hdfs::ChecksumTypeProto::ChecksumCrc32c as i32,
@@ -136,9 +136,7 @@ impl ReplicatedBlockWriter {
         };
 
         debug!("Block write request: {:?}", &message);
-
-        connection.send(Op::WriteBlock, &message).await?;
-        let response = connection.read_block_op_response().await?;
+        let response = connection.send(Op::WriteBlock, &message).await?;
         debug!("Block write response: {:?}", response);
 
         let (reader, writer) = connection.split();
@@ -290,24 +288,29 @@ impl ReplicatedBlockWriter {
         self.heartbeat_handle.abort();
 
         // Wait for all packets to be sent
-        self.packet_sender_handle.await.map_err(|_| {
+        let writer = self.packet_sender_handle.await.map_err(|_| {
             HdfsError::DataTransferError(
                 "Packet sender task err while waiting for packets to send".to_string(),
             )
         })??;
 
         // Wait for the channel to close, meaning all acks have been received or an error occurred
-        self.ack_listener_handle.await.map_err(|_| {
+        let reader = self.ack_listener_handle.await.map_err(|_| {
             HdfsError::DataTransferError(
                 "Ack status channel closed while waiting for final ack".to_string(),
             )
-        })?
+        })??;
+
+        let conn = DatanodeConnection::reunite(reader, writer);
+        // DATANODE_CACHE.release(conn);
+
+        Ok(())
     }
 
     fn listen_for_acks(
         mut reader: DatanodeReader,
         mut ack_queue: mpsc::Receiver<(i64, bool)>,
-    ) -> JoinHandle<Result<()>> {
+    ) -> JoinHandle<Result<DatanodeReader>> {
         tokio::spawn(async move {
             loop {
                 let next_ack = reader.read_ack().await?;
@@ -339,7 +342,7 @@ impl ReplicatedBlockWriter {
                 }
 
                 if last_packet {
-                    return Ok(());
+                    return Ok(reader);
                 }
             } else {
                 return Err(HdfsError::DataTransferError(
diff --git a/crates/hdfs-native/src/hdfs/connection.rs b/crates/hdfs-native/src/hdfs/connection.rs
index 7072010..a96c96d 100644
--- a/crates/hdfs-native/src/hdfs/connection.rs
+++ b/crates/hdfs-native/src/hdfs/connection.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::default::Default;
 use std::io::ErrorKind;
 use std::sync::atomic::{AtomicI32, Ordering};
@@ -8,6 +8,7 @@ use std::sync::Mutex;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use crc::{Crc, CRC_32_CKSUM, CRC_32_ISCSI};
 use log::{debug, warn};
+use once_cell::sync::Lazy;
 use prost::Message;
 use socket2::SockRef;
 use tokio::io::BufStream;
@@ -36,6 +37,9 @@ const MAX_PACKET_HEADER_SIZE: usize = 33;
 const CRC32: Crc<u32> = Crc::<u32>::new(&CRC_32_CKSUM);
 const CRC32C: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);
 
+pub(crate) static DATANODE_CACHE: Lazy<DatanodeConnectionCache> =
+    Lazy::new(DatanodeConnectionCache::new);
+
 // Connect to a remote host and return a TcpStream with standard options we want
 async fn connect(addr: &str) -> Result<TcpStream> {
     let stream = TcpStream::connect(addr).await?;
@@ -511,20 +515,26 @@ impl Packet {
 pub(crate) struct DatanodeConnection {
     client_name: String,
     stream: BufStream<TcpStream>,
+    url: String,
 }
 
 impl DatanodeConnection {
-    pub(crate) async fn connect(url: &str) -> Result<Self> {
+    async fn connect(url: &str) -> Result<Self> {
         let stream = BufStream::new(connect(url).await?);
 
         let conn = DatanodeConnection {
             client_name: Uuid::new_v4().to_string(),
             stream,
+            url: url.to_string(),
         };
         Ok(conn)
     }
 
-    pub(crate) async fn send(&mut self, op: Op, message: &impl Message) -> Result<()> {
+    pub(crate) async fn send(
+        &mut self,
+        op: Op,
+        message: &impl Message,
+    ) -> Result<hdfs::BlockOpResponseProto> {
         self.stream
             .write_all(&DATA_TRANSFER_VERSION.to_be_bytes())
             .await?;
@@ -533,7 +543,16 @@ impl DatanodeConnection {
             .write_all(&message.encode_length_delimited_to_vec())
             .await?;
         self.stream.flush().await?;
-        Ok(())
+
+        let buf = self.stream.fill_buf().await?;
+        let msg_length = prost::decode_length_delimiter(buf)?;
+        let total_size = msg_length + prost::length_delimiter_len(msg_length);
+
+        let mut response_buf = BytesMut::zeroed(total_size);
+        self.stream.read_exact(&mut response_buf).await?;
+
+        let response = hdfs::BlockOpResponseProto::decode_length_delimited(response_buf.freeze())?;
+        Ok(response)
     }
 
     pub(crate) fn build_header(
@@ -553,18 +572,6 @@ impl DatanodeConnection {
         }
     }
 
-    pub(crate) async fn read_block_op_response(&mut self) -> Result<hdfs::BlockOpResponseProto> {
-        let buf = self.stream.fill_buf().await?;
-        let msg_length = prost::decode_length_delimiter(buf)?;
-        let total_size = msg_length + prost::length_delimiter_len(msg_length);
-
-        let mut response_buf = BytesMut::zeroed(total_size);
-        self.stream.read_exact(&mut response_buf).await?;
-
-        let response = hdfs::BlockOpResponseProto::decode_length_delimited(response_buf.freeze())?;
-        Ok(response)
-    }
-
     pub(crate) async fn read_packet(&mut self) -> Result<Packet> {
         let mut payload_len_buf = [0u8; 4];
         let mut header_len_buf = [0u8; 2];
@@ -596,7 +603,6 @@
             .write_all(&client_read_status.encode_length_delimited_to_vec())
             .await?;
         self.stream.flush().await?;
-        self.stream.shutdown().await?;
 
         Ok(())
     }
@@ -606,6 +612,7 @@ impl DatanodeConnection {
         let reader = DatanodeReader {
             client_name: self.client_name.clone(),
             reader: BufReader::new(reader),
+            url: self.url,
         };
         let writer = DatanodeWriter {
             client_name: self.client_name,
@@ -622,6 +629,7 @@ impl DatanodeConnection {
         Self {
             client_name: reader.client_name,
             stream,
+            url: reader.url,
         }
     }
 }
@@ -631,6 +639,7 @@
 pub(crate) struct DatanodeReader {
     client_name: String,
     reader: BufReader<OwnedReadHalf>,
+    url: String,
 }
 
 impl DatanodeReader {
@@ -680,6 +689,42 @@ impl DatanodeWriter {
     }
 }
 
+pub(crate) struct DatanodeConnectionCache {
+    cache: Mutex<HashMap<String, VecDeque<DatanodeConnection>>>,
+}
+
+impl DatanodeConnectionCache {
+    fn new() -> Self {
+        Self {
+            cache: Mutex::new(HashMap::new()),
+        }
+    }
+
+    pub(crate) async fn get(&self, url: &str) -> Result<DatanodeConnection> {
+        let conn = self
+            .cache
+            .lock()
+            .unwrap()
+            .get_mut(url)
+            .iter_mut()
+            .flat_map(|conns| conns.pop_front())
+            .next();
+
+        if let Some(conn) = conn {
+            debug!("Returning cached connection");
+            Ok(conn)
+        } else {
+            debug!("Creating new connection");
+            DatanodeConnection::connect(url).await
+        }
+    }
+
+    pub(crate) fn release(&self, conn: DatanodeConnection) {
+        let mut cache = self.cache.lock().unwrap();
+        cache.entry(conn.url.clone()).or_default().push_back(conn);
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;
diff --git a/crates/hdfs-native/src/security/gssapi.rs b/crates/hdfs-native/src/security/gssapi.rs
index f3a5df7..c6aeff8 100644
--- a/crates/hdfs-native/src/security/gssapi.rs
+++ b/crates/hdfs-native/src/security/gssapi.rs
@@ -53,7 +53,7 @@ impl GssapiSession {
         let principal = cred.name()?.to_string();
 
         let state = GssapiState::Pending(ClientCtx::new(
-            cred,
+            Some(cred),
             target,
             // Allow all flags. Setting them does not mean the final context will provide
             // them, so this should not be an issue.
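The core pattern patch 1 introduces is a process-wide, lazily initialized pool of connections keyed by datanode address. Reduced to a self-contained sketch (illustrative only: `Conn` stands in for the real `DatanodeConnection`, the async dial-on-miss path is omitted, and the `once_cell` crate is assumed, as in the patch):

// Sketch of the global connection cache from PATCH 1/7. `Conn` is a
// placeholder type; the real cache holds `DatanodeConnection`s and dials
// a new connection when the pool for an address is empty.
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

use once_cell::sync::Lazy;

struct Conn {
    url: String,
}

static CACHE: Lazy<ConnCache> = Lazy::new(ConnCache::new);

struct ConnCache {
    cache: Mutex<HashMap<String, VecDeque<Conn>>>,
}

impl ConnCache {
    fn new() -> Self {
        Self {
            cache: Mutex::new(HashMap::new()),
        }
    }

    // Pop the longest-idle connection previously released for this address.
    fn get(&self, url: &str) -> Option<Conn> {
        self.cache.lock().unwrap().get_mut(url)?.pop_front()
    }

    // Return a healthy connection to the pool for the next caller to reuse.
    fn release(&self, conn: Conn) {
        self.cache
            .lock()
            .unwrap()
            .entry(conn.url.clone())
            .or_default()
            .push_back(conn);
    }
}

fn main() {
    // Hypothetical address; 9866 is only used here as an example xfer port.
    CACHE.release(Conn { url: "127.0.0.1:9866".to_string() });
    assert!(CACHE.get("127.0.0.1:9866").is_some());
    assert!(CACHE.get("127.0.0.1:9866").is_none());
}

Because `release` pushes to the back and `get` pops from the front, each address's pool hands out the longest-idle connection first. Note that in the patch a reader only releases a connection after draining the final empty packet and sending the read-success status, so a pooled connection always sits at a clean protocol boundary.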
From 6a96e2edc2b00215f20564ec68ea55156f2de2b0 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 19 Feb 2024 21:08:23 -0500 Subject: [PATCH 2/7] Update lockfile --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index baa6c42..6ad3a77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -301,9 +301,9 @@ checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "crc" -version = "3.1.0-beta.1" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a02c6ee9f6aa3049d991777025b73fbecc46696f3b3946c4eae4a0fbedec65ab" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" dependencies = [ "crc-catalog", ] From 883e86981a638b5e8d3350e50f558d5f237bec44 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 20 Feb 2024 06:39:58 -0500 Subject: [PATCH 3/7] Only reader supports caching --- crates/hdfs-native/src/hdfs/block_reader.rs | 29 +++++++++++++-------- crates/hdfs-native/src/hdfs/block_writer.rs | 29 +++++++++------------ crates/hdfs-native/src/hdfs/connection.rs | 17 +++--------- 3 files changed, 34 insertions(+), 41 deletions(-) diff --git a/crates/hdfs-native/src/hdfs/block_reader.rs b/crates/hdfs-native/src/hdfs/block_reader.rs index 5d825b3..4086007 100644 --- a/crates/hdfs-native/src/hdfs/block_reader.rs +++ b/crates/hdfs-native/src/hdfs/block_reader.rs @@ -64,9 +64,13 @@ impl ReplicatedBlockStream { } } let datanode = &self.block.locs[self.current_replica].id; - let mut connection = DATANODE_CACHE - .get(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port)) - .await?; + + let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port); + let mut connection = if let Some(conn) = DATANODE_CACHE.get(&datanode_url) { + conn + } else { + DatanodeConnection::connect(&datanode_url).await? + }; let message = hdfs::OpReadBlockProto { header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())), @@ -341,12 +345,15 @@ impl StripedBlockStream { return Ok(()); } - let mut conn = DATANODE_CACHE - .get(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port)) - .await?; + let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port); + let mut connection = if let Some(conn) = DATANODE_CACHE.get(&datanode_url) { + conn + } else { + DatanodeConnection::connect(&datanode_url).await? 
+        };
 
         let message = hdfs::OpReadBlockProto {
-            header: conn.build_header(block, Some(token.clone())),
+            header: connection.build_header(block, Some(token.clone())),
             offset: offset as u64,
             len: len as u64,
             send_checksums: Some(true),
@@ -354,7 +361,7 @@
         };
 
         debug!("Block read op request {:?}", &message);
-        let response = conn.send(Op::ReadBlock, &message).await?;
+        let response = connection.send(Op::ReadBlock, &message).await?;
         debug!("Block read op response {:?}", response);
 
         if response.status() != hdfs::Status::Success {
@@ -362,7 +369,7 @@
         }
 
         // First handle the offset into the first packet
-        let mut packet = conn.read_packet().await?;
+        let mut packet = connection.read_packet().await?;
         let packet_offset = offset - packet.header.offset_in_block as usize;
         let data_len = packet.header.data_len as usize - packet_offset;
         let data_to_read = usize::min(data_len, len);
@@ -372,7 +379,7 @@
         buf.put(packet_data.slice(packet_offset..(packet_offset + data_to_read)));
 
         while data_left > 0 {
-            packet = conn.read_packet().await?;
+            packet = connection.read_packet().await?;
             // TODO: Error checking
             let data_to_read = usize::min(data_left, packet.header.data_len as usize);
             buf.put(
@@ -384,7 +391,7 @@
         }
 
         // There should be one last empty packet after we are done
-        conn.read_packet().await?;
+        connection.read_packet().await?;
 
         Ok(())
     }
diff --git a/crates/hdfs-native/src/hdfs/block_writer.rs b/crates/hdfs-native/src/hdfs/block_writer.rs
index 1feab16..31f2bc5 100644
--- a/crates/hdfs-native/src/hdfs/block_writer.rs
+++ b/crates/hdfs-native/src/hdfs/block_writer.rs
@@ -7,13 +7,11 @@ use tokio::{sync::mpsc, task::JoinHandle};
 
 use crate::{
     ec::{gf256::Coder, EcSchema},
-    hdfs::connection::{Op, DATANODE_CACHE},
+    hdfs::connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Op, Packet},
     proto::hdfs,
     HdfsError, Result,
 };
 
-use super::connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Packet};
-
 const HEART_BEAT_SEQNO: i64 = -1;
 const UNKNOWN_SEQNO: i64 = -2;
 
@@ -85,9 +83,9 @@ pub(crate) struct ReplicatedBlockWriter {
     // Tracks the state of acknowledgements. Set to an Err if any error occurs while receiving
     // acknowledgements. Set to Ok(()) when the last acknowledgement is received.
-    ack_listener_handle: JoinHandle<Result<DatanodeReader>>,
+    ack_listener_handle: JoinHandle<Result<()>>,
     // Tracks the state of the packet sender. Set to an Err if any error occurs while writing packets.
-    packet_sender_handle: JoinHandle<Result<DatanodeWriter>>,
+    packet_sender_handle: JoinHandle<Result<()>>,
     // Tracks the heartbeat task so we can abort it when we close
     heartbeat_handle: JoinHandle<()>,
 
@@ -102,9 +100,9 @@ impl ReplicatedBlockWriter {
         server_defaults: hdfs::FsServerDefaultsProto,
     ) -> Result<Self> {
         let datanode = &block.locs[0].id;
-        let mut connection = DATANODE_CACHE
-            .get(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
-            .await?;
+        let mut connection =
+            DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
+                .await?;
 
         let checksum = hdfs::ChecksumProto {
             r#type: hdfs::ChecksumTypeProto::ChecksumCrc32c as i32,
@@ -288,29 +286,26 @@ impl ReplicatedBlockWriter {
         self.heartbeat_handle.abort();
 
         // Wait for all packets to be sent
-        let writer = self.packet_sender_handle.await.map_err(|_| {
+        self.packet_sender_handle.await.map_err(|_| {
             HdfsError::DataTransferError(
                 "Packet sender task err while waiting for packets to send".to_string(),
             )
         })??;
 
         // Wait for the channel to close, meaning all acks have been received or an error occurred
-        let reader = self.ack_listener_handle.await.map_err(|_| {
+        self.ack_listener_handle.await.map_err(|_| {
             HdfsError::DataTransferError(
                 "Ack status channel closed while waiting for final ack".to_string(),
             )
         })??;
 
-        let conn = DatanodeConnection::reunite(reader, writer);
-        // DATANODE_CACHE.release(conn);
-
         Ok(())
     }
 
     fn listen_for_acks(
         mut reader: DatanodeReader,
         mut ack_queue: mpsc::Receiver<(i64, bool)>,
-    ) -> JoinHandle<Result<DatanodeReader>> {
+    ) -> JoinHandle<Result<()>> {
         tokio::spawn(async move {
             loop {
                 let next_ack = reader.read_ack().await?;
@@ -342,7 +337,7 @@ impl ReplicatedBlockWriter {
                 }
 
                 if last_packet {
-                    return Ok(reader);
+                    return Ok(());
                 }
             } else {
                 return Err(HdfsError::DataTransferError(
@@ -356,7 +351,7 @@ impl ReplicatedBlockWriter {
     fn start_packet_sender(
         mut writer: DatanodeWriter,
         mut packet_receiver: mpsc::Receiver<Packet>,
-    ) -> JoinHandle<Result<DatanodeWriter>> {
+    ) -> JoinHandle<Result<()>> {
         tokio::spawn(async move {
             while let Some(mut packet) = packet_receiver.recv().await {
                 writer.write_packet(&mut packet).await?;
@@ -365,7 +360,7 @@
                     break;
                 }
             }
-            Ok(writer)
+            Ok(())
         })
     }
diff --git a/crates/hdfs-native/src/hdfs/connection.rs b/crates/hdfs-native/src/hdfs/connection.rs
index 1409681..98a755b 100644
--- a/crates/hdfs-native/src/hdfs/connection.rs
+++ b/crates/hdfs-native/src/hdfs/connection.rs
@@ -519,7 +519,7 @@ pub(crate) struct DatanodeConnection {
 }
 
 impl DatanodeConnection {
-    async fn connect(url: &str) -> Result<Self> {
+    pub(crate) async fn connect(url: &str) -> Result<Self> {
         let stream = BufStream::new(connect(url).await?);
 
         let conn = DatanodeConnection {
@@ -700,23 +700,14 @@ impl DatanodeConnectionCache {
         }
     }
 
-    pub(crate) async fn get(&self, url: &str) -> Result<DatanodeConnection> {
-        let conn = self
-            .cache
-            .lock()
-            .unwrap()
-            .get_mut(url)
-            .iter_mut()
-            .flat_map(|conns| conns.pop_front())
-            .next();
-
-        if let Some(conn) = conn {
-            debug!("Returning cached connection");
-            Ok(conn)
-        } else {
-            debug!("Creating new connection");
-            DatanodeConnection::connect(url).await
-        }
+    pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> {
+        self.cache
+            .lock()
+            .unwrap()
+            .get_mut(url)
+            .iter_mut()
+            .flat_map(|conns| conns.pop_front())
+            .next()
     }
 
     pub(crate) fn release(&self, conn: DatanodeConnection) {

From effce82863d28f50a32394ecd10621ae695d8a7a Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Tue, 20 Feb 2024 06:57:59 -0500
Subject: [PATCH 4/7] Add cache expiry

---
 Cargo.lock                                 | 1 +
 Cargo.toml                                 | 1 +
 crates/hdfs-native-object-store/Cargo.toml | 2 +-
 crates/hdfs-native/Cargo.toml              | 1 +
 crates/hdfs-native/src/hdfs/connection.rs  | 30 +++++++++++++++++-----
 5 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e9d8de0..b87a0ca 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -674,6 +674,7 @@ version = "0.7.0"
 dependencies = [
  "base64",
  "bytes",
+ "chrono",
  "crc",
  "criterion",
  "env_logger",
diff --git a/Cargo.toml b/Cargo.toml
index d00a189..1ea2f46 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,6 +7,7 @@ resolver = "2"
 
 [workspace.dependencies]
 bytes = "1"
+chrono = "0.4"
 futures = "0.3"
 tokio = "1"
 
diff --git a/crates/hdfs-native-object-store/Cargo.toml b/crates/hdfs-native-object-store/Cargo.toml
index 3605738..6a6fa25 100644
--- a/crates/hdfs-native-object-store/Cargo.toml
+++ b/crates/hdfs-native-object-store/Cargo.toml
@@ -13,7 +13,7 @@ license = "Apache-2.0"
 [dependencies]
 async-trait = { version = "0.1" }
 bytes = { workspace = true }
-chrono = { version = "0.4" }
+chrono = { workspace = true }
 futures = { workspace = true }
 hdfs-native = { path = "../hdfs-native", version = "0.7" }
 object_store = { version = "0.9", features = ["cloud"] }
diff --git a/crates/hdfs-native/Cargo.toml b/crates/hdfs-native/Cargo.toml
index a261073..2e96d39 100644
--- a/crates/hdfs-native/Cargo.toml
+++ b/crates/hdfs-native/Cargo.toml
@@ -13,6 +13,7 @@ license = "Apache-2.0"
 [dependencies]
 base64 = "0.21"
 bytes = { workspace = true }
+chrono = { workspace = true }
 crc = "3.1.0-beta.1"
 futures = { workspace = true }
 g2p = "1"
diff --git a/crates/hdfs-native/src/hdfs/connection.rs b/crates/hdfs-native/src/hdfs/connection.rs
index 98a755b..38d72f2 100644
--- a/crates/hdfs-native/src/hdfs/connection.rs
+++ b/crates/hdfs-native/src/hdfs/connection.rs
@@ -6,6 +6,7 @@ use std::sync::Arc;
 use std::sync::Mutex;
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
+use chrono::{prelude::*, TimeDelta};
 use crc::{Crc, Slice16, CRC_32_CKSUM, CRC_32_ISCSI};
 use log::{debug, warn};
 use once_cell::sync::Lazy;
@@ -31,8 +32,8 @@ use crate::{HdfsError, Result};
 const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
 const DATA_TRANSFER_VERSION: u16 = 28;
-
 const MAX_PACKET_HEADER_SIZE: usize = 33;
+const DATANODE_CACHE_EXPIRY: TimeDelta = TimeDelta::seconds(5);
 
 const CRC32: Crc<Slice16<u32>> = Crc::<Slice16<u32>>::new(&CRC_32_CKSUM);
 const CRC32C: Crc<Slice16<u32>> = Crc::<Slice16<u32>>::new(&CRC_32_ISCSI);
@@ -690,7 +691,7 @@ impl DatanodeWriter {
 }
 
 pub(crate) struct DatanodeConnectionCache {
-    cache: Mutex<HashMap<String, VecDeque<DatanodeConnection>>>,
+    cache: Mutex<HashMap<String, VecDeque<(DateTime<Utc>, DatanodeConnection)>>>,
 }
 
 impl DatanodeConnectionCache {
@@ -701,18 +702,35 @@ impl DatanodeConnectionCache {
         }
     }
 
     pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> {
-        self.cache
-            .lock()
-            .unwrap()
+        // Keep things simple and just expire cache entries when checking the cache. We could
+        // move this to its own task but that will add a little more complexity.
+        self.remove_expired();
+
+        let mut cache = self.cache.lock().unwrap();
+
+        cache
             .get_mut(url)
             .iter_mut()
             .flat_map(|conns| conns.pop_front())
+            .map(|(_, conn)| conn)
             .next()
     }
 
     pub(crate) fn release(&self, conn: DatanodeConnection) {
+        let expire_at = Utc::now() + DATANODE_CACHE_EXPIRY;
         let mut cache = self.cache.lock().unwrap();
-        cache.entry(conn.url.clone()).or_default().push_back(conn);
+        cache
+            .entry(conn.url.clone())
+            .or_default()
+            .push_back((expire_at, conn));
+    }
+
+    fn remove_expired(&self) {
+        let mut cache = self.cache.lock().unwrap();
+        let now = Utc::now();
+        for (_, values) in cache.iter_mut() {
+            values.retain(|(expire_at, _)| expire_at > &now)
+        }
     }
 }

From d1a7808ec98baa103a38d5bd4d32603a70eb71ef Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Tue, 20 Feb 2024 07:57:20 -0500
Subject: [PATCH 5/7] Drop cache time

---
 crates/hdfs-native/src/hdfs/connection.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/crates/hdfs-native/src/hdfs/connection.rs b/crates/hdfs-native/src/hdfs/connection.rs
index 38d72f2..0fa7880 100644
--- a/crates/hdfs-native/src/hdfs/connection.rs
+++ b/crates/hdfs-native/src/hdfs/connection.rs
@@ -33,7 +33,7 @@ use crate::{HdfsError, Result};
 const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
 const DATA_TRANSFER_VERSION: u16 = 28;
 const MAX_PACKET_HEADER_SIZE: usize = 33;
-const DATANODE_CACHE_EXPIRY: TimeDelta = TimeDelta::seconds(5);
+const DATANODE_CACHE_EXPIRY: TimeDelta = TimeDelta::seconds(3);
 
 const CRC32: Crc<Slice16<u32>> = Crc::<Slice16<u32>>::new(&CRC_32_CKSUM);
@@ -546,6 +546,9 @@ impl DatanodeConnection {
         self.stream.flush().await?;
 
         let buf = self.stream.fill_buf().await?;
+        if buf.is_empty() {
+            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?;
+        }
         let msg_length = prost::decode_length_delimiter(buf)?;
         let total_size = msg_length + prost::length_delimiter_len(msg_length);

From 34c1c6b0000aedf681e3d026ea41febbc80f04d7 Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Wed, 21 Feb 2024 20:48:02 -0500
Subject: [PATCH 6/7] Add error handling and retries for using cached connections

---
 crates/hdfs-native/src/hdfs/block_reader.rs | 107 +++++++++++++-------
 1 file changed, 71 insertions(+), 36 deletions(-)

diff --git a/crates/hdfs-native/src/hdfs/block_reader.rs b/crates/hdfs-native/src/hdfs/block_reader.rs
index 4086007..83231a2 100644
--- a/crates/hdfs-native/src/hdfs/block_reader.rs
+++ b/crates/hdfs-native/src/hdfs/block_reader.rs
@@ -6,12 +6,15 @@ use futures::{
     stream::{self, BoxStream},
     Stream, StreamExt,
 };
-use log::debug;
+use log::{debug, warn};
 
 use crate::{
     ec::EcSchema,
     hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE},
-    proto::{common, hdfs},
+    proto::{
+        common,
+        hdfs::{self, BlockOpResponseProto},
+    },
     HdfsError, Result,
 };
 
@@ -32,6 +35,55 @@ pub(crate) fn get_block_stream(
     }
 }
 
+/// Connects to a DataNode to do a read, attempting to use cached connections.
+async fn connect_and_send( + url: &str, + block: &hdfs::ExtendedBlockProto, + token: common::TokenProto, + offset: u64, + len: u64, +) -> Result<(DatanodeConnection, BlockOpResponseProto)> { + let mut remaining_attempts = 2; + while remaining_attempts > 0 { + if let Some(mut conn) = DATANODE_CACHE.get(url) { + let message = hdfs::OpReadBlockProto { + header: conn.build_header(block, Some(token.clone())), + offset, + len, + send_checksums: Some(true), + ..Default::default() + }; + debug!("Block read op request {:?}", &message); + match conn.send(Op::ReadBlock, &message).await { + Ok(response) => { + debug!("Block read op response {:?}", response); + return Ok((conn, response)); + } + Err(e) => { + warn!("Failed to use cached connection: {:?}", e); + } + } + } else { + break; + } + remaining_attempts -= 1; + } + let mut conn = DatanodeConnection::connect(url).await?; + + let message = hdfs::OpReadBlockProto { + header: conn.build_header(block, Some(token)), + offset, + len, + send_checksums: Some(true), + ..Default::default() + }; + + debug!("Block read op request {:?}", &message); + let response = conn.send(Op::ReadBlock, &message).await?; + debug!("Block read op response {:?}", response); + Ok((conn, response)) +} + struct ReplicatedBlockStream { block: hdfs::LocatedBlockProto, offset: usize, @@ -63,26 +115,18 @@ impl ReplicatedBlockStream { )); } } - let datanode = &self.block.locs[self.current_replica].id; + let datanode = &self.block.locs[self.current_replica].id; let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port); - let mut connection = if let Some(conn) = DATANODE_CACHE.get(&datanode_url) { - conn - } else { - DatanodeConnection::connect(&datanode_url).await? - }; - let message = hdfs::OpReadBlockProto { - header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())), - offset: self.offset as u64, - len: self.len as u64, - send_checksums: Some(true), - ..Default::default() - }; - - debug!("Block read op request {:?}", &message); - let response = connection.send(Op::ReadBlock, &message).await?; - debug!("Block read op response {:?}", response); + let (connection, response) = connect_and_send( + &datanode_url, + &self.block.b, + self.block.block_token.clone(), + self.offset as u64, + self.len as u64, + ) + .await?; if response.status() != hdfs::Status::Success { return Err(HdfsError::DataTransferError(response.message().to_string())); @@ -346,23 +390,14 @@ impl StripedBlockStream { } let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port); - let mut connection = if let Some(conn) = DATANODE_CACHE.get(&datanode_url) { - conn - } else { - DatanodeConnection::connect(&datanode_url).await? 
- }; - - let message = hdfs::OpReadBlockProto { - header: connection.build_header(block, Some(token.clone())), - offset: offset as u64, - len: len as u64, - send_checksums: Some(true), - ..Default::default() - }; - - debug!("Block read op request {:?}", &message); - let response = connection.send(Op::ReadBlock, &message).await?; - debug!("Block read op response {:?}", response); + let (mut connection, response) = connect_and_send( + &datanode_url, + block, + token.clone(), + self.offset as u64, + self.len as u64, + ) + .await?; if response.status() != hdfs::Status::Success { return Err(HdfsError::DataTransferError(response.message().to_string())); From 6fb5562b5bea4054280de4c6a855fe6e48c548b0 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 21 Feb 2024 21:06:08 -0500 Subject: [PATCH 7/7] Fix erasure coded read --- crates/hdfs-native/src/hdfs/block_reader.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/hdfs-native/src/hdfs/block_reader.rs b/crates/hdfs-native/src/hdfs/block_reader.rs index 83231a2..a720865 100644 --- a/crates/hdfs-native/src/hdfs/block_reader.rs +++ b/crates/hdfs-native/src/hdfs/block_reader.rs @@ -394,8 +394,8 @@ impl StripedBlockStream { &datanode_url, block, token.clone(), - self.offset as u64, - self.len as u64, + offset as u64, + len as u64, ) .await?; @@ -427,6 +427,8 @@ impl StripedBlockStream { // There should be one last empty packet after we are done connection.read_packet().await?; + connection.send_read_success().await?; + DATANODE_CACHE.release(connection); Ok(()) }
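End state of the series, condensed: cached datanode connections expire a few seconds after release (patches 4 and 5), and reads retry over cached connections before dialing fresh (patch 6), since a pooled socket may have been closed by the datanode in the meantime. A synchronous sketch of that retry shape (illustrative only: `Conn`, `cache_get`, `connect`, and `send` are placeholders for the real async `DatanodeConnection` API):

// Sketch of the retry flow in PATCH 6/7's connect_and_send. At most two
// cached connections are tried; a send error on a cached connection is
// logged and swallowed, and a fresh dial is the fallback.
struct Conn;

fn cache_get(_url: &str) -> Option<Conn> {
    None // stand-in for DATANODE_CACHE.get
}

fn connect(_url: &str) -> Result<Conn, String> {
    Ok(Conn) // stand-in for DatanodeConnection::connect
}

fn send(_conn: &mut Conn) -> Result<String, String> {
    Ok("block op response".to_string()) // stand-in for the read-block op
}

fn connect_and_send(url: &str) -> Result<(Conn, String), String> {
    let mut remaining_attempts = 2;
    while remaining_attempts > 0 {
        if let Some(mut conn) = cache_get(url) {
            // A cached connection may be stale, so a failure here only
            // burns an attempt and falls through to the next candidate.
            match send(&mut conn) {
                Ok(response) => return Ok((conn, response)),
                Err(e) => eprintln!("Failed to use cached connection: {e:?}"),
            }
        } else {
            break;
        }
        remaining_attempts -= 1;
    }
    // No usable cached connection: dial fresh, where an error is real.
    let mut conn = connect(url)?;
    let response = send(&mut conn)?;
    Ok((conn, response))
}

fn main() {
    // Hypothetical address used purely for illustration.
    let (_conn, response) = connect_and_send("127.0.0.1:9866").unwrap();
    println!("{response}");
}

The empty-buffer check added in patch 5 is what turns a datanode-side close into a clean UnexpectedEof error on first use of a stale connection, which is exactly the failure this retry loop absorbs.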