Skip to content

Commit 0e90092

Browse files
authored
Add support for encrypting data transfers with AES (#94)
1 parent f8183c3 commit 0e90092

File tree

6 files changed

+244
-93
lines changed

6 files changed

+244
-93
lines changed

crates/hdfs-native/minidfs/src/main/java/main/Main.java

+3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ public static void main(String args[]) throws Exception {
5757
conf.set(HADOOP_RPC_PROTECTION, "privacy");
5858
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
5959
if (flags.contains("data_transfer_encryption")) {
60+
// Force encryption for all connections
6061
conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");
62+
}
63+
if (flags.contains("aes")) {
6164
conf.set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, "AES/CTR/NoPadding");
6265
}
6366
} else if (flags.contains("integrity")) {

crates/hdfs-native/src/hdfs/connection.rs

+4-10
Original file line numberDiff line numberDiff line change
@@ -550,12 +550,9 @@ impl DatanodeConnection {
550550
.await?;
551551
self.writer.flush().await?;
552552

553-
let msg_length = self.reader.read_length_delimiter().await?;
553+
let message = self.reader.read_proto().await?;
554554

555-
let mut response_buf = BytesMut::zeroed(msg_length);
556-
self.reader.read_exact(&mut response_buf).await?;
557-
558-
let response = hdfs::BlockOpResponseProto::decode(response_buf.freeze())?;
555+
let response = hdfs::BlockOpResponseProto::decode(message)?;
559556
Ok(response)
560557
}
561558

@@ -630,12 +627,9 @@ pub(crate) struct DatanodeReader {
630627

631628
impl DatanodeReader {
632629
pub(crate) async fn read_ack(&mut self) -> Result<hdfs::PipelineAckProto> {
633-
let ack_length = self.reader.read_length_delimiter().await?;
634-
635-
let mut response_buf = BytesMut::zeroed(ack_length);
636-
self.reader.read_exact(&mut response_buf).await?;
630+
let message = self.reader.read_proto().await?;
637631

638-
let response = hdfs::PipelineAckProto::decode(response_buf.freeze())?;
632+
let response = hdfs::PipelineAckProto::decode(message)?;
639633
Ok(response)
640634
}
641635
}

crates/hdfs-native/src/minidfs.rs

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub enum DfsFeatures {
1313
Token,
1414
Integrity,
1515
Privacy,
16+
AES,
1617
HA,
1718
ViewFS,
1819
EC,
@@ -28,6 +29,7 @@ impl DfsFeatures {
2829
DfsFeatures::Privacy => "privacy",
2930
DfsFeatures::Security => "security",
3031
DfsFeatures::Integrity => "integrity",
32+
DfsFeatures::AES => "aes",
3133
DfsFeatures::Token => "token",
3234
DfsFeatures::RBF => "rbf",
3335
}

crates/hdfs-native/src/security/digest.rs

+8
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,14 @@ impl DigestSaslSession {
357357
server: kis,
358358
}
359359
}
360+
361+
pub(crate) fn supports_encryption(&self) -> bool {
362+
match &self.state {
363+
DigestState::Stepped(ctx) => matches!(ctx.qop, Qop::AuthConf),
364+
DigestState::Completed(ctx) => ctx.as_ref().is_some_and(|c| c.encryptor.is_some()),
365+
_ => false,
366+
}
367+
}
360368
}
361369

362370
impl SaslSession for DigestSaslSession {

0 commit comments

Comments
 (0)