Commit ad75c44

Author: zhaohaiyuan
Commit message: pass existing tests
1 parent: 109ff59

5 files changed (+214 -11 lines)


rust/src/client.rs (+6)

@@ -132,6 +132,7 @@ impl MountTable {
 #[derive(Debug)]
 pub struct Client {
     mount_table: Arc<MountTable>,
+    replace_datanode_on_failure: crate::hdfs::replace_datanode::ReplaceDatanodeOnFailure,
 }

 impl Client {
@@ -194,8 +195,11 @@ impl Client {
             }
         };

+        let replace_datanode_on_failure = config.get_replace_datanode_on_failure_policy();
+
         Ok(Self {
             mount_table: Arc::new(mount_table),
+            replace_datanode_on_failure,
         })
     }

@@ -343,6 +347,7 @@ impl Client {
                 resolved_path,
                 status,
                 None,
+                self.replace_datanode_on_failure.clone(),
             ))
         }
         None => Err(HdfsError::FileNotFound(src.to_string())),
@@ -383,6 +388,7 @@ impl Client {
                 resolved_path,
                 status,
                 append_response.block,
+                self.replace_datanode_on_failure.clone(),
             ))
         }
         None => Err(HdfsError::FileNotFound(src.to_string())),
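
With this change the client resolves the replace-datanode policy once at construction and hands a clone to every FileWriter it creates. A minimal usage sketch, assuming the published crate name hdfs_native and a Client::new_with_config constructor that accepts extra key/value configuration (both assumptions; adjust to the crate's actual API):

```rust
use std::collections::HashMap;

use hdfs_native::{Client, Result};

fn client_with_replace_policy() -> Result<Client> {
    // The two keys below are the ones introduced in config.rs; the values are examples.
    let config: HashMap<String, String> = [
        (
            "dfs.client.block.write.replace-datanode-on-failure.enable".to_string(),
            "true".to_string(),
        ),
        (
            "dfs.client.block.write.replace-datanode-on-failure.policy".to_string(),
            "DEFAULT".to_string(),
        ),
    ]
    .into_iter()
    .collect();

    // Assumption: a constructor that merges user-supplied config exists
    // (new_with_config here); the namenode URL is a placeholder.
    Client::new_with_config("hdfs://namenode:9000", config)
}
```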

rust/src/common/config.rs (+99 -2)

@@ -27,7 +27,12 @@ const DFS_CLIENT_FAILOVER_RANDOM_ORDER: &str = "dfs.client.failover.random.order
 // Viewfs settings
 const VIEWFS_MOUNTTABLE_PREFIX: &str = "fs.viewfs.mounttable";

-#[derive(Debug)]
+const DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY: &str =
+    "dfs.client.block.write.replace-datanode-on-failure.enable";
+const DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY: &str =
+    "dfs.client.block.write.replace-datanode-on-failure.policy";
+
+#[derive(Debug, Clone)]
 pub struct Configuration {
     map: HashMap<String, String>,
 }
@@ -145,6 +150,37 @@ impl Configuration {
             .collect()
     }

+    /// Get the replace datanode on failure policy from configuration
+    pub fn get_replace_datanode_on_failure_policy(
+        &self,
+    ) -> crate::hdfs::replace_datanode::ReplaceDatanodeOnFailure {
+        use crate::hdfs::replace_datanode::{Policy, ReplaceDatanodeOnFailure};
+
+        let enabled = self.get_boolean(
+            DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+            false,
+        );
+
+        if !enabled {
+            return ReplaceDatanodeOnFailure::new(Policy::Disable, false);
+        }
+
+        let policy_str = self
+            .get(DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
+            .unwrap_or_else(|| "DEFAULT".to_string())
+            .to_uppercase();
+
+        let policy = match policy_str.as_str() {
+            "NEVER" => Policy::Never,
+            "DEFAULT" => Policy::Default,
+            "ALWAYS" => Policy::Always,
+            _ => Policy::Default,
+        };
+
+        // Best effort is always true when enabled
+        ReplaceDatanodeOnFailure::new(policy, true)
+    }
+
     fn read_from_file(path: &Path) -> io::Result<Vec<(String, String)>> {
         let content = fs::read_to_string(path)?;
         let tree = roxmltree::Document::parse(&content).unwrap();
@@ -188,14 +224,17 @@ impl Configuration {

 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::net::IpAddr;

     use dns_lookup::lookup_addr;

     use crate::common::config::DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN;

     use super::{
-        Configuration, DFS_CLIENT_FAILOVER_RESOLVE_NEEDED, HA_NAMENODES_PREFIX,
+        Configuration, DFS_CLIENT_FAILOVER_RESOLVE_NEEDED,
+        DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+        DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY, HA_NAMENODES_PREFIX,
         HA_NAMENODE_RPC_ADDRESS_PREFIX, VIEWFS_MOUNTTABLE_PREFIX,
     };

@@ -299,4 +338,62 @@ mod test {
         assert_eq!(urls.len(), 1, "{:?}", urls);
         assert_eq!(urls[0], "127.0.0.1:9000");
     }
+
+    #[test]
+    fn test_replace_datanode_policy_config() {
+        // Test default policy
+        let config = Configuration {
+            map: HashMap::new(),
+        };
+        let policy = config.get_replace_datanode_on_failure_policy();
+        assert_eq!(policy.is_best_effort(), false);
+
+        // Test disabled policy
+        let config = Configuration {
+            map: [(
+                DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY.to_string(),
+                "false".to_string(),
+            )]
+            .into_iter()
+            .collect(),
+        };
+        let policy = config.get_replace_datanode_on_failure_policy();
+        assert_eq!(policy.is_best_effort(), false);
+
+        // Test NEVER policy
+        let config = Configuration {
+            map: [
+                (
+                    DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY.to_string(),
+                    "true".to_string(),
+                ),
+                (
+                    DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY.to_string(),
+                    "NEVER".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect(),
+        };
+        let policy = config.get_replace_datanode_on_failure_policy();
+        assert_eq!(policy.is_best_effort(), true);
+
+        // Test ALWAYS policy
+        let config = Configuration {
+            map: [
+                (
+                    DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY.to_string(),
+                    "true".to_string(),
+                ),
+                (
+                    DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY.to_string(),
+                    "ALWAYS".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect(),
+        };
+        let policy = config.get_replace_datanode_on_failure_policy();
+        assert_eq!(policy.is_best_effort(), true);
+    }
 }
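
For context on what these two keys mean upstream: Hadoop's hdfs-default.xml documents the policy values as NEVER (never add a replacement), ALWAYS (always add one on failure), and DEFAULT (add one only when losing a node would be risky), with best-effort controlling whether a failed replacement aborts the write. A compact paraphrase of the documented DEFAULT condition, for reference only; it is not necessarily what this crate's replace_datanode module implements:

```rust
/// Paraphrase of the upstream HDFS DEFAULT replacement rule (hdfs-default.xml):
/// with replication factor `r` and `n` datanodes left in the pipeline, replace only
/// if r >= 3 and either n <= floor(r/2), or r > n and the block is appended/hflushed.
fn hdfs_default_rule(r: u32, n: u32, appended_or_hflushed: bool) -> bool {
    r >= 3 && (n <= r / 2 || (r > n && appended_or_hflushed))
}
```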

rust/src/file.rs (+4)

@@ -168,6 +168,7 @@ pub struct FileWriter {
     last_block: Option<hdfs::LocatedBlockProto>,
     closed: bool,
     bytes_written: usize,
+    replace_datanode_on_failure: crate::hdfs::replace_datanode::ReplaceDatanodeOnFailure,
 }

 impl FileWriter {
@@ -177,6 +178,7 @@ impl FileWriter {
         status: hdfs::HdfsFileStatusProto,
         // Some for append, None for create
         last_block: Option<hdfs::LocatedBlockProto>,
+        replace_datanode_on_failure: crate::hdfs::replace_datanode::ReplaceDatanodeOnFailure,
     ) -> Self {
         protocol.add_file_lease(status.file_id(), status.namespace.clone());
         Self {
@@ -187,6 +189,7 @@ impl FileWriter {
             last_block,
             closed: false,
             bytes_written: 0,
+            replace_datanode_on_failure,
         }
     }

@@ -230,6 +233,7 @@ impl FileWriter {
                 .map(resolve_ec_policy)
                 .transpose()?
                 .as_ref(),
+            self.replace_datanode_on_failure.clone(),
         )
         .await?;


rust/src/hdfs/block_writer.rs (+15 -9)

@@ -38,6 +38,7 @@ impl BlockWriter {
         block_size: usize,
         server_defaults: hdfs::FsServerDefaultsProto,
         ec_schema: Option<&EcSchema>,
+        replace_datanode_on_failure: ReplaceDatanodeOnFailure,
     ) -> Result<Self> {
         let block_writer = if let Some(ec_schema) = ec_schema {
             Self::Striped(StripedBlockWriter::new(
@@ -49,7 +50,14 @@ impl BlockWriter {
             ))
         } else {
             Self::Replicated(
-                ReplicatedBlockWriter::new(protocol, block, block_size, server_defaults).await?,
+                ReplicatedBlockWriter::new(
+                    protocol,
+                    block,
+                    block_size,
+                    server_defaults,
+                    replace_datanode_on_failure,
+                )
+                .await?,
             )
         };
         Ok(block_writer)
@@ -310,6 +318,7 @@ impl ReplicatedBlockWriter {
         block: hdfs::LocatedBlockProto,
         block_size: usize,
         server_defaults: hdfs::FsServerDefaultsProto,
+        replace_datanode_on_failure: ReplaceDatanodeOnFailure,
     ) -> Result<Self> {
         let pipeline =
             Self::setup_pipeline(&protocol, &block, &server_defaults, None, None).await?;
@@ -342,7 +351,7 @@ impl ReplicatedBlockWriter {
             current_packet,

             pipeline: Some(pipeline),
-            replace_datanode: ReplaceDatanodeOnFailure::default(),
+            replace_datanode: replace_datanode_on_failure,
         };

         Ok(this)
@@ -379,7 +388,6 @@ impl ReplicatedBlockWriter {
         if should_replace {
             match self.add_datanode_to_pipeline().await {
                 Ok(located_block) => {
-                    // Successfully added new datanode
                     new_block.locs = located_block.locs;
                     new_block.storage_i_ds = located_block.storage_i_ds;
                     new_block.storage_types = located_block.storage_types;
@@ -599,23 +607,20 @@ impl ReplicatedBlockWriter {
         }
     }

-    /// Transfer a block from a source datanode to target datanodes
     async fn transfer_block(
         &self,
         src_node: &hdfs::DatanodeInfoProto,
         target_nodes: &[hdfs::DatanodeInfoProto],
         target_storage_types: &[i32],
         block_token: &common::TokenProto,
     ) -> Result<()> {
-        // Connect to the source datanode
         let mut connection = DatanodeConnection::connect(
             &src_node.id,
             block_token,
             self.protocol.get_cached_data_encryption_key().await?,
         )
         .await?;

-        // Create the transfer block request
         let message = hdfs::OpTransferBlockProto {
             header: connection.build_header(&self.block.b, Some(block_token.clone())),
             targets: target_nodes.to_vec(),
@@ -625,12 +630,10 @@ impl ReplicatedBlockWriter {

         debug!("Transfer block request: {:?}", &message);

-        // Send the transfer block request
         let response = connection.send(Op::TransferBlock, &message).await?;

         debug!("Transfer block response: {:?}", response);

-        // Check the response status
         if response.status != hdfs::Status::Success as i32 {
             return Err(HdfsError::DataTransferError(
                 "Failed to add a datanode".to_string(),
@@ -640,7 +643,6 @@ impl ReplicatedBlockWriter {
         Ok(())
     }

-    /// Add a new datanode to the existing pipeline
    async fn add_datanode_to_pipeline(&mut self) -> Result<hdfs::LocatedBlockProto> {
        let original_nodes = self.block.locs.clone();
        let located_block = self
@@ -830,6 +832,10 @@ impl StripedBlockWriter {
                     cloned,
                     self.block_size,
                     self.server_defaults.clone(),
+                    ReplaceDatanodeOnFailure::new(
+                        super::replace_datanode::Policy::Disable,
+                        false,
+                    ),
                 )
                 .await?,
             )
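
The ReplaceDatanodeOnFailure type and its Policy enum used throughout this diff live in rust/src/hdfs/replace_datanode.rs, the fifth changed file, which is not shown in this excerpt. A minimal sketch of the shape implied by the call sites above (Clone, new, is_best_effort, and the Default the old code relied on); the real definition may differ:

```rust
// Inferred from the call sites in this diff only; see rust/src/hdfs/replace_datanode.rs
// for the actual definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Policy {
    Disable, // replacement feature disabled
    Never,   // never add a replacement datanode
    Default, // replace per the HDFS default condition
    Always,  // always replace a failed datanode
}

#[derive(Debug, Clone)]
pub struct ReplaceDatanodeOnFailure {
    policy: Policy,
    best_effort: bool,
}

impl ReplaceDatanodeOnFailure {
    pub fn new(policy: Policy, best_effort: bool) -> Self {
        Self { policy, best_effort }
    }

    /// When true, failing to add a replacement datanode does not fail the write;
    /// the pipeline continues with the remaining datanodes.
    pub fn is_best_effort(&self) -> bool {
        self.best_effort
    }
}

impl Default for ReplaceDatanodeOnFailure {
    fn default() -> Self {
        Self::new(Policy::Disable, false)
    }
}
```

Note that StripedBlockWriter explicitly passes Policy::Disable with best_effort = false for its internal replicated writers, so datanode replacement stays off for erasure-coded writes.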
