Skip to content

Commit 02e559c

Browse files
committed
Specifically track known observer nodes and read-only RPC calls
1 parent 3656ef7 commit 02e559c

File tree

2 files changed

+97
-30
lines changed

2 files changed

+97
-30
lines changed

rust/src/hdfs/protocol.rs

+34-14
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,11 @@ impl NamenodeProtocol {
5656

5757
let response = self
5858
.proxy
59-
.call("getFileInfo", message.encode_length_delimited_to_vec())
59+
.call(
60+
"getFileInfo",
61+
message.encode_length_delimited_to_vec(),
62+
false,
63+
)
6064
.await?;
6165

6266
let decoded = hdfs::GetFileInfoResponseProto::decode_length_delimited(response)?;
@@ -80,7 +84,11 @@ impl NamenodeProtocol {
8084

8185
let response = self
8286
.proxy
83-
.call("getListing", message.encode_length_delimited_to_vec())
87+
.call(
88+
"getListing",
89+
message.encode_length_delimited_to_vec(),
90+
false,
91+
)
8492
.await?;
8593

8694
let decoded = hdfs::GetListingResponseProto::decode_length_delimited(response)?;
@@ -103,6 +111,7 @@ impl NamenodeProtocol {
103111
.call(
104112
"getLocatedFileInfo",
105113
message.encode_length_delimited_to_vec(),
114+
false,
106115
)
107116
.await?;
108117

@@ -119,6 +128,7 @@ impl NamenodeProtocol {
119128
.call(
120129
"getServerDefaults",
121130
message.encode_length_delimited_to_vec(),
131+
false,
122132
)
123133
.await?;
124134

@@ -148,6 +158,7 @@ impl NamenodeProtocol {
148158
.call(
149159
"getDataEncryptionKey",
150160
message.encode_length_delimited_to_vec(),
161+
false,
151162
)
152163
.await?;
153164

@@ -212,7 +223,7 @@ impl NamenodeProtocol {
212223

213224
let response = self
214225
.proxy
215-
.call("create", message.encode_length_delimited_to_vec())
226+
.call("create", message.encode_length_delimited_to_vec(), true)
216227
.await?;
217228

218229
let decoded = hdfs::CreateResponseProto::decode_length_delimited(response)?;
@@ -240,7 +251,7 @@ impl NamenodeProtocol {
240251

241252
let response = self
242253
.proxy
243-
.call("append", message.encode_length_delimited_to_vec())
254+
.call("append", message.encode_length_delimited_to_vec(), true)
244255
.await?;
245256

246257
let decoded = hdfs::AppendResponseProto::decode_length_delimited(response)?;
@@ -266,7 +277,7 @@ impl NamenodeProtocol {
266277

267278
let response = self
268279
.proxy
269-
.call("addBlock", message.encode_length_delimited_to_vec())
280+
.call("addBlock", message.encode_length_delimited_to_vec(), true)
270281
.await?;
271282

272283
let decoded = hdfs::AddBlockResponseProto::decode_length_delimited(response)?;
@@ -290,7 +301,7 @@ impl NamenodeProtocol {
290301

291302
let response = self
292303
.proxy
293-
.call("complete", message.encode_length_delimited_to_vec())
304+
.call("complete", message.encode_length_delimited_to_vec(), true)
294305
.await?;
295306

296307
let decoded = hdfs::CompleteResponseProto::decode_length_delimited(response)?;
@@ -316,7 +327,7 @@ impl NamenodeProtocol {
316327

317328
let response = self
318329
.proxy
319-
.call("mkdirs", message.encode_length_delimited_to_vec())
330+
.call("mkdirs", message.encode_length_delimited_to_vec(), true)
320331
.await?;
321332

322333
let decoded = hdfs::MkdirsResponseProto::decode_length_delimited(response)?;
@@ -340,7 +351,7 @@ impl NamenodeProtocol {
340351

341352
let response = self
342353
.proxy
343-
.call("rename2", message.encode_length_delimited_to_vec())
354+
.call("rename2", message.encode_length_delimited_to_vec(), true)
344355
.await?;
345356

346357
let decoded = hdfs::Rename2ResponseProto::decode_length_delimited(response)?;
@@ -361,7 +372,7 @@ impl NamenodeProtocol {
361372

362373
let response = self
363374
.proxy
364-
.call("delete", message.encode_length_delimited_to_vec())
375+
.call("delete", message.encode_length_delimited_to_vec(), true)
365376
.await?;
366377

367378
let decoded = hdfs::DeleteResponseProto::decode_length_delimited(response)?;
@@ -381,7 +392,7 @@ impl NamenodeProtocol {
381392

382393
let response = self
383394
.proxy
384-
.call("renewLease", message.encode_length_delimited_to_vec())
395+
.call("renewLease", message.encode_length_delimited_to_vec(), true)
385396
.await?;
386397

387398
let decoded = hdfs::RenewLeaseResponseProto::decode_length_delimited(response)?;
@@ -404,7 +415,7 @@ impl NamenodeProtocol {
404415

405416
let response = self
406417
.proxy
407-
.call("setTimes", message.encode_length_delimited_to_vec())
418+
.call("setTimes", message.encode_length_delimited_to_vec(), true)
408419
.await?;
409420

410421
let decoded = hdfs::SetTimesResponseProto::decode_length_delimited(response)?;
@@ -428,7 +439,7 @@ impl NamenodeProtocol {
428439

429440
let response = self
430441
.proxy
431-
.call("setOwner", message.encode_length_delimited_to_vec())
442+
.call("setOwner", message.encode_length_delimited_to_vec(), true)
432443
.await?;
433444

434445
let decoded = hdfs::SetOwnerResponseProto::decode_length_delimited(response)?;
@@ -450,7 +461,11 @@ impl NamenodeProtocol {
450461

451462
let response = self
452463
.proxy
453-
.call("setPermission", message.encode_length_delimited_to_vec())
464+
.call(
465+
"setPermission",
466+
message.encode_length_delimited_to_vec(),
467+
true,
468+
)
454469
.await?;
455470

456471
let decoded = hdfs::SetPermissionResponseProto::decode_length_delimited(response)?;
@@ -472,7 +487,11 @@ impl NamenodeProtocol {
472487

473488
let response = self
474489
.proxy
475-
.call("setReplication", message.encode_length_delimited_to_vec())
490+
.call(
491+
"setReplication",
492+
message.encode_length_delimited_to_vec(),
493+
true,
494+
)
476495
.await?;
477496

478497
let decoded = hdfs::SetReplicationResponseProto::decode_length_delimited(response)?;
@@ -495,6 +514,7 @@ impl NamenodeProtocol {
495514
.call(
496515
"getContentSummary",
497516
message.encode_length_delimited_to_vec(),
517+
false,
498518
)
499519
.await?;
500520

rust/src/hdfs/proxy.rs

+63-16
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,9 @@
1-
use std::sync::{
2-
atomic::{AtomicBool, AtomicUsize, Ordering},
3-
Arc, Mutex,
1+
use std::{
2+
collections::HashSet,
3+
sync::{
4+
atomic::{AtomicBool, AtomicUsize, Ordering},
5+
Arc, Mutex,
6+
},
47
};
58

69
use bytes::Bytes;
@@ -73,7 +76,8 @@ impl ProxyConnection {
7376
#[derive(Debug)]
7477
pub(crate) struct NameServiceProxy {
7578
proxy_connections: Vec<ProxyConnection>,
76-
current_index: AtomicUsize,
79+
current_active: AtomicUsize,
80+
current_observers: Arc<Mutex<HashSet<usize>>>,
7781
msycned: AtomicBool,
7882
}
7983

@@ -101,15 +105,16 @@ impl NameServiceProxy {
101105

102106
Ok(NameServiceProxy {
103107
proxy_connections,
104-
current_index: AtomicUsize::new(0),
108+
current_active: AtomicUsize::new(0),
109+
current_observers: Arc::new(Mutex::new(HashSet::new())),
105110
msycned: AtomicBool::new(false),
106111
})
107112
}
108113

109-
async fn msync_if_needed(&self) -> Result<()> {
110-
if !self.msycned.fetch_or(true, Ordering::SeqCst) {
114+
async fn msync_if_needed(&self, write: bool) -> Result<()> {
115+
if !self.msycned.fetch_or(true, Ordering::SeqCst) && !write {
111116
let msync_msg = hdfs::MsyncRequestProto::default();
112-
self.call_inner("msync", msync_msg.encode_length_delimited_to_vec())
117+
self.call_inner("msync", msync_msg.encode_length_delimited_to_vec(), false)
113118
.await
114119
.map(|_| ())
115120
.or_else(|err| match err {
@@ -125,26 +130,60 @@ impl NameServiceProxy {
125130
Ok(())
126131
}
127132

128-
pub(crate) async fn call(&self, method_name: &'static str, message: Vec<u8>) -> Result<Bytes> {
129-
self.msync_if_needed().await?;
130-
self.call_inner(method_name, message).await
133+
pub(crate) async fn call(
134+
&self,
135+
method_name: &'static str,
136+
message: Vec<u8>,
137+
write: bool,
138+
) -> Result<Bytes> {
139+
self.msync_if_needed(write).await?;
140+
self.call_inner(method_name, message, write).await
131141
}
132142

133143
fn is_retriable(exception: &str) -> bool {
134144
exception == STANDBY_EXCEPTION || exception == OBSERVER_RETRY_EXCEPTION
135145
}
136146

137-
async fn call_inner(&self, method_name: &'static str, message: Vec<u8>) -> Result<Bytes> {
138-
let mut proxy_index = self.current_index.load(Ordering::SeqCst);
147+
async fn call_inner(
148+
&self,
149+
method_name: &'static str,
150+
message: Vec<u8>,
151+
write: bool,
152+
) -> Result<Bytes> {
153+
let current_active = self.current_active.load(Ordering::SeqCst);
154+
let proxy_indices = if write {
155+
// If we're writing, try the current known active and then loop
156+
// through the rest if that fails
157+
let first = current_active;
158+
let rest = (0..self.proxy_connections.len())
159+
.filter(|i| *i != first)
160+
.collect::<Vec<usize>>();
161+
[vec![first], rest].concat()
162+
} else {
163+
// If we're reading, try all known observers, then the active, then
164+
// any remaining
165+
let mut first = self.current_observers.lock().unwrap().clone();
166+
if !first.contains(&current_active) {
167+
first.insert(current_active);
168+
}
169+
let rest = (0..self.proxy_connections.len()).filter(|i| !first.contains(i));
170+
first.iter().copied().chain(rest).collect()
171+
};
172+
139173
let mut attempts = 0;
140174
loop {
175+
let proxy_index = proxy_indices[attempts];
141176
let result = self.proxy_connections[proxy_index]
142177
.call(method_name, &message)
143178
.await;
144179

145180
match result {
146181
Ok(bytes) => {
147-
self.current_index.store(proxy_index, Ordering::SeqCst);
182+
if write {
183+
self.current_active.store(proxy_index, Ordering::SeqCst);
184+
} else {
185+
self.current_observers.lock().unwrap().insert(proxy_index);
186+
}
148187
return Ok(bytes);
149188
}
150189
// RPCError indicates the call was successfully attempted but had an error, so should be returned immediately
@@ -153,13 +192,21 @@ impl NameServiceProxy {
153192
}
154193
Err(_) if attempts >= self.proxy_connections.len() - 1 => return result,
155194
// Retriable error, do nothing and try the next connection
156-
Err(HdfsError::RPCError(_, _)) => (),
195+
Err(HdfsError::RPCError(exception, _)) => match exception.as_ref() {
196+
OBSERVER_RETRY_EXCEPTION => {
197+
self.current_observers.lock().unwrap().insert(proxy_index);
198+
}
199+
STANDBY_EXCEPTION => {
200+
self.current_observers.lock().unwrap().remove(&proxy_index);
201+
}
202+
_ => (),
203+
},
157204
Err(e) => {
205+
// Some other error, we will retry but log the error
158206
warn!("{:?}", e);
159207
}
160208
}
161209

162-
proxy_index = (proxy_index + 1) % self.proxy_connections.len();
163210
attempts += 1;
164211
}
165212
}

0 commit comments

Comments
 (0)