Skip to content

Commit c7d40bd

Browse files
authored
Filter out expired tokens (#163)
1 parent 99b689e commit c7d40bd

File tree

2 files changed

+61
-6
lines changed

2 files changed

+61
-6
lines changed

rust/src/security/sasl.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ impl SaslReader {
252252

253253
let mut bytes = buf.freeze();
254254
let rpc_response = RpcResponseHeaderProto::decode_length_delimited(&mut bytes)?;
255-
debug!("{:?}", rpc_response);
255+
debug!("RPC response: {:?}", rpc_response);
256256

257257
match RpcStatusProto::try_from(rpc_response.status).unwrap() {
258258
RpcStatusProto::Error => {
@@ -339,6 +339,8 @@ impl SaslWriter {
339339
}
340340

341341
async fn send_sasl_message(&mut self, message: &RpcSaslProto) -> io::Result<()> {
342+
debug!("Sending SASL message {:?}", message);
343+
342344
let header_buf = Self::create_request_header().encode_length_delimited_to_vec();
343345
let message_buf = message.encode_length_delimited_to_vec();
344346
let size = (header_buf.len() + message_buf.len()) as u32;

rust/src/security/user.rs

+58-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use bytes::{Buf, Bytes};
2+
use chrono::Utc;
23
use log::debug;
34
use prost::Message;
45
use std::env;
@@ -13,6 +14,7 @@ use crate::proto::common::TokenProto;
1314
use crate::proto::hdfs::AccessModeProto;
1415
use crate::proto::hdfs::BlockTokenSecretProto;
1516
use crate::proto::hdfs::StorageTypeProto;
17+
use crate::HdfsError;
1618
use crate::Result;
1719

1820
const HADOOP_USER_NAME: &str = "HADOOP_USER_NAME";
@@ -123,6 +125,48 @@ impl BlockTokenIdentifier {
123125
}
124126
}
125127

128+
#[derive(Debug)]
129+
#[allow(dead_code)]
130+
struct TokenIdentifier {
131+
owner: String,
132+
renewer: String,
133+
real_user: String,
134+
issue_date: i64,
135+
max_date: i64,
136+
sequence_number: i32,
137+
master_key_id: i32,
138+
}
139+
140+
impl TryFrom<Vec<u8>> for TokenIdentifier {
141+
type Error = HdfsError;
142+
143+
fn try_from(value: Vec<u8>) -> std::result::Result<Self, Self::Error> {
144+
let mut buf = Bytes::from(value);
145+
let version = buf.get_u8();
146+
if version != 0 {
147+
panic!();
148+
}
149+
150+
let owner = parse_vint_string(&mut buf)?;
151+
let renewer = parse_vint_string(&mut buf)?;
152+
let real_user = parse_vint_string(&mut buf)?;
153+
let issue_date = parse_vlong(&mut buf);
154+
let max_date = parse_vlong(&mut buf);
155+
let sequence_number = parse_vint(&mut buf);
156+
let master_key_id = parse_vint(&mut buf);
157+
158+
Ok(TokenIdentifier {
159+
owner,
160+
renewer,
161+
real_user,
162+
issue_date,
163+
max_date,
164+
sequence_number,
165+
master_key_id,
166+
})
167+
}
168+
}
169+
126170
#[derive(Debug)]
127171
#[allow(dead_code)]
128172
pub struct Token {
@@ -320,7 +364,13 @@ impl User {
320364
pub(crate) fn get_token(&self, kind: &str, service: &str) -> Option<&Token> {
321365
self.tokens
322366
.iter()
323-
.find(|t| t.kind == kind && t.service == service)
367+
.filter(|t| t.kind == kind && t.service == service)
368+
.find(|t| {
369+
// Ignore any tokens that are set to expire in the next 60 seconds
370+
let token_identifier: TokenIdentifier = t.identifier.clone().try_into().unwrap();
371+
debug!("Token Identifier: {:?}", token_identifier);
372+
token_identifier.max_date > Utc::now().timestamp_millis() + 60000
373+
})
324374
}
325375

326376
pub(crate) fn get_user_info_from_principal(principal: &str) -> UserInfo {
@@ -385,7 +435,9 @@ mod tests {
385435
assert_eq!(tokens.len(), 1);
386436
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
387437
assert_eq!(tokens[0].service, "127.0.0.1:9000");
388-
tokens.iter().for_each(|t| println!("{:?}", t));
438+
439+
let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap();
440+
assert_eq!(token_identifier.max_date, 1690672432660)
389441
}
390442

391443
#[test]
@@ -413,11 +465,13 @@ mod tests {
413465
assert_eq!(tokens.len(), 1);
414466
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
415467
assert_eq!(tokens[0].service, "127.0.0.1:9000");
416-
tokens.iter().for_each(|t| println!("{:?}", t));
468+
469+
let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap();
470+
assert_eq!(token_identifier.max_date, 1686955057021)
417471
}
418472

419473
#[test]
420-
fn test_load_token_identifier() {
474+
fn test_load_block_token_identifier() {
421475
let token = [
422476
138u8, 1, 142, 89, 190, 30, 189, 140, 100, 197, 210, 104, 0, 0, 0, 4, 104, 100, 102,
423477
115, 0, 0, 0, 40, 66, 80, 45, 57, 55, 51, 52, 55, 55, 51, 54, 48, 45, 49, 57, 50, 46,
@@ -433,7 +487,6 @@ mod tests {
433487
];
434488

435489
let token_identifier = BlockTokenIdentifier::from_identifier(&token).unwrap();
436-
println!("{:?}", token_identifier);
437490
assert_eq!(token_identifier.user_id, "hdfs");
438491
assert_eq!(
439492
token_identifier.block_pool_id,

0 commit comments

Comments
 (0)