Skip to content

Commit 5e97c6c

Browse files
committed
Add support for datanode sasl negotation
1 parent cf2c49e commit 5e97c6c

File tree

7 files changed

+312
-49
lines changed

7 files changed

+312
-49
lines changed

crates/hdfs-native/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ libc = "0.2"
2222
libgssapi = { version = "0.7", default-features = false, optional = true }
2323
log = "0.4"
2424
num-traits = "0.2"
25-
once_cell = "1.19.0"
25+
once_cell = "1"
2626
prost = "0.12"
2727
prost-types = "0.12"
2828
roxmltree = "0.18"

crates/hdfs-native/minidfs/src/main/java/main/Main.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ public static void main(String args[]) throws Exception {
6060
conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, "target/test/hdfs.keytab");
6161
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "hdfs/localhost@" + kdc.getRealm());
6262
conf.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true");
63-
// conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
6463
conf.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
64+
if (flags.contains("token")) {
65+
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
66+
}
6567
}
6668

6769
HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);

crates/hdfs-native/src/hdfs/block_reader.rs

+6-14
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ pub(crate) fn get_block_stream(
3737

3838
/// Connects to a DataNode to do a read, attempting to used cached connections.
3939
async fn connect_and_send(
40-
url: &str,
40+
datanode_id: &hdfs::DatanodeIdProto,
4141
block: &hdfs::ExtendedBlockProto,
4242
token: common::TokenProto,
4343
offset: u64,
4444
len: u64,
4545
) -> Result<(DatanodeConnection, BlockOpResponseProto)> {
4646
let mut remaining_attempts = 2;
4747
while remaining_attempts > 0 {
48-
if let Some(mut conn) = DATANODE_CACHE.get(url) {
48+
if let Some(mut conn) = DATANODE_CACHE.get(datanode_id) {
4949
let message = hdfs::OpReadBlockProto {
5050
header: conn.build_header(block, Some(token.clone())),
5151
offset,
@@ -68,7 +68,7 @@ async fn connect_and_send(
6868
}
6969
remaining_attempts -= 1;
7070
}
71-
let mut conn = DatanodeConnection::connect(url).await?;
71+
let mut conn = DatanodeConnection::connect(datanode_id, &token).await?;
7272

7373
let message = hdfs::OpReadBlockProto {
7474
header: conn.build_header(block, Some(token)),
@@ -117,10 +117,9 @@ impl ReplicatedBlockStream {
117117
}
118118

119119
let datanode = &self.block.locs[self.current_replica].id;
120-
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
121120

122121
let (connection, response) = connect_and_send(
123-
&datanode_url,
122+
datanode,
124123
&self.block.b,
125124
self.block.block_token.clone(),
126125
self.offset as u64,
@@ -389,15 +388,8 @@ impl StripedBlockStream {
389388
return Ok(());
390389
}
391390

392-
let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
393-
let (mut connection, response) = connect_and_send(
394-
&datanode_url,
395-
block,
396-
token.clone(),
397-
offset as u64,
398-
len as u64,
399-
)
400-
.await?;
391+
let (mut connection, response) =
392+
connect_and_send(datanode, block, token.clone(), offset as u64, len as u64).await?;
401393

402394
if response.status() != hdfs::Status::Success {
403395
return Err(HdfsError::DataTransferError(response.message().to_string()));

crates/hdfs-native/src/hdfs/block_writer.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,7 @@ impl ReplicatedBlockWriter {
100100
server_defaults: hdfs::FsServerDefaultsProto,
101101
) -> Result<Self> {
102102
let datanode = &block.locs[0].id;
103-
let mut connection =
104-
DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
105-
.await?;
103+
let mut connection = DatanodeConnection::connect(datanode, &block.block_token).await?;
106104

107105
let checksum = hdfs::ChecksumProto {
108106
r#type: hdfs::ChecksumTypeProto::ChecksumCrc32c as i32,

crates/hdfs-native/src/hdfs/connection.rs

+23-5
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@ use tokio::{
2525
use uuid::Uuid;
2626

2727
use crate::proto::common::rpc_response_header_proto::RpcStatusProto;
28+
use crate::proto::common::TokenProto;
29+
use crate::proto::hdfs::DatanodeIdProto;
2830
use crate::proto::{common, hdfs};
2931
use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter};
3032
use crate::security::user::UserInfo;
3133
use crate::{HdfsError, Result};
3234

35+
#[cfg(feature = "token")]
36+
use crate::security::sasl::SaslDatanodeConnection;
37+
3338
const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
3439
const DATA_TRANSFER_VERSION: u16 = 28;
3540
const MAX_PACKET_HEADER_SIZE: usize = 33;
@@ -520,12 +525,24 @@ pub(crate) struct DatanodeConnection {
520525
}
521526

522527
impl DatanodeConnection {
523-
pub(crate) async fn connect(url: &str) -> Result<Self> {
524-
let stream = BufStream::new(connect(url).await?);
528+
#[allow(unused_variables)]
529+
pub(crate) async fn connect(datanode_id: &DatanodeIdProto, token: &TokenProto) -> Result<Self> {
530+
let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port);
531+
let stream = connect(&url).await?;
532+
533+
// If the token has an identifier, we can do SASL negotation
534+
#[cfg(feature = "token")]
535+
let stream = if token.identifier.is_empty() {
536+
stream
537+
} else {
538+
debug!("{:?}", token);
539+
let sasl_connection = SaslDatanodeConnection::create(stream);
540+
sasl_connection.negotiate(datanode_id, token).await?
541+
};
525542

526543
let conn = DatanodeConnection {
527544
client_name: Uuid::new_v4().to_string(),
528-
stream,
545+
stream: BufStream::new(stream),
529546
url: url.to_string(),
530547
};
531548
Ok(conn)
@@ -704,15 +721,16 @@ impl DatanodeConnectionCache {
704721
}
705722
}
706723

707-
pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> {
724+
pub(crate) fn get(&self, datanode_id: &hdfs::DatanodeIdProto) -> Option<DatanodeConnection> {
708725
// Keep things simply and just expire cache entries when checking the cache. We could
709726
// move this to its own task but that will add a little more complexity.
710727
self.remove_expired();
711728

729+
let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port);
712730
let mut cache = self.cache.lock().unwrap();
713731

714732
cache
715-
.get_mut(url)
733+
.get_mut(&url)
716734
.iter_mut()
717735
.flat_map(|conns| conns.pop_front())
718736
.map(|(_, conn)| conn)

crates/hdfs-native/src/security/sasl.rs

+120-12
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,38 @@ use crate::proto::common::rpc_sasl_proto::{SaslAuth, SaslState};
1515
use crate::proto::common::{
1616
RpcKindProto, RpcRequestHeaderProto, RpcResponseHeaderProto, RpcSaslProto,
1717
};
18+
1819
use crate::{HdfsError, Result};
1920
#[cfg(feature = "token")]
2021
use {
21-
super::user::Token,
22+
super::user::{BlockTokenIdentifier, Token},
2223
base64::{engine::general_purpose, Engine as _},
2324
gsasl_sys as gsasl,
2425
libc::{c_char, c_void, memcpy},
2526
std::ffi::CString,
2627
std::ptr,
2728
std::sync::atomic::AtomicPtr,
2829
};
30+
#[cfg(feature = "token")]
31+
use {
32+
crate::proto::{
33+
common::TokenProto,
34+
hdfs::{
35+
data_transfer_encryptor_message_proto::DataTransferEncryptorStatus, CipherOptionProto,
36+
CipherSuiteProto, DataTransferEncryptorMessageProto, DatanodeIdProto,
37+
HandshakeSecretProto,
38+
},
39+
},
40+
tokio::io::{AsyncBufReadExt, BufStream},
41+
};
2942

3043
#[cfg(feature = "kerberos")]
3144
use super::gssapi::GssapiSession;
3245
use super::user::{User, UserInfo};
3346

3447
const SASL_CALL_ID: i32 = -33;
48+
#[cfg(feature = "token")]
49+
const SASL_TRANSFER_MAGIC_NUMBER: i32 = 0xDEADBEEFu32 as i32;
3550
const HDFS_DELEGATION_TOKEN: &str = "HDFS_DELEGATION_TOKEN";
3651

3752
pub(crate) enum AuthMethod {
@@ -309,17 +324,6 @@ impl SaslReader {
309324
}
310325
}
311326

312-
// TODO: Can we implement this?
313-
// impl AsyncRead for SaslReader {
314-
// fn poll_read(
315-
// self: Pin<&mut Self>,
316-
// cx: &mut task::Context<'_>,
317-
// buf: &mut ReadBuf<'_>,
318-
// ) -> Poll<io::Result<()>> {
319-
// todo!()
320-
// }
321-
// }
322-
323327
pub(crate) struct SaslWriter {
324328
stream: OwnedWriteHalf,
325329
session: Option<Arc<Mutex<Box<dyn SaslSession>>>>,
@@ -547,3 +551,107 @@ impl Drop for GSASLSession {
547551
}
548552
}
549553
}
554+
555+
#[cfg(feature = "token")]
556+
pub(crate) struct SaslDatanodeConnection {
557+
stream: BufStream<TcpStream>,
558+
}
559+
560+
#[cfg(feature = "token")]
561+
impl SaslDatanodeConnection {
562+
pub fn create(stream: TcpStream) -> Self {
563+
Self {
564+
stream: BufStream::new(stream),
565+
}
566+
}
567+
568+
pub(crate) async fn negotiate(
569+
mut self,
570+
datanode_id: &DatanodeIdProto,
571+
token: &TokenProto,
572+
) -> Result<TcpStream> {
573+
// If it's a privileged port, don't do SASL negotation
574+
if datanode_id.xfer_port <= 1024 {
575+
return Ok(self.stream.into_inner());
576+
}
577+
578+
self.stream.write_i32(SASL_TRANSFER_MAGIC_NUMBER).await?;
579+
self.stream.flush().await?;
580+
581+
let mut session = GSASLSession::new("hdfs", "0", &token.clone().into())?;
582+
583+
let token_identifier = BlockTokenIdentifier::from_identifier(&token.identifier)?;
584+
585+
let handshake_secret = if !token_identifier.handshake_secret.is_empty() {
586+
Some(HandshakeSecretProto {
587+
bpid: token_identifier.block_pool_id.clone(),
588+
secret: token_identifier.handshake_secret.clone(),
589+
})
590+
} else {
591+
None
592+
};
593+
594+
let message = DataTransferEncryptorMessageProto {
595+
handshake_secret,
596+
status: DataTransferEncryptorStatus::Success as i32,
597+
..Default::default()
598+
};
599+
600+
debug!("Sending data transfer encryptor message: {:?}", message);
601+
602+
self.stream
603+
.write_all(&message.encode_length_delimited_to_vec())
604+
.await?;
605+
self.stream.flush().await?;
606+
607+
let response = self.read_sasl_response().await?;
608+
debug!("Data transfer encryptor response: {:?}", response);
609+
610+
let (payload, finished) = session.step(response.payload.as_ref().map(|p| &p[..]))?;
611+
assert!(!finished);
612+
613+
let message = DataTransferEncryptorMessageProto {
614+
status: DataTransferEncryptorStatus::Success as i32,
615+
payload: Some(payload),
616+
cipher_option: vec![CipherOptionProto {
617+
suite: CipherSuiteProto::AesCtrNopadding as i32,
618+
..Default::default()
619+
}],
620+
..Default::default()
621+
};
622+
623+
debug!("Sending data transfer encryptor message: {:?}", message);
624+
625+
self.stream
626+
.write_all(&message.encode_length_delimited_to_vec())
627+
.await?;
628+
self.stream.flush().await?;
629+
630+
let response = self.read_sasl_response().await?;
631+
debug!("Data transfer encryptor response: {:?}", response);
632+
633+
let (_, finished) = session.step(response.payload.as_ref().map(|p| &p[..]))?;
634+
635+
assert!(finished);
636+
637+
Ok(self.stream.into_inner())
638+
}
639+
640+
async fn read_sasl_response(&mut self) -> Result<DataTransferEncryptorMessageProto> {
641+
self.stream.fill_buf().await?;
642+
643+
let buf = self.stream.fill_buf().await?;
644+
if buf.is_empty() {
645+
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?;
646+
}
647+
let msg_length = prost::decode_length_delimiter(buf)?;
648+
let total_size = msg_length + prost::length_delimiter_len(msg_length);
649+
650+
let mut response_buf = BytesMut::zeroed(total_size);
651+
self.stream.read_exact(&mut response_buf).await?;
652+
653+
Ok(DataTransferEncryptorMessageProto::decode_length_delimited(
654+
response_buf.freeze(),
655+
)?)
656+
}
657+
}

0 commit comments

Comments
 (0)