Skip to content

Commit bb35483

Browse files
committed
Merge branch 'master' into aes-data-transfer-encryption
2 parents d7d3aa4 + f8183c3 commit bb35483

File tree

1 file changed

+31
-20
lines changed

1 file changed

+31
-20
lines changed

crates/hdfs-native/src/hdfs/block_reader.rs

+31-20
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,19 @@ impl StripedBlockStream {
250250
let block_read_len = block_end - block_start;
251251

252252
assert_eq!(self.block.block_indices().len(), self.block.locs.len());
253-
let block_map: HashMap<u8, &hdfs::DatanodeInfoProto> = self
253+
let datanode_infos: Vec<(&hdfs::DatanodeInfoProto, &common::TokenProto)> = self
254+
.block
255+
.locs
256+
.iter()
257+
.zip(self.block.block_tokens.iter())
258+
.collect();
259+
260+
let block_map: HashMap<u8, (&hdfs::DatanodeInfoProto, &common::TokenProto)> = self
254261
.block
255262
.block_indices()
256263
.iter()
257264
.copied()
258-
.zip(self.block.locs.iter())
265+
.zip(datanode_infos.into_iter())
259266
.collect();
260267

261268
let mut stripe_results: Vec<Option<Bytes>> =
@@ -264,10 +271,12 @@ impl StripedBlockStream {
264271
let mut futures = Vec::new();
265272

266273
for index in 0..self.ec_schema.data_units as u8 {
274+
let datanode_info = block_map.get(&index);
267275
futures.push(self.read_vertical_stripe(
268276
&self.ec_schema,
269277
index,
270-
block_map.get(&index),
278+
datanode_info.map(|(datanode, _)| datanode),
279+
datanode_info.map(|(_, token)| token),
271280
block_start,
272281
block_read_len,
273282
));
@@ -287,12 +296,13 @@ impl StripedBlockStream {
287296
let mut parity_unit = 0usize;
288297
while blocks_needed > 0 && parity_unit < self.ec_schema.parity_units {
289298
let block_index = (self.ec_schema.data_units + parity_unit) as u8;
290-
let datanode_info = block_map.get(&block_index).unwrap();
299+
let datanode_info = block_map.get(&block_index);
291300
let result = self
292301
.read_vertical_stripe(
293302
&self.ec_schema,
294303
block_index,
295-
Some(datanode_info),
304+
datanode_info.map(|(datanode, _)| datanode),
305+
datanode_info.map(|(_, token)| token),
296306
block_start,
297307
block_read_len,
298308
)
@@ -337,6 +347,7 @@ impl StripedBlockStream {
337347
ec_schema: &EcSchema,
338348
index: u8,
339349
datanode: Option<&&hdfs::DatanodeInfoProto>,
350+
token: Option<&&common::TokenProto>,
340351
offset: usize,
341352
len: usize,
342353
) -> Result<Bytes> {
@@ -347,32 +358,32 @@ impl StripedBlockStream {
347358
return Err(HdfsError::InternalError("Testing error".to_string()));
348359
}
349360
}
361+
let max_block_offset =
362+
ec_schema.max_offset(index as usize, self.block.b.num_bytes() as usize);
350363

351-
let mut buf = BytesMut::zeroed(len);
352-
if let Some(datanode_info) = datanode {
353-
let max_block_offset =
354-
ec_schema.max_offset(index as usize, self.block.b.num_bytes() as usize);
364+
let read_len = usize::min(len, max_block_offset - offset);
355365

356-
let read_len = usize::min(len, max_block_offset - offset);
366+
if read_len == 0 {
367+
// We're past the end of the file so there's nothign to read, just return a zeroed buffer
368+
Ok(BytesMut::zeroed(len).freeze())
369+
} else if let Some((datanode_info, token)) = datanode.zip(token) {
370+
let mut buf = BytesMut::zeroed(len);
357371

358372
// Each vertical stripe has a block ID of the original located block ID + block index
359373
// That was fun to figure out
360374
let mut block = self.block.b.clone();
361375
block.block_id += index as u64;
362376

363-
// The token of the first block is the main one, then all the rest are in the `block_tokens` list
364-
let token = &self.block.block_tokens[self
365-
.block
366-
.block_indices()
367-
.iter()
368-
.position(|x| *x == index)
369-
.unwrap()];
370-
371377
self.read_from_datanode(&datanode_info.id, &block, token, offset, read_len, &mut buf)
372378
.await?;
373-
}
374379

375-
Ok(buf.freeze())
380+
Ok(buf.freeze())
381+
} else {
382+
// There should be data to read but we didn't get block info for this index, so this shard is missing
383+
Err(HdfsError::ErasureCodingError(
384+
"Shard is missing".to_string(),
385+
))
386+
}
376387
}
377388

378389
async fn read_from_datanode(

0 commit comments

Comments
 (0)