1
1
use bytes:: { Buf , Bytes } ;
2
+ use chrono:: Utc ;
2
3
use log:: debug;
3
4
use prost:: Message ;
4
5
use std:: env;
@@ -13,6 +14,7 @@ use crate::proto::common::TokenProto;
13
14
use crate :: proto:: hdfs:: AccessModeProto ;
14
15
use crate :: proto:: hdfs:: BlockTokenSecretProto ;
15
16
use crate :: proto:: hdfs:: StorageTypeProto ;
17
+ use crate :: HdfsError ;
16
18
use crate :: Result ;
17
19
18
20
const HADOOP_USER_NAME : & str = "HADOOP_USER_NAME" ;
@@ -123,6 +125,48 @@ impl BlockTokenIdentifier {
123
125
}
124
126
}
125
127
128
+ #[ derive( Debug ) ]
129
+ #[ allow( dead_code) ]
130
+ struct TokenIdentifier {
131
+ owner : String ,
132
+ renewer : String ,
133
+ real_user : String ,
134
+ issue_date : i64 ,
135
+ max_date : i64 ,
136
+ sequence_number : i32 ,
137
+ master_key_id : i32 ,
138
+ }
139
+
140
+ impl TryFrom < Vec < u8 > > for TokenIdentifier {
141
+ type Error = HdfsError ;
142
+
143
+ fn try_from ( value : Vec < u8 > ) -> std:: result:: Result < Self , Self :: Error > {
144
+ let mut buf = Bytes :: from ( value) ;
145
+ let version = buf. get_u8 ( ) ;
146
+ if version != 0 {
147
+ panic ! ( ) ;
148
+ }
149
+
150
+ let owner = parse_vint_string ( & mut buf) ?;
151
+ let renewer = parse_vint_string ( & mut buf) ?;
152
+ let real_user = parse_vint_string ( & mut buf) ?;
153
+ let issue_date = parse_vlong ( & mut buf) ;
154
+ let max_date = parse_vlong ( & mut buf) ;
155
+ let sequence_number = parse_vint ( & mut buf) ;
156
+ let master_key_id = parse_vint ( & mut buf) ;
157
+
158
+ Ok ( TokenIdentifier {
159
+ owner,
160
+ renewer,
161
+ real_user,
162
+ issue_date,
163
+ max_date,
164
+ sequence_number,
165
+ master_key_id,
166
+ } )
167
+ }
168
+ }
169
+
126
170
#[ derive( Debug ) ]
127
171
#[ allow( dead_code) ]
128
172
pub struct Token {
@@ -320,7 +364,13 @@ impl User {
320
364
pub ( crate ) fn get_token ( & self , kind : & str , service : & str ) -> Option < & Token > {
321
365
self . tokens
322
366
. iter ( )
323
- . find ( |t| t. kind == kind && t. service == service)
367
+ . filter ( |t| t. kind == kind && t. service == service)
368
+ . find ( |t| {
369
+ // Ignore any tokens that are set to expire in the next 60 seconds
370
+ let token_identifier: TokenIdentifier = t. identifier . clone ( ) . try_into ( ) . unwrap ( ) ;
371
+ debug ! ( "Token Identifier: {:?}" , token_identifier) ;
372
+ token_identifier. max_date > Utc :: now ( ) . timestamp_millis ( ) + 60000
373
+ } )
324
374
}
325
375
326
376
pub ( crate ) fn get_user_info_from_principal ( principal : & str ) -> UserInfo {
@@ -385,7 +435,9 @@ mod tests {
385
435
assert_eq ! ( tokens. len( ) , 1 ) ;
386
436
assert_eq ! ( tokens[ 0 ] . kind, "HDFS_DELEGATION_TOKEN" ) ;
387
437
assert_eq ! ( tokens[ 0 ] . service, "127.0.0.1:9000" ) ;
388
- tokens. iter ( ) . for_each ( |t| println ! ( "{:?}" , t) ) ;
438
+
439
+ let token_identifier: TokenIdentifier = tokens[ 0 ] . identifier . clone ( ) . try_into ( ) . unwrap ( ) ;
440
+ assert_eq ! ( token_identifier. max_date, 1690672432660 )
389
441
}
390
442
391
443
#[ test]
@@ -413,11 +465,13 @@ mod tests {
413
465
assert_eq ! ( tokens. len( ) , 1 ) ;
414
466
assert_eq ! ( tokens[ 0 ] . kind, "HDFS_DELEGATION_TOKEN" ) ;
415
467
assert_eq ! ( tokens[ 0 ] . service, "127.0.0.1:9000" ) ;
416
- tokens. iter ( ) . for_each ( |t| println ! ( "{:?}" , t) ) ;
468
+
469
+ let token_identifier: TokenIdentifier = tokens[ 0 ] . identifier . clone ( ) . try_into ( ) . unwrap ( ) ;
470
+ assert_eq ! ( token_identifier. max_date, 1686955057021 )
417
471
}
418
472
419
473
#[ test]
420
- fn test_load_token_identifier ( ) {
474
+ fn test_load_block_token_identifier ( ) {
421
475
let token = [
422
476
138u8 , 1 , 142 , 89 , 190 , 30 , 189 , 140 , 100 , 197 , 210 , 104 , 0 , 0 , 0 , 4 , 104 , 100 , 102 ,
423
477
115 , 0 , 0 , 0 , 40 , 66 , 80 , 45 , 57 , 55 , 51 , 52 , 55 , 55 , 51 , 54 , 48 , 45 , 49 , 57 , 50 , 46 ,
@@ -433,7 +487,6 @@ mod tests {
433
487
] ;
434
488
435
489
let token_identifier = BlockTokenIdentifier :: from_identifier ( & token) . unwrap ( ) ;
436
- println ! ( "{:?}" , token_identifier) ;
437
490
assert_eq ! ( token_identifier. user_id, "hdfs" ) ;
438
491
assert_eq ! (
439
492
token_identifier. block_pool_id,
0 commit comments