Skip to content

Commit 109ff59

Browse files
author
zhaohaiyuan
committed
support replace node policy
1 parent 33a2249 commit 109ff59

File tree

4 files changed

+109
-41
lines changed

4 files changed

+109
-41
lines changed

rust/src/hdfs/block_writer.rs

+104-33
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
protocol::NamenodeProtocol,
1313
replace_datanode::ReplaceDatanodeOnFailure,
1414
},
15-
proto::hdfs,
15+
proto::{common, hdfs},
1616
HdfsError, Result,
1717
};
1818

@@ -352,6 +352,7 @@ impl ReplicatedBlockWriter {
352352
&mut self,
353353
failed_nodes: Vec<usize>,
354354
packets_to_replay: Vec<WritePacket>,
355+
is_closing: bool,
355356
) -> Result<()> {
356357
debug!(
357358
"Failed nodes: {:?}, block locs: {:?}",
@@ -367,45 +368,35 @@ impl ReplicatedBlockWriter {
367368

368369
let mut new_block = self.block.clone();
369370

370-
// Check if we should try to replace failed datanodes
371-
let should_replace = self.replace_datanode.should_replace(
372-
new_block.locs.len() as u32,
373-
&new_block.locs,
374-
new_block.b.num_bytes() > 0,
375-
false,
376-
);
377-
378-
if should_replace && self.replace_datanode.check_enabled() {
379-
// Request additional datanodes to replace failed ones
380-
let additional_nodes = self
381-
.protocol
382-
.get_additional_datanodes(&new_block.b, &new_block.locs, failed_nodes.len() as u32)
383-
.await;
384-
385-
match additional_nodes {
386-
Ok(new_nodes) => {
387-
// Replace failed nodes with new ones
388-
for (failed_idx, new_node) in failed_nodes.iter().zip(new_nodes.into_iter()) {
389-
if *failed_idx < new_block.locs.len() {
390-
new_block.locs[*failed_idx] = new_node;
391-
}
392-
}
371+
let should_replace = !is_closing
372+
&& self.replace_datanode.should_replace(
373+
new_block.locs.len() as u32,
374+
&new_block.locs,
375+
new_block.b.num_bytes() > 0,
376+
false,
377+
);
378+
379+
if should_replace {
380+
match self.add_datanode_to_pipeline().await {
381+
Ok(located_block) => {
382+
// Successfully added new datanode
383+
new_block.locs = located_block.locs;
384+
new_block.storage_i_ds = located_block.storage_i_ds;
385+
new_block.storage_types = located_block.storage_types;
393386
}
394387
Err(e) => {
395388
if !self.replace_datanode.is_best_effort() {
396389
return Err(e);
397390
}
398-
// In best effort mode, continue with remaining nodes
399-
warn!("Failed to get replacement nodes: {}", e);
400-
for failed_node in failed_nodes {
401-
new_block.locs.remove(failed_node);
402-
new_block.storage_i_ds.remove(failed_node);
403-
new_block.storage_types.remove(failed_node);
391+
warn!("Failed to add replacement datanode: {}", e);
392+
for failed_node in &failed_nodes {
393+
new_block.locs.remove(*failed_node);
394+
new_block.storage_i_ds.remove(*failed_node);
395+
new_block.storage_types.remove(*failed_node);
404396
}
405397
}
406398
}
407399
} else {
408-
// Original behavior - remove failed nodes
409400
for failed_node in failed_nodes {
410401
new_block.locs.remove(failed_node);
411402
new_block.storage_i_ds.remove(failed_node);
@@ -541,7 +532,7 @@ impl ReplicatedBlockWriter {
541532
))
542533
}
543534
WriteStatus::Recover(failed_nodes, packets_to_replay) => {
544-
self.recover(failed_nodes, packets_to_replay).await?;
535+
self.recover(failed_nodes, packets_to_replay, false).await?;
545536
}
546537
}
547538
} else {
@@ -602,11 +593,91 @@ impl ReplicatedBlockWriter {
602593
{
603594
WriteStatus::Success => return Ok(self.block.b),
604595
WriteStatus::Recover(failed_nodes, packets_to_replay) => {
605-
self.recover(failed_nodes, packets_to_replay).await?
596+
self.recover(failed_nodes, packets_to_replay, true).await?
606597
}
607598
}
608599
}
609600
}
601+
602+
/// Transfer a block from a source datanode to target datanodes
603+
async fn transfer_block(
604+
&self,
605+
src_node: &hdfs::DatanodeInfoProto,
606+
target_nodes: &[hdfs::DatanodeInfoProto],
607+
target_storage_types: &[i32],
608+
block_token: &common::TokenProto,
609+
) -> Result<()> {
610+
// Connect to the source datanode
611+
let mut connection = DatanodeConnection::connect(
612+
&src_node.id,
613+
block_token,
614+
self.protocol.get_cached_data_encryption_key().await?,
615+
)
616+
.await?;
617+
618+
// Create the transfer block request
619+
let message = hdfs::OpTransferBlockProto {
620+
header: connection.build_header(&self.block.b, Some(block_token.clone())),
621+
targets: target_nodes.to_vec(),
622+
target_storage_types: target_storage_types.to_vec(),
623+
target_storage_ids: self.block.storage_i_ds.clone(),
624+
};
625+
626+
debug!("Transfer block request: {:?}", &message);
627+
628+
// Send the transfer block request
629+
let response = connection.send(Op::TransferBlock, &message).await?;
630+
631+
debug!("Transfer block response: {:?}", response);
632+
633+
// Check the response status
634+
if response.status != hdfs::Status::Success as i32 {
635+
return Err(HdfsError::DataTransferError(
636+
"Failed to add a datanode".to_string(),
637+
));
638+
}
639+
640+
Ok(())
641+
}
642+
643+
/// Add a new datanode to the existing pipeline
644+
async fn add_datanode_to_pipeline(&mut self) -> Result<hdfs::LocatedBlockProto> {
645+
let original_nodes = self.block.locs.clone();
646+
let located_block = self
647+
.protocol
648+
.get_additional_datanode(&self.block.b, &self.block.locs, 1)
649+
.await?;
650+
651+
let new_nodes = &located_block.locs;
652+
let new_node_idx = new_nodes
653+
.iter()
654+
.position(|node| {
655+
!original_nodes.iter().any(|orig| {
656+
orig.id.ip_addr == node.id.ip_addr && orig.id.xfer_port == node.id.xfer_port
657+
})
658+
})
659+
.ok_or_else(|| {
660+
HdfsError::DataTransferError(
661+
"No new datanode found in updated block locations".to_string(),
662+
)
663+
})?;
664+
665+
let src_node = if new_node_idx == 0 {
666+
new_nodes[1].clone()
667+
} else {
668+
new_nodes[new_node_idx - 1].clone()
669+
};
670+
671+
self.transfer_block(
672+
&src_node,
673+
&[new_nodes[new_node_idx].clone()],
674+
&[self.block.storage_types[0]], // Use the first storage type for the new node
675+
&self.block.block_token,
676+
)
677+
.await?;
678+
679+
Ok(located_block)
680+
}
610681
}
611682

612683
// Holds data for the current slice being written.

rust/src/hdfs/connection.rs

+2
Original file line numberDiff line numberDiff line change
@@ -379,13 +379,15 @@ impl RpcListener {
379379
pub(crate) enum Op {
380380
WriteBlock,
381381
ReadBlock,
382+
TransferBlock,
382383
}
383384

384385
impl Op {
385386
fn value(&self) -> u8 {
386387
match self {
387388
Self::WriteBlock => 80,
388389
Self::ReadBlock => 81,
390+
Self::TransferBlock => 82,
389391
}
390392
}
391393
}

rust/src/hdfs/protocol.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -445,12 +445,12 @@ impl NamenodeProtocol {
445445
self.call("getAclStatus", message, false).await
446446
}
447447

448-
pub(crate) async fn get_additional_datanodes(
448+
pub(crate) async fn get_additional_datanode(
449449
&self,
450450
block: &hdfs::ExtendedBlockProto,
451451
existing_nodes: &[hdfs::DatanodeInfoProto],
452452
num_additional_nodes: u32,
453-
) -> Result<Vec<hdfs::DatanodeInfoProto>> {
453+
) -> Result<hdfs::LocatedBlockProto> {
454454
let message = hdfs::GetAdditionalDatanodeRequestProto {
455455
src: "".to_string(), // Not used by the namenode
456456
blk: block.clone(),
@@ -465,7 +465,7 @@ impl NamenodeProtocol {
465465
let response: hdfs::GetAdditionalDatanodeResponseProto =
466466
self.call("getAdditionalDatanode", message, true).await?;
467467

468-
Ok(response.block.locs)
468+
Ok(response.block)
469469
}
470470
}
471471

rust/src/hdfs/replace_datanode.rs

-5
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,6 @@ impl ReplaceDatanodeOnFailure {
2525
}
2626
}
2727

28-
/// Check if the feature is enabled
29-
pub fn check_enabled(&self) -> bool {
30-
self.policy != Policy::Disable
31-
}
32-
3328
/// Best effort means that the client will try to replace the failed datanode
3429
/// (provided that the policy is satisfied), however, it will continue the
3530
/// write operation in case that the datanode replacement also fails.

0 commit comments

Comments
 (0)