diff --git a/Cargo.lock b/Cargo.lock
index ba259b5..b87a0ca 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -674,6 +674,7 @@ version = "0.7.0"
 dependencies = [
  "base64",
  "bytes",
+ "chrono",
  "crc",
  "criterion",
  "env_logger",
@@ -685,6 +686,7 @@ dependencies = [
  "libgssapi",
  "log",
  "num-traits",
+ "once_cell",
  "prost",
  "prost-build",
  "prost-types",
@@ -960,9 +962,9 @@ checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
 
 [[package]]
 name = "libgssapi"
-version = "0.6.4"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9dcfb7f77cbefc242a46ea667491c4f1129712f563cd368623d3f1b261a90e5f"
+checksum = "c22f0430969e524b2177498ca3eeed48faca6f6c80f1b098d27ecbec84222f3a"
 dependencies = [
  "bitflags 2.4.2",
  "bytes",
diff --git a/Cargo.toml b/Cargo.toml
index 82802d2..1ea2f46 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,5 +7,9 @@ resolver = "2"
 
 [workspace.dependencies]
 bytes = "1"
+chrono = "0.4"
 futures = "0.3"
-tokio = "1"
\ No newline at end of file
+tokio = "1"
+
+[profile.bench]
+debug = true
\ No newline at end of file
diff --git a/crates/hdfs-native-object-store/Cargo.toml b/crates/hdfs-native-object-store/Cargo.toml
index 3605738..6a6fa25 100644
--- a/crates/hdfs-native-object-store/Cargo.toml
+++ b/crates/hdfs-native-object-store/Cargo.toml
@@ -13,7 +13,7 @@ license = "Apache-2.0"
 [dependencies]
 async-trait = { version = "0.1" }
 bytes = { workspace = true }
-chrono = { version = "0.4" }
+chrono = { workspace = true }
 futures = { workspace = true }
 hdfs-native = { path = "../hdfs-native", version = "0.7" }
 object_store = { version = "0.9", features = ["cloud"] }
diff --git a/crates/hdfs-native/Cargo.toml b/crates/hdfs-native/Cargo.toml
index 95fe07b..2e96d39 100644
--- a/crates/hdfs-native/Cargo.toml
+++ b/crates/hdfs-native/Cargo.toml
@@ -13,14 +13,16 @@ license = "Apache-2.0"
 [dependencies]
 base64 = "0.21"
 bytes = { workspace = true }
+chrono = { workspace = true }
 crc = "3.1.0-beta.1"
 futures = { workspace = true }
 g2p = "1"
 gsasl-sys = { version = "0.2", default-features = false, optional = true }
 libc = "0.2"
-libgssapi = { version = "0.6", default-features = false, optional = true }
+libgssapi = { version = "0.7", default-features = false, optional = true }
 log = "0.4"
 num-traits = "0.2"
+once_cell = "1.19.0"
 prost = "0.12"
 prost-types = "0.12"
 roxmltree = "0.18"
diff --git a/crates/hdfs-native/src/hdfs/block_reader.rs b/crates/hdfs-native/src/hdfs/block_reader.rs
index 8f2729d..a720865 100644
--- a/crates/hdfs-native/src/hdfs/block_reader.rs
+++ b/crates/hdfs-native/src/hdfs/block_reader.rs
@@ -6,12 +6,15 @@ use futures::{
     stream::{self, BoxStream},
     Stream, StreamExt,
 };
-use log::debug;
+use log::{debug, warn};
 
 use crate::{
     ec::EcSchema,
-    hdfs::connection::{DatanodeConnection, Op},
-    proto::{common, hdfs},
+    hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE},
+    proto::{
+        common,
+        hdfs::{self, BlockOpResponseProto},
+    },
     HdfsError, Result,
 };
 
@@ -32,6 +35,55 @@ pub(crate) fn get_block_stream(
     }
 }
 
+/// Connects to a DataNode to do a read, attempting to use cached connections.
+async fn connect_and_send(
+    url: &str,
+    block: &hdfs::ExtendedBlockProto,
+    token: common::TokenProto,
+    offset: u64,
+    len: u64,
+) -> Result<(DatanodeConnection, BlockOpResponseProto)> {
+    let mut remaining_attempts = 2;
+    while remaining_attempts > 0 {
+        if let Some(mut conn) = DATANODE_CACHE.get(url) {
+            let message = hdfs::OpReadBlockProto {
+                header: conn.build_header(block, Some(token.clone())),
+                offset,
+                len,
+                send_checksums: Some(true),
+                ..Default::default()
+            };
+            debug!("Block read op request {:?}", &message);
+            match conn.send(Op::ReadBlock, &message).await {
+                Ok(response) => {
+                    debug!("Block read op response {:?}", response);
+                    return Ok((conn, response));
+                }
+                Err(e) => {
+                    warn!("Failed to use cached connection: {:?}", e);
+                }
+            }
+        } else {
+            break;
+        }
+        remaining_attempts -= 1;
+    }
+    let mut conn = DatanodeConnection::connect(url).await?;
+
+    let message = hdfs::OpReadBlockProto {
+        header: conn.build_header(block, Some(token)),
+        offset,
+        len,
+        send_checksums: Some(true),
+        ..Default::default()
+    };
+
+    debug!("Block read op request {:?}", &message);
+    let response = conn.send(Op::ReadBlock, &message).await?;
+    debug!("Block read op response {:?}", response);
+    Ok((conn, response))
+}
+
 struct ReplicatedBlockStream {
     block: hdfs::LocatedBlockProto,
     offset: usize,
@@ -63,24 +115,18 @@ impl ReplicatedBlockStream {
                 ));
             }
         }
-        let datanode = &self.block.locs[self.current_replica].id;
-        let mut connection =
-            DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
-                .await?;
-
-        let message = hdfs::OpReadBlockProto {
-            header: connection.build_header(&self.block.b, Some(self.block.block_token.clone())),
-            offset: self.offset as u64,
-            len: self.len as u64,
-            send_checksums: Some(true),
-            ..Default::default()
-        };
-        debug!("Block read op request {:?}", &message);
+        let datanode = &self.block.locs[self.current_replica].id;
+        let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
 
-        connection.send(Op::ReadBlock, &message).await?;
-        let response = connection.read_block_op_response().await?;
-        debug!("Block read op response {:?}", response);
+        let (connection, response) = connect_and_send(
+            &datanode_url,
+            &self.block.b,
+            self.block.block_token.clone(),
+            self.offset as u64,
+            self.len as u64,
+        )
+        .await?;
 
         if response.status() != hdfs::Status::Success {
             return Err(HdfsError::DataTransferError(response.message().to_string()));
         }
@@ -96,13 +142,20 @@
 
         if self.connection.is_none() {
             self.select_next_datanode().await?;
         }
-        let conn = self.connection.as_mut().unwrap();
 
         if self.len == 0 {
+            let mut conn = self.connection.take().unwrap();
+
+            // Read the final empty packet
+            conn.read_packet().await?;
+            conn.send_read_success().await?;
+            DATANODE_CACHE.release(conn);
             return Ok(None);
         }
 
+        let conn = self.connection.as_mut().unwrap();
+
         let packet = conn.read_packet().await?;
 
         let packet_offset = if self.offset > packet.header.offset_in_block as usize {
@@ -336,29 +389,22 @@ impl StripedBlockStream {
             return Ok(());
         }
 
-        let mut conn =
-            DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
-                .await?;
-
-        let message = hdfs::OpReadBlockProto {
-            header: conn.build_header(block, Some(token.clone())),
-            offset: offset as u64,
-            len: len as u64,
-            send_checksums: Some(true),
-            ..Default::default()
-        };
-        debug!("Block read op request {:?}", &message);
-
-        conn.send(Op::ReadBlock, &message).await?;
-        let response = conn.read_block_op_response().await?;
-        debug!("Block read op response {:?}", response);
+        let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
+        let (mut connection, response) = connect_and_send(
+            &datanode_url,
+            block,
+            token.clone(),
+            offset as u64,
+            len as u64,
+        )
+        .await?;
 
         if response.status() != hdfs::Status::Success {
             return Err(HdfsError::DataTransferError(response.message().to_string()));
         }
 
         // First handle the offset into the first packet
-        let mut packet = conn.read_packet().await?;
+        let mut packet = connection.read_packet().await?;
         let packet_offset = offset - packet.header.offset_in_block as usize;
         let data_len = packet.header.data_len as usize - packet_offset;
         let data_to_read = usize::min(data_len, len);
@@ -368,7 +414,7 @@ impl StripedBlockStream {
         buf.put(packet_data.slice(packet_offset..(packet_offset + data_to_read)));
 
         while data_left > 0 {
-            packet = conn.read_packet().await?;
+            packet = connection.read_packet().await?;
             // TODO: Error checking
             let data_to_read = usize::min(data_left, packet.header.data_len as usize);
             buf.put(
@@ -380,7 +426,9 @@ impl StripedBlockStream {
         }
 
         // There should be one last empty packet after we are done
-        conn.read_packet().await?;
+        connection.read_packet().await?;
+        connection.send_read_success().await?;
+        DATANODE_CACHE.release(connection);
 
         Ok(())
     }
diff --git a/crates/hdfs-native/src/hdfs/block_writer.rs b/crates/hdfs-native/src/hdfs/block_writer.rs
index 7317d48..31f2bc5 100644
--- a/crates/hdfs-native/src/hdfs/block_writer.rs
+++ b/crates/hdfs-native/src/hdfs/block_writer.rs
@@ -7,13 +7,11 @@ use tokio::{sync::mpsc, task::JoinHandle};
 
 use crate::{
     ec::{gf256::Coder, EcSchema},
-    hdfs::connection::{DatanodeConnection, Op},
+    hdfs::connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Op, Packet},
     proto::hdfs,
     HdfsError, Result,
 };
 
-use super::connection::{DatanodeReader, DatanodeWriter, Packet};
-
 const HEART_BEAT_SEQNO: i64 = -1;
 const UNKNOWN_SEQNO: i64 = -2;
 
@@ -87,7 +85,7 @@ pub(crate) struct ReplicatedBlockWriter {
     // acknowledgements. Set to Ok(()) when the last acknowledgement is received.
     ack_listener_handle: JoinHandle<Result<()>>,
     // Tracks the state of packet sender. Set to Err if any error occurs during writing packets,
-    packet_sender_handle: JoinHandle<Result<DatanodeWriter>>,
+    packet_sender_handle: JoinHandle<Result<()>>,
     // Tracks the heartbeat task so we can abort it when we close
     heartbeat_handle: JoinHandle<()>,
 
@@ -136,9 +134,7 @@ impl ReplicatedBlockWriter {
         };
 
         debug!("Block write request: {:?}", &message);
-
-        connection.send(Op::WriteBlock, &message).await?;
-        let response = connection.read_block_op_response().await?;
+        let response = connection.send(Op::WriteBlock, &message).await?;
         debug!("Block write response: {:?}", response);
 
         let (reader, writer) = connection.split();
@@ -301,7 +297,9 @@ impl ReplicatedBlockWriter {
             HdfsError::DataTransferError(
                 "Ack status channel closed while waiting for final ack".to_string(),
             )
-        })?
+        })??;
+
+        Ok(())
     }
 
     fn listen_for_acks(
@@ -353,7 +351,7 @@ impl ReplicatedBlockWriter {
     fn start_packet_sender(
         mut writer: DatanodeWriter,
         mut packet_receiver: mpsc::Receiver<Packet>,
-    ) -> JoinHandle<Result<DatanodeWriter>> {
+    ) -> JoinHandle<Result<()>> {
         tokio::spawn(async move {
             while let Some(mut packet) = packet_receiver.recv().await {
                 writer.write_packet(&mut packet).await?;
@@ -362,7 +360,7 @@ impl ReplicatedBlockWriter {
                     break;
                 }
             }
-            Ok(writer)
+            Ok(())
         })
     }
 
diff --git a/crates/hdfs-native/src/hdfs/connection.rs b/crates/hdfs-native/src/hdfs/connection.rs
index 20f0895..0fa7880 100644
--- a/crates/hdfs-native/src/hdfs/connection.rs
+++ b/crates/hdfs-native/src/hdfs/connection.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::default::Default;
 use std::io::ErrorKind;
 use std::sync::atomic::{AtomicI32, Ordering};
@@ -6,8 +6,10 @@ use std::sync::Arc;
 use std::sync::Mutex;
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
+use chrono::{prelude::*, TimeDelta};
 use crc::{Crc, Slice16, CRC_32_CKSUM, CRC_32_ISCSI};
 use log::{debug, warn};
+use once_cell::sync::Lazy;
 use prost::Message;
 use socket2::SockRef;
 use tokio::io::BufStream;
@@ -30,12 +32,15 @@ use crate::{HdfsError, Result};
 
 const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
 const DATA_TRANSFER_VERSION: u16 = 28;
-
 const MAX_PACKET_HEADER_SIZE: usize = 33;
+const DATANODE_CACHE_EXPIRY: TimeDelta = TimeDelta::seconds(3);
 
 const CRC32: Crc<Slice16<u32>> = Crc::<Slice16<u32>>::new(&CRC_32_CKSUM);
 const CRC32C: Crc<Slice16<u32>> = Crc::<Slice16<u32>>::new(&CRC_32_ISCSI);
 
+pub(crate) static DATANODE_CACHE: Lazy<DatanodeConnectionCache> =
+    Lazy::new(DatanodeConnectionCache::new);
+
 // Connect to a remote host and return a TcpStream with standard options we want
 async fn connect(addr: &str) -> Result<TcpStream> {
     let stream = TcpStream::connect(addr).await?;
@@ -511,6 +516,7 @@ impl Packet {
 pub(crate) struct DatanodeConnection {
     client_name: String,
     stream: BufStream<TcpStream>,
+    url: String,
 }
 
 impl DatanodeConnection {
@@ -520,11 +526,16 @@ impl DatanodeConnection {
         let conn = DatanodeConnection {
             client_name: Uuid::new_v4().to_string(),
             stream,
+            url: url.to_string(),
         };
         Ok(conn)
     }
 
-    pub(crate) async fn send(&mut self, op: Op, message: &impl Message) -> Result<()> {
+    pub(crate) async fn send(
+        &mut self,
+        op: Op,
+        message: &impl Message,
+    ) -> Result<hdfs::BlockOpResponseProto> {
         self.stream
             .write_all(&DATA_TRANSFER_VERSION.to_be_bytes())
             .await?;
@@ -533,7 +544,19 @@ impl DatanodeConnection {
             .write_all(&message.encode_length_delimited_to_vec())
             .await?;
         self.stream.flush().await?;
-        Ok(())
+
+        let buf = self.stream.fill_buf().await?;
+        if buf.is_empty() {
+            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?;
+        }
+        let msg_length = prost::decode_length_delimiter(buf)?;
+        let total_size = msg_length + prost::length_delimiter_len(msg_length);
+
+        let mut response_buf = BytesMut::zeroed(total_size);
+        self.stream.read_exact(&mut response_buf).await?;
+
+        let response = hdfs::BlockOpResponseProto::decode_length_delimited(response_buf.freeze())?;
+        Ok(response)
     }
 
     pub(crate) fn build_header(
@@ -553,18 +576,6 @@ impl DatanodeConnection {
         }
     }
 
-    pub(crate) async fn read_block_op_response(&mut self) -> Result<hdfs::BlockOpResponseProto> {
-        let buf = self.stream.fill_buf().await?;
-        let msg_length = prost::decode_length_delimiter(buf)?;
-        let total_size = msg_length + prost::length_delimiter_len(msg_length);
-
-        let mut response_buf = BytesMut::zeroed(total_size);
-        self.stream.read_exact(&mut response_buf).await?;
-
-        let response = hdfs::BlockOpResponseProto::decode_length_delimited(response_buf.freeze())?;
-        Ok(response)
-    }
-
     pub(crate) async fn read_packet(&mut self) -> Result<Packet> {
         let mut payload_len_buf = [0u8; 4];
         let mut header_len_buf = [0u8; 2];
@@ -596,7 +607,6 @@ impl DatanodeConnection {
             .write_all(&client_read_status.encode_length_delimited_to_vec())
             .await?;
         self.stream.flush().await?;
-        self.stream.shutdown().await?;
         Ok(())
     }
 
@@ -606,6 +616,7 @@ impl DatanodeConnection {
         let reader = DatanodeReader {
             client_name: self.client_name.clone(),
            reader: BufReader::new(reader),
+            url: self.url,
         };
         let writer = DatanodeWriter {
             client_name: self.client_name,
@@ -622,6 +633,7 @@ impl DatanodeConnection {
         Self {
             client_name: reader.client_name,
             stream,
+            url: reader.url,
         }
     }
 }
@@ -631,6 +643,7 @@
 pub(crate) struct DatanodeReader {
     client_name: String,
     reader: BufReader<OwnedReadHalf>,
+    url: String,
 }
 
 impl DatanodeReader {
@@ -680,6 +693,50 @@ impl DatanodeWriter {
     }
 }
 
+pub(crate) struct DatanodeConnectionCache {
+    cache: Mutex<HashMap<String, VecDeque<(DateTime<Utc>, DatanodeConnection)>>>,
+}
+
+impl DatanodeConnectionCache {
+    fn new() -> Self {
+        Self {
+            cache: Mutex::new(HashMap::new()),
+        }
+    }
+
+    pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> {
+        // Keep things simple and just expire cache entries when checking the cache. We could
+        // move this to its own task but that will add a little more complexity.
+        self.remove_expired();
+
+        let mut cache = self.cache.lock().unwrap();
+
+        cache
+            .get_mut(url)
+            .iter_mut()
+            .flat_map(|conns| conns.pop_front())
+            .map(|(_, conn)| conn)
+            .next()
+    }
+
+    pub(crate) fn release(&self, conn: DatanodeConnection) {
+        let expire_at = Utc::now() + DATANODE_CACHE_EXPIRY;
+        let mut cache = self.cache.lock().unwrap();
+        cache
+            .entry(conn.url.clone())
+            .or_default()
+            .push_back((expire_at, conn));
+    }
+
+    fn remove_expired(&self) {
+        let mut cache = self.cache.lock().unwrap();
+        let now = Utc::now();
+        for (_, values) in cache.iter_mut() {
+            values.retain(|(expire_at, _)| expire_at > &now)
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;
diff --git a/crates/hdfs-native/src/security/gssapi.rs b/crates/hdfs-native/src/security/gssapi.rs
index f3a5df7..c6aeff8 100644
--- a/crates/hdfs-native/src/security/gssapi.rs
+++ b/crates/hdfs-native/src/security/gssapi.rs
@@ -53,7 +53,7 @@ impl GssapiSession {
         let principal = cred.name()?.to_string();
 
         let state = GssapiState::Pending(ClientCtx::new(
-            cred,
+            Some(cred),
             target,
             // Allow all flags. Setting them does not mean the final context will provide
             // them, so this should not be an issue.