Skip to content

Commit 5885635

Browse files
authored
Add integrity support to token sasl (#88)
1 parent 3562554 commit 5885635

File tree

8 files changed

+210
-31
lines changed

8 files changed

+210
-31
lines changed

Cargo.lock

+83-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ futures = "0.3"
1212
tokio = "1"
1313

1414
[profile.bench]
15-
debug = true
15+
debug = true

crates/hdfs-native/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ chrono = { workspace = true }
1717
crc = "3.1.0-beta.1"
1818
futures = { workspace = true }
1919
g2p = "1"
20+
hex = "0.4"
21+
hmac = "0.12"
2022
libc = "0.2"
2123
libgssapi = { version = "0.7", default-features = false, optional = true }
2224
log = "0.4"
23-
md5 = "0.7"
25+
md-5 = "0.10"
2426
num-traits = "0.2"
2527
once_cell = "1"
2628
prost = "0.12"

crates/hdfs-native/minidfs/src/main/java/main/Main.java

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public static void main(String args[]) throws Exception {
5555
conf.set(HADOOP_SECURITY_AUTHORIZATION, "true");
5656
if (flags.contains("privacy")) {
5757
conf.set(HADOOP_RPC_PROTECTION, "privacy");
58+
} else if (flags.contains("integrity")) {
59+
conf.set(HADOOP_RPC_PROTECTION, "integrity");
5860
} else {
5961
conf.set(HADOOP_RPC_PROTECTION, "authentication");
6062
}

crates/hdfs-native/src/minidfs.rs

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use which::which;
1111
pub enum DfsFeatures {
1212
Security,
1313
Token,
14+
Integrity,
1415
Privacy,
1516
HA,
1617
ViewFS,
@@ -26,6 +27,7 @@ impl DfsFeatures {
2627
DfsFeatures::ViewFS => "viewfs",
2728
DfsFeatures::Privacy => "privacy",
2829
DfsFeatures::Security => "security",
30+
DfsFeatures::Integrity => "integrity",
2931
DfsFeatures::Token => "token",
3032
DfsFeatures::RBF => "rbf",
3133
}

crates/hdfs-native/src/security/digest.rs

+89-17
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
};
55

66
use base64::{engine::general_purpose, Engine as _};
7-
use md5::Digest;
7+
use hmac::{Hmac, Mac};
8+
use md5::{Digest, Md5};
89
use once_cell::sync::Lazy;
910
use rand::Rng;
1011
use regex::Regex;
@@ -16,6 +17,8 @@ use super::{
1617
user::{Token, UserInfo},
1718
};
1819

20+
type HmacMD5 = Hmac<Md5>;
21+
1922
static CHALLENGE_PATTERN: Lazy<Regex> =
2023
Lazy::new(|| Regex::new(r#",?([a-zA-Z0-9]+)=("([^"]+)"|([^,]+)),?"#).unwrap());
2124
static RESPONSE_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new("rspauth=([a-f0-9]{32})").unwrap());
@@ -70,7 +73,7 @@ impl Qop {
7073
}
7174
}
7275

73-
static SUPPORTED_QOPS: [Qop; 1] = [Qop::Auth];
76+
static SUPPORTED_QOPS: [Qop; 2] = [Qop::Auth, Qop::AuthInt];
7477

7578
fn choose_qop(options: Vec<Qop>) -> Result<Qop> {
7679
options
@@ -82,11 +85,13 @@ fn choose_qop(options: Vec<Qop>) -> Result<Qop> {
8285
))
8386
}
8487

85-
fn h(s: impl AsRef<[u8]>) -> Digest {
86-
md5::compute(s.as_ref())
88+
fn h(b: impl AsRef<[u8]>) -> Vec<u8> {
89+
let mut hasher = Md5::new();
90+
hasher.update(b.as_ref());
91+
hasher.finalize().to_vec()
8792
}
8893

89-
fn kd(k: impl AsRef<[u8]>, v: impl AsRef<[u8]>) -> Digest {
94+
fn kd(k: impl AsRef<[u8]>, v: impl AsRef<[u8]>) -> Vec<u8> {
9095
h([k.as_ref(), b":", v.as_ref()].concat())
9196
}
9297

@@ -119,7 +124,6 @@ impl TryFrom<Vec<u8>> for Challenge {
119124

120125
fn try_from(value: Vec<u8>) -> core::result::Result<Self, Self::Error> {
121126
let decoded = String::from_utf8(value).unwrap();
122-
println!("Parsing {}", decoded);
123127
let mut options: HashMap<String, String> = HashMap::new();
124128
for capture in CHALLENGE_PATTERN.captures_iter(&decoded) {
125129
let key = capture.get(1).unwrap().as_str().to_string();
@@ -172,10 +176,20 @@ struct DigestContext {
172176
qop: Qop,
173177
}
174178

179+
struct IntegrityContext {
180+
kic: Vec<u8>,
181+
kis: Vec<u8>,
182+
seq_num: u32,
183+
}
184+
185+
enum SecurityContext {
186+
Integrity(IntegrityContext),
187+
}
188+
175189
enum DigestState {
176190
Pending,
177191
Stepped(DigestContext),
178-
Completed(DigestContext),
192+
Completed(Option<SecurityContext>),
179193
Errored,
180194
}
181195

@@ -199,17 +213,17 @@ impl DigestSaslSession {
199213
}
200214

201215
fn compute(&self, ctx: &DigestContext, initial: bool) -> String {
202-
let x = format!("{:x}", h(self.a1(ctx)));
216+
let x = hex::encode(h(self.a1(ctx)));
203217
let y = format!(
204-
"{}:{:08x}:{}:{}:{:x}",
218+
"{}:{:08x}:{}:{}:{}",
205219
ctx.nonce,
206220
1,
207221
ctx.cnonce,
208222
ctx.qop,
209-
h(self.a2(initial, &ctx.qop))
223+
hex::encode(h(self.a2(initial, &ctx.qop)))
210224
);
211225

212-
format!("{:x}", kd(x, y))
226+
hex::encode(kd(x, y))
213227
}
214228

215229
fn a1(&self, ctx: &DigestContext) -> Vec<u8> {
@@ -294,7 +308,18 @@ impl SaslSession for DigestSaslSession {
294308
));
295309
}
296310

297-
self.state = DigestState::Completed(ctx);
311+
self.state = match ctx.qop {
312+
Qop::Auth => DigestState::Completed(None),
313+
Qop::AuthInt => {
314+
let int_ctx = IntegrityContext {
315+
kic: h([&h(self.a1(&ctx))[..], b"Digest session key to client-to-server signing key magic constant"].concat()),
316+
kis: h([&h(self.a1(&ctx))[..], b"Digest session key to server-to-client signing key magic constant"].concat()),
317+
seq_num: 0
318+
};
319+
DigestState::Completed(Some(SecurityContext::Integrity(int_ctx)))
320+
}
321+
_ => todo!(),
322+
};
298323
Ok((Vec::new(), true))
299324
}
300325
DigestState::Completed(_) => Err(HdfsError::SASLError(
@@ -308,17 +333,64 @@ impl SaslSession for DigestSaslSession {
308333

309334
fn has_security_layer(&self) -> bool {
310335
match &self.state {
311-
DigestState::Completed(ctx) => ctx.qop.has_security_layer(),
336+
DigestState::Completed(ctx) => ctx.is_some(),
312337
_ => false,
313338
}
314339
}
315340

316-
fn encode(&mut self, _buf: &[u8]) -> crate::Result<Vec<u8>> {
317-
todo!()
341+
fn encode(&mut self, buf: &[u8]) -> crate::Result<Vec<u8>> {
342+
match &mut self.state {
343+
DigestState::Completed(ctx) => match ctx {
344+
Some(SecurityContext::Integrity(int_ctx)) => {
345+
let mut mac = HmacMD5::new_from_slice(&int_ctx.kic).unwrap();
346+
mac.update(&int_ctx.seq_num.to_be_bytes());
347+
mac.update(buf);
348+
let result = mac.finalize().into_bytes();
349+
350+
let message =
351+
[buf, &result[0..10], &[0, 1], &int_ctx.seq_num.to_be_bytes()].concat();
352+
353+
int_ctx.seq_num += 1;
354+
Ok(message)
355+
}
356+
None => Err(HdfsError::SASLError(
357+
"QOP doesn't support security layer".to_string(),
358+
)),
359+
},
360+
_ => Err(HdfsError::SASLError(
361+
"SASL negotiation not complete, can't encode message".to_string(),
362+
)),
363+
}
318364
}
319365

320-
fn decode(&mut self, _buf: &[u8]) -> crate::Result<Vec<u8>> {
321-
todo!()
366+
fn decode(&mut self, buf: &[u8]) -> crate::Result<Vec<u8>> {
367+
match &self.state {
368+
DigestState::Completed(ctx) => match ctx {
369+
Some(SecurityContext::Integrity(int_ctx)) => {
370+
let message = &buf[0..buf.len() - 16];
371+
let hmac = &buf[buf.len() - 16..buf.len() - 6];
372+
let mut seq_num_bytes = [0u8; 4];
373+
seq_num_bytes.copy_from_slice(&buf[buf.len() - 4..]);
374+
let seq_num = u32::from_be_bytes(seq_num_bytes);
375+
376+
let mut mac = HmacMD5::new_from_slice(&int_ctx.kis).unwrap();
377+
mac.update(&seq_num.to_be_bytes());
378+
mac.update(message);
379+
380+
mac.verify_truncated_left(hmac).map_err(|_| {
381+
HdfsError::SASLError("Integrity HMAC check failed".to_string())
382+
})?;
383+
384+
Ok(message.to_vec())
385+
}
386+
None => Err(HdfsError::SASLError(
387+
"QOP doesn't support security layer".to_string(),
388+
)),
389+
},
390+
_ => Err(HdfsError::SASLError(
391+
"SASL negotiation not complete, can't decode message".to_string(),
392+
)),
393+
}
322394
}
323395

324396
fn get_user_info(&self) -> crate::Result<super::user::UserInfo> {

0 commit comments

Comments
 (0)