2
2
mod test {
3
3
4
4
use bytes:: { Buf , BufMut , Bytes , BytesMut } ;
5
+ use hdfs_native:: file:: FileReader ;
5
6
use hdfs_native:: WriteOptions ;
6
7
use serial_test:: serial;
7
8
use std:: collections:: HashSet ;
@@ -14,6 +15,8 @@ mod test {
14
15
use hdfs_native:: test:: { EcFaultInjection , EC_FAULT_INJECTOR } ;
15
16
use hdfs_native:: { client:: Client , Result } ;
16
17
18
+ const CELL_SIZE : usize = 1024 * 1024 ;
19
+
17
20
fn create_file ( url : & str , path : & str , size : usize ) -> io:: Result < ( ) > {
18
21
assert ! ( size % 4 == 0 ) ;
19
22
let num_ints = size / 4 ;
@@ -55,18 +58,38 @@ mod test {
55
58
}
56
59
}
57
60
61
+ async fn verify_positioned_read ( reader : & FileReader , offset : usize , len : usize ) -> Result < ( ) > {
62
+ assert ! ( offset % 4 == 0 ) ;
63
+ assert ! ( len % 4 == 0 ) ;
64
+ let first_int = offset / 4 ;
65
+ let num_ints = len / 4 ;
66
+
67
+ let mut data = reader. read_range ( offset, len) . await ?;
68
+
69
+ for i in first_int..( first_int + num_ints) {
70
+ assert_eq ! (
71
+ data. get_u32( ) ,
72
+ i as u32 ,
73
+ "Different values at integer {}" ,
74
+ i
75
+ ) ;
76
+ }
77
+
78
+ Ok ( ( ) )
79
+ }
80
+
58
81
fn sizes_to_test ( data_units : usize ) -> Vec < usize > {
59
82
vec ! [
60
- 16usize , // Small
61
- 1024 * 1024 , // One cell
62
- 1024 * 1024 - 4 , // Just smaller than one cell
63
- 1024 * 1024 + 4 , // Just bigger than one cell
64
- 1024 * 1024 * data_units * 5 , // Five "rows" of cells
65
- 1024 * 1024 * data_units * 5 - 4 ,
66
- 1024 * 1024 * data_units * 5 + 4 ,
67
- 128 * 1024 * 1024 ,
68
- 128 * 1024 * 1024 - 4 ,
69
- 128 * 1024 * 1024 + 4 ,
83
+ 16usize , // Small
84
+ CELL_SIZE , // One cell
85
+ CELL_SIZE - 4 , // Just smaller than one cell
86
+ CELL_SIZE + 4 , // Just bigger than one cell
87
+ CELL_SIZE * data_units * 5 , // Five "rows" of cells
88
+ CELL_SIZE * data_units * 5 - 4 ,
89
+ CELL_SIZE * data_units * 5 + 4 ,
90
+ 128 * CELL_SIZE ,
91
+ 128 * CELL_SIZE - 4 ,
92
+ 128 * CELL_SIZE + 4 ,
70
93
]
71
94
}
72
95
#[ tokio:: test]
@@ -107,6 +130,36 @@ mod test {
107
130
assert ! ( reader. read_range( 0 , reader. file_length( ) ) . await . is_err( ) ) ;
108
131
}
109
132
133
+ // Reset fault injector
134
+ // Fail more than the number of parity shards, read should fail
135
+ let _ = EC_FAULT_INJECTOR . lock ( ) . unwrap ( ) . insert ( EcFaultInjection {
136
+ fail_blocks : vec ! [ ] ,
137
+ } ) ;
138
+
139
+ // Test positioned reads
140
+ // Create 1 "row" of data
141
+ create_file ( & dfs. url , & file, data * CELL_SIZE ) ?;
142
+
143
+ let reader = client. read ( & file) . await ?;
144
+
145
+ // Read the first cell completely
146
+ verify_positioned_read ( & reader, 0 , CELL_SIZE ) . await ?;
147
+
148
+ // Read part of the first cell from the beginning
149
+ verify_positioned_read ( & reader, 0 , 1024 ) . await ?;
150
+
151
+ // Read part of the first cell in the middle
152
+ verify_positioned_read ( & reader, 1024 , 2048 ) . await ?;
153
+
154
+ // Read the second cell completely
155
+ verify_positioned_read ( & reader, CELL_SIZE , CELL_SIZE ) . await ?;
156
+
157
+ // Read part of the second cell from the beginning
158
+ verify_positioned_read ( & reader, CELL_SIZE , CELL_SIZE + 1024 ) . await ?;
159
+
160
+ // Read part of the second cell in the middle
161
+ verify_positioned_read ( & reader, CELL_SIZE + 1024 , CELL_SIZE + 2048 ) . await ?;
162
+
110
163
assert ! ( client. delete( & file, false ) . await ?) ;
111
164
}
112
165
let _ = EC_FAULT_INJECTOR . lock ( ) . unwrap ( ) . take ( ) ;
0 commit comments