Skip to content

Commit 46a5dc4

Browse files
authored
Better observer namenode support (#149)
1 parent 32510e9 commit 46a5dc4

File tree

4 files changed

+109
-37
lines changed

4 files changed

+109
-37
lines changed

rust/minidfs/src/main/java/main/Main.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public static void main(String args[]) throws Exception {
136136
dfs.transitionToActive(2);
137137
} else if (flags.contains("ha")) {
138138
activeNamenode = 2;
139-
// dfs.transitionToObserver(1);
139+
dfs.transitionToObserver(1);
140140
dfs.transitionToActive(activeNamenode);
141141
}
142142

rust/src/hdfs/protocol.rs

+34-14
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@ impl NamenodeProtocol {
5656

5757
let response = self
5858
.proxy
59-
.call("getFileInfo", message.encode_length_delimited_to_vec())
59+
.call(
60+
"getFileInfo",
61+
message.encode_length_delimited_to_vec(),
62+
false,
63+
)
6064
.await?;
6165

6266
let decoded = hdfs::GetFileInfoResponseProto::decode_length_delimited(response)?;
@@ -80,7 +84,11 @@ impl NamenodeProtocol {
8084

8185
let response = self
8286
.proxy
83-
.call("getListing", message.encode_length_delimited_to_vec())
87+
.call(
88+
"getListing",
89+
message.encode_length_delimited_to_vec(),
90+
false,
91+
)
8492
.await?;
8593

8694
let decoded = hdfs::GetListingResponseProto::decode_length_delimited(response)?;
@@ -103,6 +111,7 @@ impl NamenodeProtocol {
103111
.call(
104112
"getLocatedFileInfo",
105113
message.encode_length_delimited_to_vec(),
114+
false,
106115
)
107116
.await?;
108117

@@ -119,6 +128,7 @@ impl NamenodeProtocol {
119128
.call(
120129
"getServerDefaults",
121130
message.encode_length_delimited_to_vec(),
131+
false,
122132
)
123133
.await?;
124134

@@ -148,6 +158,7 @@ impl NamenodeProtocol {
148158
.call(
149159
"getDataEncryptionKey",
150160
message.encode_length_delimited_to_vec(),
161+
false,
151162
)
152163
.await?;
153164

@@ -212,7 +223,7 @@ impl NamenodeProtocol {
212223

213224
let response = self
214225
.proxy
215-
.call("create", message.encode_length_delimited_to_vec())
226+
.call("create", message.encode_length_delimited_to_vec(), true)
216227
.await?;
217228

218229
let decoded = hdfs::CreateResponseProto::decode_length_delimited(response)?;
@@ -240,7 +251,7 @@ impl NamenodeProtocol {
240251

241252
let response = self
242253
.proxy
243-
.call("append", message.encode_length_delimited_to_vec())
254+
.call("append", message.encode_length_delimited_to_vec(), true)
244255
.await?;
245256

246257
let decoded = hdfs::AppendResponseProto::decode_length_delimited(response)?;
@@ -266,7 +277,7 @@ impl NamenodeProtocol {
266277

267278
let response = self
268279
.proxy
269-
.call("addBlock", message.encode_length_delimited_to_vec())
280+
.call("addBlock", message.encode_length_delimited_to_vec(), true)
270281
.await?;
271282

272283
let decoded = hdfs::AddBlockResponseProto::decode_length_delimited(response)?;
@@ -290,7 +301,7 @@ impl NamenodeProtocol {
290301

291302
let response = self
292303
.proxy
293-
.call("complete", message.encode_length_delimited_to_vec())
304+
.call("complete", message.encode_length_delimited_to_vec(), true)
294305
.await?;
295306

296307
let decoded = hdfs::CompleteResponseProto::decode_length_delimited(response)?;
@@ -316,7 +327,7 @@ impl NamenodeProtocol {
316327

317328
let response = self
318329
.proxy
319-
.call("mkdirs", message.encode_length_delimited_to_vec())
330+
.call("mkdirs", message.encode_length_delimited_to_vec(), true)
320331
.await?;
321332

322333
let decoded = hdfs::MkdirsResponseProto::decode_length_delimited(response)?;
@@ -340,7 +351,7 @@ impl NamenodeProtocol {
340351

341352
let response = self
342353
.proxy
343-
.call("rename2", message.encode_length_delimited_to_vec())
354+
.call("rename2", message.encode_length_delimited_to_vec(), true)
344355
.await?;
345356

346357
let decoded = hdfs::Rename2ResponseProto::decode_length_delimited(response)?;
@@ -361,7 +372,7 @@ impl NamenodeProtocol {
361372

362373
let response = self
363374
.proxy
364-
.call("delete", message.encode_length_delimited_to_vec())
375+
.call("delete", message.encode_length_delimited_to_vec(), true)
365376
.await?;
366377

367378
let decoded = hdfs::DeleteResponseProto::decode_length_delimited(response)?;
@@ -381,7 +392,7 @@ impl NamenodeProtocol {
381392

382393
let response = self
383394
.proxy
384-
.call("renewLease", message.encode_length_delimited_to_vec())
395+
.call("renewLease", message.encode_length_delimited_to_vec(), true)
385396
.await?;
386397

387398
let decoded = hdfs::RenewLeaseResponseProto::decode_length_delimited(response)?;
@@ -404,7 +415,7 @@ impl NamenodeProtocol {
404415

405416
let response = self
406417
.proxy
407-
.call("setTimes", message.encode_length_delimited_to_vec())
418+
.call("setTimes", message.encode_length_delimited_to_vec(), true)
408419
.await?;
409420

410421
let decoded = hdfs::SetTimesResponseProto::decode_length_delimited(response)?;
@@ -428,7 +439,7 @@ impl NamenodeProtocol {
428439

429440
let response = self
430441
.proxy
431-
.call("setOwner", message.encode_length_delimited_to_vec())
442+
.call("setOwner", message.encode_length_delimited_to_vec(), true)
432443
.await?;
433444

434445
let decoded = hdfs::SetOwnerResponseProto::decode_length_delimited(response)?;
@@ -450,7 +461,11 @@ impl NamenodeProtocol {
450461

451462
let response = self
452463
.proxy
453-
.call("setPermission", message.encode_length_delimited_to_vec())
464+
.call(
465+
"setPermission",
466+
message.encode_length_delimited_to_vec(),
467+
true,
468+
)
454469
.await?;
455470

456471
let decoded = hdfs::SetPermissionResponseProto::decode_length_delimited(response)?;
@@ -472,7 +487,11 @@ impl NamenodeProtocol {
472487

473488
let response = self
474489
.proxy
475-
.call("setReplication", message.encode_length_delimited_to_vec())
490+
.call(
491+
"setReplication",
492+
message.encode_length_delimited_to_vec(),
493+
true,
494+
)
476495
.await?;
477496

478497
let decoded = hdfs::SetReplicationResponseProto::decode_length_delimited(response)?;
@@ -495,6 +514,7 @@ impl NamenodeProtocol {
495514
.call(
496515
"getContentSummary",
497516
message.encode_length_delimited_to_vec(),
517+
false,
498518
)
499519
.await?;
500520

rust/src/hdfs/proxy.rs

+73-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
use std::sync::{
2-
atomic::{AtomicBool, AtomicUsize, Ordering},
3-
Arc, Mutex,
1+
use std::{
2+
collections::HashSet,
3+
sync::{
4+
atomic::{AtomicBool, AtomicUsize, Ordering},
5+
Arc, Mutex,
6+
},
47
};
58

69
use bytes::Bytes;
@@ -73,7 +76,8 @@ impl ProxyConnection {
7376
#[derive(Debug)]
7477
pub(crate) struct NameServiceProxy {
7578
proxy_connections: Vec<ProxyConnection>,
76-
current_index: AtomicUsize,
79+
current_active: AtomicUsize,
80+
current_observers: Arc<Mutex<HashSet<usize>>>,
7781
msycned: AtomicBool,
7882
}
7983

@@ -101,15 +105,16 @@ impl NameServiceProxy {
101105

102106
Ok(NameServiceProxy {
103107
proxy_connections,
104-
current_index: AtomicUsize::new(0),
108+
current_active: AtomicUsize::new(0),
109+
current_observers: Arc::new(Mutex::new(HashSet::new())),
105110
msycned: AtomicBool::new(false),
106111
})
107112
}
108113

109-
async fn msync_if_needed(&self) -> Result<()> {
110-
if !self.msycned.fetch_or(true, Ordering::SeqCst) {
114+
async fn msync_if_needed(&self, write: bool) -> Result<()> {
115+
if !self.msycned.fetch_or(true, Ordering::SeqCst) && !write {
111116
let msync_msg = hdfs::MsyncRequestProto::default();
112-
self.call_inner("msync", msync_msg.encode_length_delimited_to_vec())
117+
self.call_inner("msync", msync_msg.encode_length_delimited_to_vec(), true)
113118
.await
114119
.map(|_| ())
115120
.or_else(|err| match err {
@@ -125,26 +130,64 @@ impl NameServiceProxy {
125130
Ok(())
126131
}
127132

128-
pub(crate) async fn call(&self, method_name: &'static str, message: Vec<u8>) -> Result<Bytes> {
129-
self.msync_if_needed().await?;
130-
self.call_inner(method_name, message).await
133+
pub(crate) async fn call(
134+
&self,
135+
method_name: &'static str,
136+
message: Vec<u8>,
137+
write: bool,
138+
) -> Result<Bytes> {
139+
self.msync_if_needed(write).await?;
140+
self.call_inner(method_name, message, write).await
131141
}
132142

133143
fn is_retriable(exception: &str) -> bool {
134144
exception == STANDBY_EXCEPTION || exception == OBSERVER_RETRY_EXCEPTION
135145
}
136146

137-
async fn call_inner(&self, method_name: &'static str, message: Vec<u8>) -> Result<Bytes> {
138-
let mut proxy_index = self.current_index.load(Ordering::SeqCst);
147+
async fn call_inner(
148+
&self,
149+
method_name: &'static str,
150+
message: Vec<u8>,
151+
write: bool,
152+
) -> Result<Bytes> {
153+
let current_active = self.current_active.load(Ordering::SeqCst);
154+
let proxy_indices = if write {
155+
// If we're writing, try the current known active and then loop
156+
// through the rest if that fails
157+
let first = current_active;
158+
let rest = (0..self.proxy_connections.len()).filter(|i| *i != first);
159+
[first].into_iter().chain(rest).collect::<Vec<_>>()
160+
} else {
161+
// If we're reading, try all known observers, then the active, then
162+
// any remaining
163+
let mut first = self
164+
.current_observers
165+
.lock()
166+
.unwrap()
167+
.iter()
168+
.copied()
169+
.collect::<Vec<_>>();
170+
if !first.contains(&current_active) {
171+
first.push(current_active);
172+
}
173+
let rest = (0..self.proxy_connections.len()).filter(|i| !first.contains(i));
174+
first.iter().copied().chain(rest).collect()
175+
};
176+
139177
let mut attempts = 0;
140178
loop {
179+
let proxy_index = proxy_indices[attempts];
141180
let result = self.proxy_connections[proxy_index]
142181
.call(method_name, &message)
143182
.await;
144183

145184
match result {
146185
Ok(bytes) => {
147-
self.current_index.store(proxy_index, Ordering::SeqCst);
186+
if write {
187+
self.current_active.store(proxy_index, Ordering::SeqCst);
188+
} else {
189+
self.current_observers.lock().unwrap().insert(proxy_index);
190+
}
148191
return Ok(bytes);
149192
}
150193
// RPCError indicates the call was successfully attempted but had an error, so should be returned immediately
@@ -153,13 +196,27 @@ impl NameServiceProxy {
153196
}
154197
Err(_) if attempts >= self.proxy_connections.len() - 1 => return result,
155198
// Retriable error, do nothing and try the next connection
156-
Err(HdfsError::RPCError(_, _)) => (),
199+
Err(HdfsError::RPCError(exception, _))
200+
| Err(HdfsError::FatalRPCError(exception, _))
201+
if Self::is_retriable(&exception) =>
202+
{
203+
match exception.as_ref() {
204+
OBSERVER_RETRY_EXCEPTION => {
205+
self.current_observers.lock().unwrap().insert(proxy_index);
206+
}
207+
STANDBY_EXCEPTION => {
208+
self.current_observers.lock().unwrap().remove(&proxy_index);
209+
}
210+
_ => (),
211+
}
212+
}
157213
Err(e) => {
214+
// Some other error, we will retry but log the error
215+
self.current_observers.lock().unwrap().remove(&proxy_index);
158216
warn!("{:?}", e);
159217
}
160218
}
161219

162-
proxy_index = (proxy_index + 1) % self.proxy_connections.len();
163220
attempts += 1;
164221
}
165222
}

rust/src/security/sasl.rs

+1-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use bytes::{Buf, BufMut, Bytes, BytesMut};
22
use cipher::{KeyIvInit, StreamCipher};
3-
use log::{debug, warn};
3+
use log::debug;
44
use prost::Message;
55
use std::io;
66
use std::sync::{Arc, Mutex};
@@ -262,11 +262,6 @@ impl SaslReader {
262262
));
263263
}
264264
RpcStatusProto::Fatal => {
265-
warn!(
266-
"RPC fatal error: {}: {}",
267-
rpc_response.exception_class_name(),
268-
rpc_response.error_msg()
269-
);
270265
return Err(HdfsError::FatalRPCError(
271266
rpc_response.exception_class_name().to_string(),
272267
rpc_response.error_msg().to_string(),

0 commit comments

Comments
 (0)