
Commit 8e28d8e

Add support for datanode sasl negotiation (#81)
1 parent cf2c49e commit 8e28d8e

File tree

13 files changed: +380 -86

README.md (+1 -1)

```diff
@@ -25,7 +25,7 @@ Here is a list of currently supported and unsupported but possible future features
 - [x] Kerberos authentication (GSSAPI SASL support)
 - [x] Token authentication (DIGEST-MD5 SASL support, no encryption support)
 - [x] NameNode SASL connection
-- [ ] DataNode SASL connection
+- [x] DataNode SASL connection
 - [ ] DataNode data transfer encryption
 - [ ] Encryption at rest (KMS support)
```

crates/hdfs-native/Cargo.toml (+1 -1)

```diff
@@ -22,7 +22,7 @@ libc = "0.2"
 libgssapi = { version = "0.7", default-features = false, optional = true }
 log = "0.4"
 num-traits = "0.2"
-once_cell = "1.19.0"
+once_cell = "1"
 prost = "0.12"
 prost-types = "0.12"
 roxmltree = "0.18"
```

crates/hdfs-native/minidfs/src/main/java/main/Main.java (+6 -3)

```diff
@@ -41,6 +41,9 @@ public static void main(String args[]) throws Exception {
         }
         MiniKdc kdc = null;

+        // If a token file already exists, make sure to delete it
+        new File("target/test/delegation_token").delete();
+
         Configuration conf = new Configuration();
         if (flags.contains("security")) {
             kdc = new MiniKdc(MiniKdc.createConf(), new File("target/test/kdc"));
@@ -60,8 +63,10 @@ public static void main(String args[]) throws Exception {
             conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, "target/test/hdfs.keytab");
             conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "hdfs/localhost@" + kdc.getRealm());
             conf.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true");
-            // conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
             conf.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
+            if (flags.contains("data_transfer_security")) {
+                conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+            }
         }

         HdfsConfiguration hdfsConf = new HdfsConfiguration(conf);
@@ -155,8 +160,6 @@ public static void main(String args[]) throws Exception {
             DataOutputStream os = new DataOutputStream(new FileOutputStream("target/test/delegation_token"));
             creds.writeTokenStorageToStream(os, SerializedFormat.WRITABLE);
             os.close();
-        } else {
-            new File("target/test/delegation_token").delete();
         }
     }
```

crates/hdfs-native/src/hdfs/block_reader.rs (+6 -14)

```diff
@@ -37,15 +37,15 @@ pub(crate) fn get_block_stream(

 /// Connects to a DataNode to do a read, attempting to use cached connections.
 async fn connect_and_send(
-    url: &str,
+    datanode_id: &hdfs::DatanodeIdProto,
     block: &hdfs::ExtendedBlockProto,
     token: common::TokenProto,
     offset: u64,
     len: u64,
 ) -> Result<(DatanodeConnection, BlockOpResponseProto)> {
     let mut remaining_attempts = 2;
     while remaining_attempts > 0 {
-        if let Some(mut conn) = DATANODE_CACHE.get(url) {
+        if let Some(mut conn) = DATANODE_CACHE.get(datanode_id) {
             let message = hdfs::OpReadBlockProto {
                 header: conn.build_header(block, Some(token.clone())),
                 offset,
@@ -68,7 +68,7 @@ async fn connect_and_send(
         }
         remaining_attempts -= 1;
     }
-    let mut conn = DatanodeConnection::connect(url).await?;
+    let mut conn = DatanodeConnection::connect(datanode_id, &token).await?;

     let message = hdfs::OpReadBlockProto {
         header: conn.build_header(block, Some(token)),
@@ -117,10 +117,9 @@ impl ReplicatedBlockStream {
         }

         let datanode = &self.block.locs[self.current_replica].id;
-        let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);

         let (connection, response) = connect_and_send(
-            &datanode_url,
+            datanode,
             &self.block.b,
             self.block.block_token.clone(),
             self.offset as u64,
@@ -389,15 +388,8 @@ impl StripedBlockStream {
             return Ok(());
         }

-        let datanode_url = format!("{}:{}", datanode.ip_addr, datanode.xfer_port);
-        let (mut connection, response) = connect_and_send(
-            &datanode_url,
-            block,
-            token.clone(),
-            offset as u64,
-            len as u64,
-        )
-        .await?;
+        let (mut connection, response) =
+            connect_and_send(datanode, block, token.clone(), offset as u64, len as u64).await?;

         if response.status() != hdfs::Status::Success {
             return Err(HdfsError::DataTransferError(response.message().to_string()));
```
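The reworked `connect_and_send` keeps the same retry shape: try cached connections first (they may have gone stale), then fall back to a fresh connection, which now also performs SASL negotiation when a block token is present. A minimal, self-contained sketch of that shape follows; `Cache`, `Conn`, and `send_read_op` are hypothetical stand-ins for `DATANODE_CACHE`, `DatanodeConnection`, and the read-op exchange, not the crate's actual API.

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct Error;

struct Conn {
    addr: String,
}

struct Cache {
    conns: HashMap<String, Vec<Conn>>,
}

impl Conn {
    // Stand-in for DatanodeConnection::connect, which now also runs SASL
    // negotiation before returning a usable connection.
    async fn connect(addr: &str) -> Result<Conn, Error> {
        Ok(Conn { addr: addr.to_string() })
    }

    // Stand-in for building and sending the OpReadBlockProto message.
    async fn send_read_op(&mut self) -> Result<(), Error> {
        Ok(())
    }
}

impl Cache {
    // Stand-in for DATANODE_CACHE.get: pop a previously pooled connection.
    fn get(&mut self, key: &str) -> Option<Conn> {
        self.conns.get_mut(key).and_then(|conns| conns.pop())
    }
}

// Cached connections may be stale, so a failed attempt is dropped and the
// loop retries before falling back to a brand new connection.
async fn connect_with_retry(cache: &mut Cache, addr: &str) -> Result<Conn, Error> {
    let mut remaining_attempts = 2;
    while remaining_attempts > 0 {
        if let Some(mut conn) = cache.get(addr) {
            if conn.send_read_op().await.is_ok() {
                return Ok(conn);
            }
        }
        remaining_attempts -= 1;
    }
    let mut conn = Conn::connect(addr).await?;
    conn.send_read_op().await?;
    Ok(conn)
}
```

Two attempts against the cache cover the common case of a single stale pooled connection without looping unboundedly; only after that does the code pay for a fresh connect (and, now, a SASL handshake).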

crates/hdfs-native/src/hdfs/block_writer.rs (+1 -3)

```diff
@@ -100,9 +100,7 @@ impl ReplicatedBlockWriter {
         server_defaults: hdfs::FsServerDefaultsProto,
     ) -> Result<Self> {
         let datanode = &block.locs[0].id;
-        let mut connection =
-            DatanodeConnection::connect(&format!("{}:{}", datanode.ip_addr, datanode.xfer_port))
-                .await?;
+        let mut connection = DatanodeConnection::connect(datanode, &block.block_token).await?;

         let checksum = hdfs::ChecksumProto {
             r#type: hdfs::ChecksumTypeProto::ChecksumCrc32c as i32,
```

crates/hdfs-native/src/hdfs/connection.rs (+23 -5)

```diff
@@ -25,11 +25,16 @@ use tokio::{
 use uuid::Uuid;

 use crate::proto::common::rpc_response_header_proto::RpcStatusProto;
+use crate::proto::common::TokenProto;
+use crate::proto::hdfs::DatanodeIdProto;
 use crate::proto::{common, hdfs};
 use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter};
 use crate::security::user::UserInfo;
 use crate::{HdfsError, Result};

+#[cfg(feature = "token")]
+use crate::security::sasl::SaslDatanodeConnection;
+
 const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
 const DATA_TRANSFER_VERSION: u16 = 28;
 const MAX_PACKET_HEADER_SIZE: usize = 33;
@@ -520,12 +525,24 @@ pub(crate) struct DatanodeConnection {
 }

 impl DatanodeConnection {
-    pub(crate) async fn connect(url: &str) -> Result<Self> {
-        let stream = BufStream::new(connect(url).await?);
+    #[allow(unused_variables)]
+    pub(crate) async fn connect(datanode_id: &DatanodeIdProto, token: &TokenProto) -> Result<Self> {
+        let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port);
+        let stream = connect(&url).await?;
+
+        // If the token has an identifier, we can do SASL negotiation
+        #[cfg(feature = "token")]
+        let stream = if token.identifier.is_empty() {
+            stream
+        } else {
+            debug!("{:?}", token);
+            let sasl_connection = SaslDatanodeConnection::create(stream);
+            sasl_connection.negotiate(datanode_id, token).await?
+        };

         let conn = DatanodeConnection {
             client_name: Uuid::new_v4().to_string(),
-            stream,
+            stream: BufStream::new(stream),
             url: url.to_string(),
         };
         Ok(conn)
@@ -704,15 +721,16 @@ impl DatanodeConnectionCache {
         }
     }

-    pub(crate) fn get(&self, url: &str) -> Option<DatanodeConnection> {
+    pub(crate) fn get(&self, datanode_id: &hdfs::DatanodeIdProto) -> Option<DatanodeConnection> {
         // Keep things simple and just expire cache entries when checking the cache. We could
         // move this to its own task but that will add a little more complexity.
         self.remove_expired();

+        let url = format!("{}:{}", datanode_id.ip_addr, datanode_id.xfer_port);
         let mut cache = self.cache.lock().unwrap();

         cache
-            .get_mut(url)
+            .get_mut(&url)
             .iter_mut()
             .flat_map(|conns| conns.pop_front())
             .map(|(_, conn)| conn)
```
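The core of the change is that `DatanodeConnection::connect` now receives the block token and decides per connection whether to run SASL negotiation, keyed on whether the NameNode issued a token identifier. Here is a minimal sketch of that decision; `TcpLike`, `SaslWrapped`, and the `negotiate` placeholder are illustrative stand-ins, not the crate's `SaslDatanodeConnection` API.

```rust
// Stand-in types: in the real code these correspond to the raw TCP stream
// and the stream produced by SaslDatanodeConnection::negotiate.
struct TcpLike;
struct SaslWrapped(TcpLike);

enum DnStream {
    Plain(TcpLike),
    Sasl(SaslWrapped),
}

// Placeholder for the SASL handshake; the real negotiate() exchanges SASL
// messages with the DataNode before any data transfer ops are sent.
async fn negotiate(stream: TcpLike) -> SaslWrapped {
    SaslWrapped(stream)
}

// An empty token identifier means the NameNode issued no block access token,
// so the DataNode will not expect a handshake and it is skipped entirely.
async fn maybe_negotiate(stream: TcpLike, token_identifier: &[u8]) -> DnStream {
    if token_identifier.is_empty() {
        DnStream::Plain(stream)
    } else {
        DnStream::Sasl(negotiate(stream).await)
    }
}
```

In the actual diff both branches rebind the same `stream` name, which requires `negotiate` to return a stream of a compatible type; the result is then buffered with `BufStream` either way.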

crates/hdfs-native/src/minidfs.rs (+22 -14)

```diff
@@ -9,11 +9,12 @@ use which::which;

 #[derive(PartialEq, Eq, Hash, Debug)]
 pub enum DfsFeatures {
-    SECURITY,
-    TOKEN,
-    PRIVACY,
+    Security,
+    DataTransferSecurity,
+    Token,
+    Privacy,
     HA,
-    VIEWFS,
+    ViewFS,
     EC,
     RBF,
 }
@@ -23,10 +24,11 @@ impl DfsFeatures {
         match self {
             DfsFeatures::EC => "ec",
             DfsFeatures::HA => "ha",
-            DfsFeatures::VIEWFS => "viewfs",
-            DfsFeatures::PRIVACY => "privacy",
-            DfsFeatures::SECURITY => "security",
-            DfsFeatures::TOKEN => "token",
+            DfsFeatures::ViewFS => "viewfs",
+            DfsFeatures::Privacy => "privacy",
+            DfsFeatures::Security => "security",
+            DfsFeatures::DataTransferSecurity => "data_transfer_security",
+            DfsFeatures::Token => "token",
             DfsFeatures::RBF => "rbf",
         }
     }
@@ -35,9 +37,9 @@ impl DfsFeatures {
         match value {
             "ec" => Some(DfsFeatures::EC),
             "ha" => Some(DfsFeatures::HA),
-            "privacy" => Some(DfsFeatures::PRIVACY),
-            "security" => Some(DfsFeatures::SECURITY),
-            "token" => Some(DfsFeatures::TOKEN),
+            "privacy" => Some(DfsFeatures::Privacy),
+            "security" => Some(DfsFeatures::Security),
+            "token" => Some(DfsFeatures::Token),
             _ => None,
         }
     }
@@ -56,6 +58,12 @@ impl MiniDfs {
         for feature in features.iter() {
             feature_args.push(feature.as_str());
         }
+        // If the `token` feature is enabled, we need to force data transfer protection
+        #[cfg(feature = "token")]
+        if !features.contains(&DfsFeatures::DataTransferSecurity) {
+            feature_args.push(DfsFeatures::DataTransferSecurity.as_str());
+        }
+
         let mut child = Command::new(mvn_exec)
             .args([
                 "-f",
@@ -86,7 +94,7 @@ impl MiniDfs {
         // Make sure this doesn't carry over from a token test to a non-token test
         env::remove_var("HADOOP_TOKEN_FILE_LOCATION");

-        if features.contains(&DfsFeatures::SECURITY) {
+        if features.contains(&DfsFeatures::Security) {
             let krb_conf = output.next().unwrap().unwrap();
             let kdestroy_exec = which("kdestroy").expect("Failed to find kdestroy executable");
             Command::new(kdestroy_exec).spawn().unwrap().wait().unwrap();
@@ -106,7 +114,7 @@ impl MiniDfs {
         );

         // If we're testing token auth, set the path to the token file and make sure we don't have an old kinit; otherwise kinit
-        if features.contains(&DfsFeatures::TOKEN) {
+        if features.contains(&DfsFeatures::Token) {
             env::set_var("HADOOP_TOKEN_FILE_LOCATION", "target/test/delegation_token");
         } else {
             let kinit_exec = which("kinit").expect("Failed to find kinit executable");
@@ -120,7 +128,7 @@ impl MiniDfs {
         }
     }

-        let url = if features.contains(&DfsFeatures::ViewFS) {
+        let url = if features.contains(&DfsFeatures::ViewFS) {
             "viewfs://minidfs-viewfs"
         } else if features.contains(&DfsFeatures::RBF) {
             "hdfs://fed"
```
