-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathminidfs.rs
151 lines (134 loc) · 4.59 KB
/
minidfs.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
use std::{
collections::HashSet,
env,
io::{BufRead, BufReader, Write},
path::PathBuf,
process::{Child, Command, Stdio},
};
use which::which;
#[derive(PartialEq, Eq, Hash, Debug)]
pub enum DfsFeatures {
TESTFILE,
SECURITY,
TOKEN,
PRIVACY,
HA,
VIEWFS,
EC,
RBF,
}
impl DfsFeatures {
pub fn as_str(&self) -> &str {
match self {
DfsFeatures::TESTFILE => "testfile",
DfsFeatures::EC => "ec",
DfsFeatures::HA => "ha",
DfsFeatures::VIEWFS => "viewfs",
DfsFeatures::PRIVACY => "privacy",
DfsFeatures::SECURITY => "security",
DfsFeatures::TOKEN => "token",
DfsFeatures::RBF => "rbf",
}
}
pub fn from(value: &str) -> Option<Self> {
match value {
"ec" => Some(DfsFeatures::EC),
"ha" => Some(DfsFeatures::HA),
"privacy" => Some(DfsFeatures::PRIVACY),
"security" => Some(DfsFeatures::SECURITY),
"token" => Some(DfsFeatures::TOKEN),
_ => None,
}
}
}
pub struct MiniDfs {
process: Child,
pub url: String,
}
impl MiniDfs {
pub fn with_features(features: &HashSet<DfsFeatures>) -> Self {
let mvn_exec = which("mvn").expect("Failed to find java executable");
let mut feature_args: Vec<&str> = Vec::new();
for feature in features.iter() {
feature_args.push(feature.as_str());
}
let mut child = Command::new(mvn_exec)
.args([
"-f",
concat!(env!("OUT_DIR"), "/minidfs"),
"--quiet",
"compile",
"exec:java",
&format!("-Dexec.args={}", feature_args.join(" ")),
])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()
.unwrap();
let mut output = BufReader::new(child.stdout.take().unwrap()).lines();
let ready = output.next().unwrap().unwrap();
if ready != "Ready!" {
println!("Failed to start minidfs");
println!("{}", ready);
for line in output.by_ref() {
println!("{}", line.unwrap());
}
panic!();
}
// Make sure this doesn't care over from a token test to a non-token test
env::remove_var("HADOOP_TOKEN_FILE_LOCATION");
if features.contains(&DfsFeatures::SECURITY) {
let krb_conf = output.next().unwrap().unwrap();
let kdestroy_exec = which("kdestroy").expect("Failed to find kdestroy executable");
Command::new(kdestroy_exec).spawn().unwrap().wait().unwrap();
if !PathBuf::from("target/test/hdfs.keytab").exists() {
panic!("Failed to find keytab");
}
if !PathBuf::from(&krb_conf).exists() {
panic!("Failed to find krb5.conf");
}
env::set_var("KRB5_CONFIG", &krb_conf);
env::set_var(
"HADOOP_OPTS",
format!("-Djava.security.krb5.conf={}", &krb_conf),
);
// If we testing token auth, set the path to the file and make sure we don't have an old kinit, otherwise kinit
if features.contains(&DfsFeatures::TOKEN) {
env::set_var("HADOOP_TOKEN_FILE_LOCATION", "target/test/delegation_token");
} else {
let kinit_exec = which("kinit").expect("Failed to find kinit executable");
env::set_var("KRB5CCNAME", "FILE:target/test/krbcache");
Command::new(kinit_exec)
.args(["-kt", "target/test/hdfs.keytab", "hdfs/localhost"])
.spawn()
.unwrap()
.wait()
.unwrap();
}
}
let url = if features.contains(&DfsFeatures::VIEWFS) {
"viewfs://minidfs-viewfs"
} else if features.contains(&DfsFeatures::RBF) {
"hdfs://fed"
} else if features.contains(&DfsFeatures::HA) {
"hdfs://minidfs-ns"
} else {
"hdfs://127.0.0.1:9000"
};
env::set_var("HADOOP_CONF_DIR", "target/test");
MiniDfs {
process: child,
url: url.to_string(),
}
}
}
impl Drop for MiniDfs {
fn drop(&mut self) {
println!("Dropping and killing minidfs");
let mut stdin = self.process.stdin.take().unwrap();
stdin.write_all(b"\n").unwrap();
self.process.kill().unwrap();
self.process.wait().unwrap();
}
}