Skip to content

Commit 33a2249

Browse files
author
zhaohaiyuan
committed
support replace node policy
1 parent 441d56c commit 33a2249

File tree

4 files changed

+155
-4
lines changed

4 files changed

+155
-4
lines changed

rust/src/hdfs/block_writer.rs

+48-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{
1010
hdfs::{
1111
connection::{DatanodeConnection, DatanodeReader, DatanodeWriter, Op, WritePacket},
1212
protocol::NamenodeProtocol,
13+
replace_datanode::ReplaceDatanodeOnFailure,
1314
},
1415
proto::hdfs,
1516
HdfsError, Result,
@@ -300,6 +301,7 @@ pub(crate) struct ReplicatedBlockWriter {
300301

301302
current_packet: WritePacket,
302303
pipeline: Option<Pipeline>,
304+
replace_datanode: ReplaceDatanodeOnFailure,
303305
}
304306

305307
impl ReplicatedBlockWriter {
@@ -340,6 +342,7 @@ impl ReplicatedBlockWriter {
340342
current_packet,
341343

342344
pipeline: Some(pipeline),
345+
replace_datanode: ReplaceDatanodeOnFailure::default(),
343346
};
344347

345348
Ok(this)
@@ -363,10 +366,51 @@ impl ReplicatedBlockWriter {
363366
debug!("Recovering block writer");
364367

365368
let mut new_block = self.block.clone();
366-
for failed_node in failed_nodes {
367-
new_block.locs.remove(failed_node);
368-
new_block.storage_i_ds.remove(failed_node);
369-
new_block.storage_types.remove(failed_node);
369+
370+
// Check if we should try to replace failed datanodes
371+
let should_replace = self.replace_datanode.should_replace(
372+
new_block.locs.len() as u32,
373+
&new_block.locs,
374+
new_block.b.num_bytes() > 0,
375+
false,
376+
);
377+
378+
if should_replace && self.replace_datanode.check_enabled() {
379+
// Request additional datanodes to replace failed ones
380+
let additional_nodes = self
381+
.protocol
382+
.get_additional_datanodes(&new_block.b, &new_block.locs, failed_nodes.len() as u32)
383+
.await;
384+
385+
match additional_nodes {
386+
Ok(new_nodes) => {
387+
// Replace failed nodes with new ones
388+
for (failed_idx, new_node) in failed_nodes.iter().zip(new_nodes.into_iter()) {
389+
if *failed_idx < new_block.locs.len() {
390+
new_block.locs[*failed_idx] = new_node;
391+
}
392+
}
393+
}
394+
Err(e) => {
395+
if !self.replace_datanode.is_best_effort() {
396+
return Err(e);
397+
}
398+
// In best effort mode, continue with remaining nodes
399+
warn!("Failed to get replacement nodes: {}", e);
400+
for failed_node in failed_nodes {
401+
new_block.locs.remove(failed_node);
402+
new_block.storage_i_ds.remove(failed_node);
403+
new_block.storage_types.remove(failed_node);
404+
}
405+
}
406+
}
407+
} else {
408+
// Original behavior - remove failed nodes
409+
for failed_node in failed_nodes {
410+
new_block.locs.remove(failed_node);
411+
new_block.storage_i_ds.remove(failed_node);
412+
new_block.storage_types.remove(failed_node);
413+
}
370414
}
371415

372416
let mut bytes_acked = new_block.b.num_bytes();

rust/src/hdfs/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ pub(crate) mod block_writer;
33
pub(crate) mod connection;
44
pub(crate) mod protocol;
55
pub(crate) mod proxy;
6+
pub(crate) mod replace_datanode;

rust/src/hdfs/protocol.rs

+23
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,29 @@ impl NamenodeProtocol {
444444
};
445445
self.call("getAclStatus", message, false).await
446446
}
447+
448+
pub(crate) async fn get_additional_datanodes(
449+
&self,
450+
block: &hdfs::ExtendedBlockProto,
451+
existing_nodes: &[hdfs::DatanodeInfoProto],
452+
num_additional_nodes: u32,
453+
) -> Result<Vec<hdfs::DatanodeInfoProto>> {
454+
let message = hdfs::GetAdditionalDatanodeRequestProto {
455+
src: "".to_string(), // Not used by the namenode
456+
blk: block.clone(),
457+
existings: existing_nodes.to_vec(),
458+
excludes: vec![], // No excluded nodes
459+
num_additional_nodes,
460+
client_name: self.client_name.clone(),
461+
existing_storage_uuids: vec![], // Not used in this context
462+
..Default::default()
463+
};
464+
465+
let response: hdfs::GetAdditionalDatanodeResponseProto =
466+
self.call("getAdditionalDatanode", message, true).await?;
467+
468+
Ok(response.block.locs)
469+
}
447470
}
448471

449472
impl Drop for NamenodeProtocol {

rust/src/hdfs/replace_datanode.rs

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use crate::proto::hdfs::DatanodeInfoProto;
2+
3+
#[derive(Debug, Clone, Copy, PartialEq)]
4+
pub enum Policy {
5+
/// The feature is disabled in the entire site
6+
Disable,
7+
/// Never add a new datanode
8+
Never,
9+
/// Default policy based on replication conditions
10+
Default,
11+
/// Always add a new datanode when an existing datanode is removed
12+
Always,
13+
}
14+
15+
pub struct ReplaceDatanodeOnFailure {
16+
policy: Policy,
17+
best_effort: bool,
18+
}
19+
20+
impl ReplaceDatanodeOnFailure {
21+
pub fn new(policy: Policy, best_effort: bool) -> Self {
22+
Self {
23+
policy,
24+
best_effort,
25+
}
26+
}
27+
28+
/// Check if the feature is enabled
29+
pub fn check_enabled(&self) -> bool {
30+
self.policy != Policy::Disable
31+
}
32+
33+
/// Best effort means that the client will try to replace the failed datanode
34+
/// (provided that the policy is satisfied), however, it will continue the
35+
/// write operation in case that the datanode replacement also fails.
36+
pub fn is_best_effort(&self) -> bool {
37+
self.best_effort
38+
}
39+
40+
/// Does it need a replacement according to the policy?
41+
pub fn should_replace(
42+
&self,
43+
replication: u32,
44+
existing_datanodes: &[DatanodeInfoProto],
45+
is_append: bool,
46+
is_hflushed: bool,
47+
) -> bool {
48+
let n = existing_datanodes.len();
49+
if n == 0 || n >= replication as usize {
50+
// Don't need to add datanode for any policy
51+
return false;
52+
}
53+
54+
match self.policy {
55+
Policy::Disable | Policy::Never => false,
56+
Policy::Always => true,
57+
Policy::Default => {
58+
// DEFAULT condition:
59+
// Let r be the replication number.
60+
// Let n be the number of existing datanodes.
61+
// Add a new datanode only if r >= 3 and either
62+
// (1) floor(r/2) >= n; or
63+
// (2) r > n and the block is hflushed/appended.
64+
if replication < 3 {
65+
false
66+
} else if n <= (replication as usize / 2) {
67+
true
68+
} else {
69+
is_append || is_hflushed
70+
}
71+
}
72+
}
73+
}
74+
}
75+
76+
impl Default for ReplaceDatanodeOnFailure {
77+
fn default() -> Self {
78+
Self {
79+
policy: Policy::Default,
80+
best_effort: true,
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)