Skip to content

Commit 5542b15

Browse files
authored
Support SASL based data transfer encryption (#92)
1 parent 6f6c3c0 commit 5542b15

File tree

3 files changed

+192
-82
lines changed

3 files changed

+192
-82
lines changed

crates/hdfs-native/minidfs/src/main/java/main/Main.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,24 @@ public static void main(String args[]) throws Exception {
5555
conf.set(HADOOP_SECURITY_AUTHORIZATION, "true");
5656
if (flags.contains("privacy")) {
5757
conf.set(HADOOP_RPC_PROTECTION, "privacy");
58+
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
59+
if (flags.contains("data_transfer_encryption")) {
60+
conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");
61+
conf.set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, "AES/CTR/NoPadding");
62+
}
5863
} else if (flags.contains("integrity")) {
5964
conf.set(HADOOP_RPC_PROTECTION, "integrity");
65+
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "integrity");
6066
} else {
6167
conf.set(HADOOP_RPC_PROTECTION, "authentication");
68+
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
6269
}
6370
conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, "target/test/hdfs.keytab");
6471
conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "hdfs/localhost@" + kdc.getRealm());
6572
conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, "target/test/hdfs.keytab");
6673
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "hdfs/localhost@" + kdc.getRealm());
6774
conf.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true");
6875
conf.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
69-
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
7076
}
7177

7278
HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);

crates/hdfs-native/src/hdfs/connection.rs

+34-75
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,10 @@ use log::{debug, warn};
1212
use once_cell::sync::Lazy;
1313
use prost::Message;
1414
use socket2::SockRef;
15-
use tokio::io::BufStream;
1615
use tokio::sync::{mpsc, oneshot};
1716
use tokio::{
18-
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
19-
net::{
20-
tcp::{OwnedReadHalf, OwnedWriteHalf},
21-
TcpStream,
22-
},
17+
io::AsyncWriteExt,
18+
net::TcpStream,
2319
task::{self, JoinHandle},
2420
};
2521
use uuid::Uuid;
@@ -28,7 +24,7 @@ use crate::proto::common::rpc_response_header_proto::RpcStatusProto;
2824
use crate::proto::common::TokenProto;
2925
use crate::proto::hdfs::DatanodeIdProto;
3026
use crate::proto::{common, hdfs};
31-
use crate::security::sasl::SaslDatanodeConnection;
27+
use crate::security::sasl::{SaslDatanodeConnection, SaslDatanodeReader, SaslDatanodeWriter};
3228
use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter};
3329
use crate::security::user::UserInfo;
3430
use crate::{HdfsError, Result};
@@ -182,7 +178,7 @@ impl RpcConnection {
182178
fn start_sender(&mut self, mut rx: mpsc::Receiver<Vec<u8>>, mut writer: SaslWriter) {
183179
task::spawn(async move {
184180
while let Some(msg) = rx.recv().await {
185-
match writer.write(&msg).await {
181+
match writer.write_all(&msg).await {
186182
Ok(_) => (),
187183
Err(_) => break,
188184
}
@@ -515,10 +511,10 @@ impl Packet {
515511
}
516512
}
517513

518-
#[derive(Debug)]
519514
pub(crate) struct DatanodeConnection {
520515
client_name: String,
521-
stream: BufStream<TcpStream>,
516+
reader: SaslDatanodeReader,
517+
writer: SaslDatanodeWriter,
522518
url: String,
523519
}
524520

@@ -528,18 +524,13 @@ impl DatanodeConnection {
528524
let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port);
529525
let stream = connect(&url).await?;
530526

531-
// If the token has an identifier, we can do SASL negotiation
532-
let stream = if token.identifier.is_empty() {
533-
stream
534-
} else {
535-
debug!("{:?}", token);
536-
let sasl_connection = SaslDatanodeConnection::create(stream);
537-
sasl_connection.negotiate(datanode_id, token).await?
538-
};
527+
let sasl_connection = SaslDatanodeConnection::create(stream);
528+
let (reader, writer) = sasl_connection.negotiate(datanode_id, token).await?;
539529

540530
let conn = DatanodeConnection {
541531
client_name: Uuid::new_v4().to_string(),
542-
stream: BufStream::new(stream),
532+
reader,
533+
writer,
543534
url: url.to_string(),
544535
};
545536
Ok(conn)
@@ -550,26 +541,21 @@ impl DatanodeConnection {
550541
op: Op,
551542
message: &impl Message,
552543
) -> Result<hdfs::BlockOpResponseProto> {
553-
self.stream
544+
self.writer
554545
.write_all(&DATA_TRANSFER_VERSION.to_be_bytes())
555546
.await?;
556-
self.stream.write_all(&[op.value()]).await?;
557-
self.stream
547+
self.writer.write_all(&[op.value()]).await?;
548+
self.writer
558549
.write_all(&message.encode_length_delimited_to_vec())
559550
.await?;
560-
self.stream.flush().await?;
551+
self.writer.flush().await?;
561552

562-
let buf = self.stream.fill_buf().await?;
563-
if buf.is_empty() {
564-
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?;
565-
}
566-
let msg_length = prost::decode_length_delimiter(buf)?;
567-
let total_size = msg_length + prost::length_delimiter_len(msg_length);
553+
let msg_length = self.reader.read_length_delimiter().await?;
568554

569-
let mut response_buf = BytesMut::zeroed(total_size);
570-
self.stream.read_exact(&mut response_buf).await?;
555+
let mut response_buf = BytesMut::zeroed(msg_length);
556+
self.reader.read_exact(&mut response_buf).await?;
571557

572-
let response = hdfs::BlockOpResponseProto::decode_length_delimited(response_buf.freeze())?;
558+
let response = hdfs::BlockOpResponseProto::decode(response_buf.freeze())?;
573559
Ok(response)
574560
}
575561

@@ -593,14 +579,14 @@ impl DatanodeConnection {
593579
pub(crate) async fn read_packet(&mut self) -> Result<Packet> {
594580
let mut payload_len_buf = [0u8; 4];
595581
let mut header_len_buf = [0u8; 2];
596-
self.stream.read_exact(&mut payload_len_buf).await?;
597-
self.stream.read_exact(&mut header_len_buf).await?;
582+
self.reader.read_exact(&mut payload_len_buf).await?;
583+
self.reader.read_exact(&mut header_len_buf).await?;
598584

599585
let payload_length = u32::from_be_bytes(payload_len_buf) as usize;
600586
let header_length = u16::from_be_bytes(header_len_buf) as usize;
601587

602588
let mut remaining_buf = BytesMut::zeroed(payload_length - 4 + header_length);
603-
self.stream.read_exact(&mut remaining_buf).await?;
589+
self.reader.read_exact(&mut remaining_buf).await?;
604590

605591
let header =
606592
hdfs::PacketHeaderProto::decode(remaining_buf.split_to(header_length).freeze())?;
@@ -617,75 +603,46 @@ impl DatanodeConnection {
617603
status: hdfs::Status::ChecksumOk as i32,
618604
};
619605

620-
self.stream
606+
self.writer
621607
.write_all(&client_read_status.encode_length_delimited_to_vec())
622608
.await?;
623-
self.stream.flush().await?;
609+
self.writer.flush().await?;
624610

625611
Ok(())
626612
}
627613

628614
pub(crate) fn split(self) -> (DatanodeReader, DatanodeWriter) {
629-
let (reader, writer) = self.stream.into_inner().into_split();
630615
let reader = DatanodeReader {
631-
client_name: self.client_name.clone(),
632-
reader: BufReader::new(reader),
633-
url: self.url,
616+
reader: self.reader,
634617
};
635618
let writer = DatanodeWriter {
636-
client_name: self.client_name,
637-
writer,
619+
writer: self.writer,
638620
};
639621
(reader, writer)
640622
}
641-
642-
// For future use where we cache datanode connections
643-
#[allow(dead_code)]
644-
pub(crate) fn reunite(reader: DatanodeReader, writer: DatanodeWriter) -> Self {
645-
assert_eq!(reader.client_name, writer.client_name);
646-
let stream = BufStream::new(reader.reader.into_inner().reunite(writer.writer).unwrap());
647-
Self {
648-
client_name: reader.client_name,
649-
stream,
650-
url: reader.url,
651-
}
652-
}
653623
}
654624

655625
/// A reader half of a Datanode connection used for reading acks during
656626
/// write operations.
657627
pub(crate) struct DatanodeReader {
658-
client_name: String,
659-
reader: BufReader<OwnedReadHalf>,
660-
url: String,
628+
reader: SaslDatanodeReader,
661629
}
662630

663631
impl DatanodeReader {
664632
pub(crate) async fn read_ack(&mut self) -> Result<hdfs::PipelineAckProto> {
665-
let buf = self.reader.fill_buf().await?;
633+
let ack_length = self.reader.read_length_delimiter().await?;
666634

667-
if buf.is_empty() {
668-
// The stream has been closed
669-
return Err(HdfsError::DataTransferError(
670-
"Datanode connection closed while waiting for ack".to_string(),
671-
));
672-
}
673-
674-
let ack_length = prost::decode_length_delimiter(buf)?;
675-
let total_size = ack_length + prost::length_delimiter_len(ack_length);
676-
677-
let mut response_buf = BytesMut::zeroed(total_size);
635+
let mut response_buf = BytesMut::zeroed(ack_length);
678636
self.reader.read_exact(&mut response_buf).await?;
679637

680-
let response = hdfs::PipelineAckProto::decode_length_delimited(response_buf.freeze())?;
638+
let response = hdfs::PipelineAckProto::decode(response_buf.freeze())?;
681639
Ok(response)
682640
}
683641
}
684642

685643
/// A write half of a Datanode connection used for writing packets.
686644
pub(crate) struct DatanodeWriter {
687-
client_name: String,
688-
writer: OwnedWriteHalf,
645+
writer: SaslDatanodeWriter,
689646
}
690647

691648
impl DatanodeWriter {
@@ -696,8 +653,10 @@ impl DatanodeWriter {
696653
let payload_len = (checksum.len() + data.len() + 4) as u32;
697654
let header_encoded = header.encode_to_vec();
698655

699-
self.writer.write_u32(payload_len).await?;
700-
self.writer.write_u16(header_encoded.len() as u16).await?;
656+
self.writer.write_all(&payload_len.to_be_bytes()).await?;
657+
self.writer
658+
.write_all(&(header_encoded.len() as u16).to_be_bytes())
659+
.await?;
701660
self.writer.write_all(&header.encode_to_vec()).await?;
702661
self.writer.write_all(&checksum).await?;
703662
self.writer.write_all(&data).await?;

0 commit comments

Comments
 (0)