diff --git a/Cargo.lock b/Cargo.lock index 9385690..3d53ee1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,6 +302,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", + "futures", "is-terminal", "itertools 0.10.5", "num-traits", @@ -314,6 +315,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -448,6 +450,20 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs-hdfs3" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f38e500596a428817fd4fd8a9a21da32f4edb3250e87886039670b12ea02f5d" +dependencies = [ + "bindgen", + "cc", + "lazy_static", + "libc", + "log", + "url", +] + [[package]] name = "futures" version = "0.3.30" @@ -634,6 +650,7 @@ dependencies = [ "crc", "criterion", "env_logger", + "fs-hdfs3", "futures", "g2p", "gsasl-sys", diff --git a/README.md b/README.md index 73d656d..1c2e8ff 100644 --- a/README.md +++ b/README.md @@ -71,4 +71,17 @@ cargo test -p hdfs-native --features token,kerberos,intergation-test ``` ### Python tests -See the [Python README](./python/README.md) \ No newline at end of file +See the [Python README](./python/README.md) + +## Running benchmarks +Some of the benchmarks compare performance to the JVM-based client through libhdfs via the fs-hdfs3 crate. 
Because of that, some extra setup is required to run the benchmarks: + +```bash +export HADOOP_CONF_DIR=$(pwd)/crates/hdfs-native/target/test +export CLASSPATH=$(hadoop classpath) +``` + +Then you can run the benchmarks with: +```bash +cargo bench -p hdfs-native --features benchmark,integration-test +``` \ No newline at end of file diff --git a/crates/hdfs-native/Cargo.toml b/crates/hdfs-native/Cargo.toml index fbf2b85..f13240c 100644 --- a/crates/hdfs-native/Cargo.toml +++ b/crates/hdfs-native/Cargo.toml @@ -26,7 +26,7 @@ prost-types = "0.12" roxmltree = "0.18" socket2 = "0.5" thiserror = "1" -tokio = { workspace = true, features = ["rt", "net", "io-util", "macros", "sync", "time"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net", "io-util", "macros", "sync", "time"] } url = "2" users = { version = "0.11", default-features = false } uuid = { version = "1", features = ["v4"] } @@ -37,8 +37,9 @@ prost-build = { version = "0.12", optional = true } protobuf-src = { version = "1.1", optional = true } [dev-dependencies] -criterion = "0.5" +criterion = { version = "0.5", features = ["async_tokio", "async_futures"] } env_logger = "0.10" +fs-hdfs3 = "0.1.12" serial_test = "2.0.0" tempfile = "3" which = "4" @@ -49,8 +50,16 @@ token = ["gsasl-sys"] generate-protobuf = ["prost-build", "protobuf-src"] integration-test = ["which"] -benchmark = [] +benchmark = ["which"] [[bench]] name = "ec" harness = false + +[[bench]] +name = "io" +harness = false + +[[bench]] +name = "rpc" +harness = false \ No newline at end of file diff --git a/crates/hdfs-native/benches/io.rs b/crates/hdfs-native/benches/io.rs new file mode 100644 index 0000000..16094a4 --- /dev/null +++ b/crates/hdfs-native/benches/io.rs @@ -0,0 +1,110 @@ +use std::collections::HashSet; + +use bytes::{Buf, BufMut, BytesMut}; +use criterion::*; +use hdfs::hdfs::get_hdfs; +use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions}; + +async fn write_file(client: &Client, ints: usize) { + let mut 
writer = client + .create("/bench", WriteOptions::default()) + .await + .unwrap(); + + let mut data = BytesMut::with_capacity(ints * 4); + for i in 0..ints { + data.put_u32(i as u32); + } + writer.write(data.freeze()).await.unwrap(); + writer.close().await.unwrap(); +} + +fn bench(c: &mut Criterion) { + let _ = env_logger::builder().is_test(true).try_init(); + + let _dfs = MiniDfs::with_features(&HashSet::new()); + let client = Client::default(); + + let ints_to_write: usize = 128 * 1024 * 1024; // 128 Mi 4-byte ints = 512 MiB file + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { write_file(&client, ints_to_write).await }); + + let fs = get_hdfs().unwrap(); + + let mut group = c.benchmark_group("read"); + group.throughput(Throughput::Bytes((ints_to_write * 4) as u64)); + group.sample_size(10); + + let reader = rt.block_on(client.read("/bench")).unwrap(); + group.bench_function("read-native", |b| { + b.to_async(&rt).iter(|| async { + // let reader = client.read("/bench").await.unwrap(); + + reader.read_range(0, reader.file_length()).await.unwrap() + }) + }); + group.sample_size(10); + group.bench_function("read-libhdfs", |b| { + b.iter(|| { + let mut buf = BytesMut::zeroed(ints_to_write * 4); + let mut bytes_read = 0; + let fs = get_hdfs().unwrap(); + let reader = fs.open("/bench").unwrap(); + + while bytes_read < ints_to_write * 4 { + bytes_read += reader + .read(&mut buf[bytes_read..ints_to_write * 4]) + .unwrap() as usize; + } + reader.close().unwrap(); + buf + }) + }); + + let mut data_to_write = BytesMut::with_capacity(ints_to_write * 4); + for i in 0..ints_to_write { + data_to_write.put_i32(i as i32); + } + + let buf = data_to_write.freeze(); + + drop(group); + + let mut group = c.benchmark_group("write"); + group.throughput(Throughput::Bytes((ints_to_write * 4) as u64)); + group.sample_size(10); + + group.bench_function("write-native", |b| { + b.to_async(&rt).iter(|| async { + let mut writer = client 
.create("/bench-write", WriteOptions::default().overwrite(true)) + .await + .unwrap(); + + writer.write(buf.clone()).await.unwrap(); + writer.close().await.unwrap(); + }) + }); + + group.sample_size(10); + group.bench_function("write-libhdfs", |b| { + b.iter(|| { + let mut buf = buf.clone(); + let writer = fs.create_with_overwrite("/bench-write", true).unwrap(); + + while buf.remaining() > 0 { + let written = writer.write(&buf[..]).unwrap(); + buf.advance(written as usize); + } + writer.close().unwrap(); + }) + }); +} + +criterion_group!(benches, bench); +criterion_main!(benches); diff --git a/crates/hdfs-native/benches/rpc.rs b/crates/hdfs-native/benches/rpc.rs new file mode 100644 index 0000000..0b7cd29 --- /dev/null +++ b/crates/hdfs-native/benches/rpc.rs @@ -0,0 +1,41 @@ +use std::collections::HashSet; + +use criterion::*; +use hdfs::hdfs::get_hdfs; +use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions}; + +fn bench(c: &mut Criterion) { + let _ = env_logger::builder().is_test(true).try_init(); + + let _dfs = MiniDfs::with_features(&HashSet::new()); + let client = Client::default(); + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + client + .create("/bench", WriteOptions::default()) + .await + .unwrap() + .close() + .await + .unwrap(); + }); + + let fs = get_hdfs().unwrap(); + + let mut group = c.benchmark_group("rpc"); + group.bench_function("getFileInfo-native", |b| { + b.to_async(&rt) + .iter(|| async { client.get_file_info("/bench").await.unwrap() }) + }); + group.bench_function("getFileInfo-libhdfs", |b| { + b.iter(|| fs.get_file_status("/bench").unwrap()) + }); +} + +criterion_group!(benches, bench); +criterion_main!(benches); diff --git a/crates/hdfs-native/src/hdfs/block_reader.rs b/crates/hdfs-native/src/hdfs/block_reader.rs index 1f09996..8f2729d 100644 --- a/crates/hdfs-native/src/hdfs/block_reader.rs +++ b/crates/hdfs-native/src/hdfs/block_reader.rs @@ -93,13 +93,16 @@ impl 
ReplicatedBlockStream { } async fn next_packet(&mut self) -> Result> { - if self.len == 0 { - return Ok(None); - } if self.connection.is_none() { self.select_next_datanode().await?; } let conn = self.connection.as_mut().unwrap(); + + if self.len == 0 { + conn.send_read_success().await?; + return Ok(None); + } + let packet = conn.read_packet().await?; let packet_offset = if self.offset > packet.header.offset_in_block as usize { diff --git a/crates/hdfs-native/src/hdfs/connection.rs b/crates/hdfs-native/src/hdfs/connection.rs index 3879fb6..7072010 100644 --- a/crates/hdfs-native/src/hdfs/connection.rs +++ b/crates/hdfs-native/src/hdfs/connection.rs @@ -587,6 +587,20 @@ impl DatanodeConnection { Ok(Packet::new(header, checksum, data)) } + pub(crate) async fn send_read_success(&mut self) -> Result<()> { + let client_read_status = hdfs::ClientReadStatusProto { + status: hdfs::Status::ChecksumOk as i32, + }; + + self.stream + .write_all(&client_read_status.encode_length_delimited_to_vec()) + .await?; + self.stream.flush().await?; + self.stream.shutdown().await?; + + Ok(()) + } + pub(crate) fn split(self) -> (DatanodeReader, DatanodeWriter) { let (reader, writer) = self.stream.into_inner().into_split(); let reader = DatanodeReader { diff --git a/crates/hdfs-native/src/lib.rs b/crates/hdfs-native/src/lib.rs index 74c62d2..e447943 100644 --- a/crates/hdfs-native/src/lib.rs +++ b/crates/hdfs-native/src/lib.rs @@ -37,7 +37,7 @@ pub(crate) mod ec; pub(crate) mod error; pub mod file; pub(crate) mod hdfs; -#[cfg(feature = "integration-test")] +#[cfg(any(feature = "integration-test", feature = "benchmark"))] pub mod minidfs; pub(crate) mod proto; pub(crate) mod security; diff --git a/crates/hdfs-native/src/minidfs.rs b/crates/hdfs-native/src/minidfs.rs index 2a4decd..8bc0389 100644 --- a/crates/hdfs-native/src/minidfs.rs +++ b/crates/hdfs-native/src/minidfs.rs @@ -131,7 +131,6 @@ impl MiniDfs { }; env::set_var("HADOOP_CONF_DIR", "target/test"); - MiniDfs { process: child, url: 
url.to_string(),