Skip to content

Commit 19d1691

Browse files
authored
Fix some positioned reads for erasure coded files (#100)
1 parent 44b2797 commit 19d1691

File tree

3 files changed

+64
-14
lines changed

3 files changed

+64
-14
lines changed

.gitmodules

-3
This file was deleted.

crates/hdfs-native/src/hdfs/block_reader.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ impl StripedBlockStream {
346346
let mut bytes_to_write = self.len;
347347
for mut cell in decoded_bufs.into_iter() {
348348
if bytes_to_skip > 0 {
349-
if cell.len() > bytes_to_skip {
349+
if bytes_to_skip >= cell.len() {
350350
bytes_to_skip -= cell.len();
351351
continue;
352352
} else {

crates/hdfs-native/tests/test_ec.rs

+63-10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
mod test {
33

44
use bytes::{Buf, BufMut, Bytes, BytesMut};
5+
use hdfs_native::file::FileReader;
56
use hdfs_native::WriteOptions;
67
use serial_test::serial;
78
use std::collections::HashSet;
@@ -14,6 +15,8 @@ mod test {
1415
use hdfs_native::test::{EcFaultInjection, EC_FAULT_INJECTOR};
1516
use hdfs_native::{client::Client, Result};
1617

18+
const CELL_SIZE: usize = 1024 * 1024;
19+
1720
fn create_file(url: &str, path: &str, size: usize) -> io::Result<()> {
1821
assert!(size % 4 == 0);
1922
let num_ints = size / 4;
@@ -55,18 +58,38 @@ mod test {
5558
}
5659
}
5760

61+
async fn verify_positioned_read(reader: &FileReader, offset: usize, len: usize) -> Result<()> {
62+
assert!(offset % 4 == 0);
63+
assert!(len % 4 == 0);
64+
let first_int = offset / 4;
65+
let num_ints = len / 4;
66+
67+
let mut data = reader.read_range(offset, len).await?;
68+
69+
for i in first_int..(first_int + num_ints) {
70+
assert_eq!(
71+
data.get_u32(),
72+
i as u32,
73+
"Different values at integer {}",
74+
i
75+
);
76+
}
77+
78+
Ok(())
79+
}
80+
5881
fn sizes_to_test(data_units: usize) -> Vec<usize> {
5982
vec![
60-
16usize, // Small
61-
1024 * 1024, // One cell
62-
1024 * 1024 - 4, // Just smaller than one cell
63-
1024 * 1024 + 4, // Just bigger than one cell
64-
1024 * 1024 * data_units * 5, // Five "rows" of cells
65-
1024 * 1024 * data_units * 5 - 4,
66-
1024 * 1024 * data_units * 5 + 4,
67-
128 * 1024 * 1024,
68-
128 * 1024 * 1024 - 4,
69-
128 * 1024 * 1024 + 4,
83+
16usize, // Small
84+
CELL_SIZE, // One cell
85+
CELL_SIZE - 4, // Just smaller than one cell
86+
CELL_SIZE + 4, // Just bigger than one cell
87+
CELL_SIZE * data_units * 5, // Five "rows" of cells
88+
CELL_SIZE * data_units * 5 - 4,
89+
CELL_SIZE * data_units * 5 + 4,
90+
128 * CELL_SIZE,
91+
128 * CELL_SIZE - 4,
92+
128 * CELL_SIZE + 4,
7093
]
7194
}
7295
#[tokio::test]
@@ -108,6 +131,36 @@ mod test {
108131
assert!(reader.read_range(0, reader.file_length()).await.is_err());
109132
}
110133

134+
// Reset fault injector
135+
// Install an empty failure list so that no blocks are failed for the reads below
136+
let _ = EC_FAULT_INJECTOR.lock().unwrap().insert(EcFaultInjection {
137+
fail_blocks: vec![],
138+
});
139+
140+
// Test positioned reads
141+
// Create 1 "row" of data
142+
create_file(&dfs.url, &file, data * CELL_SIZE)?;
143+
144+
let reader = client.read(&file).await?;
145+
146+
// Read the first cell completely
147+
verify_positioned_read(&reader, 0, CELL_SIZE).await?;
148+
149+
// Read part of the first cell from the beginning
150+
verify_positioned_read(&reader, 0, 1024).await?;
151+
152+
// Read part of the first cell in the middle
153+
verify_positioned_read(&reader, 1024, 2048).await?;
154+
155+
// Read the second cell completely
156+
verify_positioned_read(&reader, CELL_SIZE, CELL_SIZE).await?;
157+
158+
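// Read CELL_SIZE + 1024 bytes starting at the second cell: the whole cell plus 1024 bytes past it (NOTE(review): second argument is a length, not an end offset; confirm 1024 was not intended)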
159+
verify_positioned_read(&reader, CELL_SIZE, CELL_SIZE + 1024).await?;
160+
161+
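// Read CELL_SIZE + 2048 bytes starting 1024 bytes into the second cell, running past its end (NOTE(review): second argument is a length, not an end offset; confirm 2048 was not intended)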
162+
verify_positioned_read(&reader, CELL_SIZE + 1024, CELL_SIZE + 2048).await?;
163+
111164
assert!(client.delete(&file, false).await?);
112165
}
113166
let _ = EC_FAULT_INJECTOR.lock().unwrap().take();

0 commit comments

Comments
 (0)