Skip to content

Commit 32510e9

Browse files
authored
Loosen locking around RPC calls for better multi-threading (#148)
* Loosen locking around RPC calls for better multi-threading
* Add benchmark for parallel RPC
1 parent c4fedda commit 32510e9

File tree

3 files changed

+52
-35
lines changed

3 files changed

+52
-35
lines changed

rust/benches/rpc.rs

+18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::collections::HashSet;
22

33
use criterion::*;
4+
use futures::future::join_all;
45
use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions};
56

67
fn bench(c: &mut Criterion) {
@@ -34,6 +35,23 @@ fn bench(c: &mut Criterion) {
3435
let fs = hdfs::hdfs::get_hdfs().unwrap();
3536
b.iter(|| fs.get_file_status("/bench").unwrap())
3637
});
38+
39+
group.sampling_mode(SamplingMode::Flat);
40+
group.bench_function("getFileInfo-parallel", |b| {
41+
b.to_async(&rt).iter_batched(
42+
|| {
43+
(0..100)
44+
.map(|_| client.get_file_info("/bench"))
45+
.collect::<Vec<_>>()
46+
},
47+
|futures| async {
48+
for result in join_all(futures).await {
49+
result.unwrap();
50+
}
51+
},
52+
BatchSize::SmallInput,
53+
)
54+
});
3755
}
3856

3957
criterion_group!(benches, bench);

rust/src/hdfs/connection.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,11 @@ impl RpcConnection {
260260
Ok(())
261261
}
262262

263-
pub(crate) async fn call(&self, method_name: &str, message: &[u8]) -> Result<Bytes> {
263+
pub(crate) async fn call(
264+
&self,
265+
method_name: &str,
266+
message: &[u8],
267+
) -> Result<oneshot::Receiver<Result<Bytes>>> {
264268
let call_id = self.get_next_call_id();
265269
let conn_header = self.get_connection_header(call_id, 0);
266270

@@ -284,7 +288,7 @@ impl RpcConnection {
284288
self.write_messages(&[&conn_header_buf, &header_buf, message])
285289
.await?;
286290

287-
receiver.await.unwrap()
291+
Ok(receiver)
288292
}
289293
}
290294

rust/src/hdfs/proxy.rs

+28-33
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const OBSERVER_RETRY_EXCEPTION: &str = "org.apache.hadoop.ipc.ObserverRetryOnAct
2424
#[derive(Debug)]
2525
struct ProxyConnection {
2626
url: String,
27-
inner: Option<RpcConnection>,
27+
inner: Arc<tokio::sync::Mutex<Option<RpcConnection>>>,
2828
alignment_context: Arc<Mutex<AlignmentContext>>,
2929
nameservice: Option<String>,
3030
}
@@ -37,37 +37,42 @@ impl ProxyConnection {
3737
) -> Self {
3838
ProxyConnection {
3939
url,
40-
inner: None,
40+
inner: Arc::new(tokio::sync::Mutex::new(None)),
4141
alignment_context,
4242
nameservice,
4343
}
4444
}
4545

46-
async fn get_connection(&mut self) -> Result<&RpcConnection> {
47-
if self.inner.is_none() || !self.inner.as_ref().unwrap().is_alive() {
48-
self.inner = Some(
49-
RpcConnection::connect(
50-
&self.url,
51-
self.alignment_context.clone(),
52-
self.nameservice.as_deref(),
53-
)
54-
.await?,
55-
);
56-
}
57-
Ok(self.inner.as_ref().unwrap())
58-
}
46+
async fn call(&self, method_name: &str, message: &[u8]) -> Result<Bytes> {
47+
let receiver = {
48+
let mut connection = self.inner.lock().await;
49+
match &mut *connection {
50+
Some(c) if c.is_alive() => (),
51+
c => {
52+
*c = Some(
53+
RpcConnection::connect(
54+
&self.url,
55+
self.alignment_context.clone(),
56+
self.nameservice.as_deref(),
57+
)
58+
.await?,
59+
);
60+
}
61+
}
5962

60-
async fn call(&mut self, method_name: &str, message: &[u8]) -> Result<Bytes> {
61-
self.get_connection()
62-
.await?
63-
.call(method_name, message)
64-
.await
63+
connection
64+
.as_ref()
65+
.unwrap()
66+
.call(method_name, message)
67+
.await?
68+
};
69+
receiver.await.unwrap()
6570
}
6671
}
6772

6873
#[derive(Debug)]
6974
pub(crate) struct NameServiceProxy {
70-
proxy_connections: Vec<Arc<tokio::sync::Mutex<ProxyConnection>>>,
75+
proxy_connections: Vec<ProxyConnection>,
7176
current_index: AtomicUsize,
7277
msycned: AtomicBool,
7378
}
@@ -80,22 +85,14 @@ impl NameServiceProxy {
8085

8186
let proxy_connections = if let Some(port) = nameservice.port() {
8287
let url = format!("{}:{}", nameservice.host_str().unwrap(), port);
83-
vec![Arc::new(tokio::sync::Mutex::new(ProxyConnection::new(
84-
url,
85-
alignment_context.clone(),
86-
None,
87-
)))]
88+
vec![ProxyConnection::new(url, alignment_context.clone(), None)]
8889
} else if let Some(host) = nameservice.host_str() {
8990
// TODO: Add check for no configured namenodes
9091
config
9192
.get_urls_for_nameservice(host)?
9293
.into_iter()
9394
.map(|url| {
94-
Arc::new(tokio::sync::Mutex::new(ProxyConnection::new(
95-
url,
96-
alignment_context.clone(),
97-
Some(host.to_string()),
98-
)))
95+
ProxyConnection::new(url, alignment_context.clone(), Some(host.to_string()))
9996
})
10097
.collect()
10198
} else {
@@ -142,8 +139,6 @@ impl NameServiceProxy {
142139
let mut attempts = 0;
143140
loop {
144141
let result = self.proxy_connections[proxy_index]
145-
.lock()
146-
.await
147142
.call(method_name, &message)
148143
.await;
149144

0 commit comments

Comments
 (0)