Skip to content

Commit df21e88

Browse files
author
zhaohaiyuan
committed
add integration test for replace dn on failure policy
1 parent ad75c44 commit df21e88

File tree

1 file changed

+102
-73
lines changed

1 file changed

+102
-73
lines changed

rust/tests/test_write_resiliency.rs

+102-73
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
#[cfg(feature = "integration-test")]
1+
// #[cfg(feature = "integration-test")]
22
mod test {
33

4-
use std::{collections::HashSet, sync::atomic::Ordering, time::Duration};
4+
use std::{
5+
collections::{HashMap, HashSet},
6+
sync::atomic::Ordering,
7+
time::Duration,
8+
};
59

610
use bytes::{Buf, BufMut, Bytes, BytesMut};
711
use hdfs_native::{
@@ -68,79 +72,104 @@ mod test {
6872
}
6973

7074
async fn test_write_failures() -> Result<()> {
71-
let client = Client::default();
72-
73-
let file = "/testfile";
74-
let bytes_to_write = 2usize * 1024 * 1024;
75-
76-
let mut data = BytesMut::with_capacity(bytes_to_write);
77-
for i in 0..(bytes_to_write / 4) {
78-
data.put_i32(i as i32);
75+
let mut replace_dn_on_failure_conf: HashMap<String, String> = HashMap::new();
76+
replace_dn_on_failure_conf.insert(
77+
"dfs.client.block.write.replace-datanode-on-failure.enable".to_string(),
78+
"true".to_string(),
79+
);
80+
replace_dn_on_failure_conf.insert(
81+
"dfs.client.block.write.replace-datanode-on-failure.policy".to_string(),
82+
"ALWAYS".to_string(),
83+
);
84+
85+
for (i, client) in vec![
86+
Client::default(),
87+
// Client::default_with_config(replace_dn_on_failure_conf).unwrap(),
88+
]
89+
.iter()
90+
.enumerate()
91+
{
92+
let file = format!("/testfile{}", i);
93+
let bytes_to_write = 2usize * 1024 * 1024;
94+
95+
let mut data = BytesMut::with_capacity(bytes_to_write);
96+
for i in 0..(bytes_to_write / 4) {
97+
data.put_i32(i as i32);
98+
}
99+
100+
// Test connection failure before writing data
101+
let mut writer = client
102+
.create(&file, WriteOptions::default().replication(3))
103+
.await?;
104+
105+
WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
106+
107+
let data = data.freeze();
108+
writer.write(data.clone()).await?;
109+
writer.close().await?;
110+
111+
let reader = client.read(&file).await?;
112+
check_file_content(&reader, data.clone()).await?;
113+
114+
// Test connection failure after data has been written
115+
let mut writer = client
116+
.create(
117+
&file,
118+
WriteOptions::default().replication(3).overwrite(true),
119+
)
120+
.await?;
121+
122+
writer.write(data.slice(..bytes_to_write / 2)).await?;
123+
124+
// Give a little time for the packets to send
125+
tokio::time::sleep(Duration::from_millis(100)).await;
126+
127+
WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
128+
129+
writer.write(data.slice(bytes_to_write / 2..)).await?;
130+
writer.close().await?;
131+
132+
let reader = client.read(&file).await?;
133+
check_file_content(&reader, data.clone()).await?;
134+
135+
// Test failure in from ack status before any data is written
136+
let mut writer = client
137+
.create(
138+
&file,
139+
WriteOptions::default().replication(3).overwrite(true),
140+
)
141+
.await?;
142+
143+
*WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);
144+
145+
writer.write(data.clone()).await?;
146+
writer.close().await?;
147+
148+
let reader = client.read(&file).await?;
149+
check_file_content(&reader, data.clone()).await?;
150+
151+
// Test failure in from ack status after some data has been written
152+
let mut writer = client
153+
.create(
154+
&file,
155+
WriteOptions::default().replication(3).overwrite(true),
156+
)
157+
.await?;
158+
159+
writer.write(data.slice(..bytes_to_write / 2)).await?;
160+
161+
// Give a little time for the packets to send
162+
tokio::time::sleep(Duration::from_millis(100)).await;
163+
164+
*WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);
165+
166+
writer.write(data.slice(bytes_to_write / 2..)).await?;
167+
writer.close().await?;
168+
169+
let reader = client.read(&file).await?;
170+
check_file_content(&reader, data.clone()).await?;
79171
}
80172

81-
// Test connection failure before writing data
82-
let mut writer = client
83-
.create(file, WriteOptions::default().replication(3))
84-
.await?;
85-
86-
WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
87-
88-
let data = data.freeze();
89-
writer.write(data.clone()).await?;
90-
writer.close().await?;
91-
92-
let reader = client.read("/testfile").await?;
93-
check_file_content(&reader, data.clone()).await?;
94-
95-
// Test connection failure after data has been written
96-
let mut writer = client
97-
.create(file, WriteOptions::default().replication(3).overwrite(true))
98-
.await?;
99-
100-
writer.write(data.slice(..bytes_to_write / 2)).await?;
101-
102-
// Give a little time for the packets to send
103-
tokio::time::sleep(Duration::from_millis(100)).await;
104-
105-
WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
106-
107-
writer.write(data.slice(bytes_to_write / 2..)).await?;
108-
writer.close().await?;
109-
110-
let reader = client.read("/testfile").await?;
111-
check_file_content(&reader, data.clone()).await?;
112-
113-
// Test failure in from ack status before any data is written
114-
let mut writer = client
115-
.create(file, WriteOptions::default().replication(3).overwrite(true))
116-
.await?;
117-
118-
*WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);
119-
120-
writer.write(data.clone()).await?;
121-
writer.close().await?;
122-
123-
let reader = client.read("/testfile").await?;
124-
check_file_content(&reader, data.clone()).await?;
125-
126-
// Test failure in from ack status after some data has been written
127-
let mut writer = client
128-
.create(file, WriteOptions::default().replication(3).overwrite(true))
129-
.await?;
130-
131-
writer.write(data.slice(..bytes_to_write / 2)).await?;
132-
133-
// Give a little time for the packets to send
134-
tokio::time::sleep(Duration::from_millis(100)).await;
135-
136-
*WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);
137-
138-
writer.write(data.slice(bytes_to_write / 2..)).await?;
139-
writer.close().await?;
140-
141-
let reader = client.read("/testfile").await?;
142-
check_file_content(&reader, data.clone()).await?;
143-
144173
Ok(())
145174
}
146175

0 commit comments

Comments
 (0)