Skip to content

Commit 441d56c

Browse files
authored
Fix path handling in windows (#202)
1 parent e0b896c commit 441d56c

File tree

12 files changed

+97
-149
lines changed

12 files changed

+97
-149
lines changed

.github/workflows/rust-test.yml

+7-6
Original file line number | Diff line number | Diff line change
@@ -52,14 +52,15 @@ jobs:
5252
- name: Check all features
5353
run: cargo check --all-targets --features integration-test,benchmark
5454

55-
test:
55+
- name: Run unit tests
56+
run: cargo test -p hdfs-native --lib
57+
58+
test-integration:
5659
strategy:
5760
fail-fast: false
5861
matrix:
5962
os:
6063
- ubuntu-latest
61-
# - macos-latest
62-
# - windows-latest
6364
runs-on: ${{ matrix.os }}
6465
env:
6566
# Disable full debug symbol generation to speed up CI build and keep memory down
@@ -88,9 +89,9 @@ jobs:
8889

8990
- name: Download Hadoop
9091
run: |
91-
wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
92-
tar -xf hadoop-3.4.0.tar.gz -C $GITHUB_WORKSPACE
93-
echo "$GITHUB_WORKSPACE/hadoop-3.4.0/bin" >> $GITHUB_PATH
92+
wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1-lean.tar.gz
93+
tar -xf hadoop-3.4.1-lean.tar.gz -C $GITHUB_WORKSPACE
94+
echo "$GITHUB_WORKSPACE/hadoop-3.4.1/bin" >> $GITHUB_PATH
9495
9596
- name: Run tests
9697
run: cargo test --features integration-test

rust/examples/simple.rs

+1
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
#[cfg(feature = "integration-test")]
12
use std::collections::HashSet;
23

34
#[cfg(feature = "integration-test")]

rust/minidfs/pom.xml

+5-5
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
<dependency>
1212
<groupId>org.apache.hadoop</groupId>
1313
<artifactId>hadoop-minicluster</artifactId>
14-
<version>3.4.0</version>
14+
<version>3.4.1</version>
1515
<exclusions>
1616
<exclusion>
1717
<groupId>ch.qos.logback</groupId>
@@ -22,23 +22,23 @@
2222
<dependency>
2323
<groupId>org.apache.hadoop</groupId>
2424
<artifactId>hadoop-minikdc</artifactId>
25-
<version>3.4.0</version>
25+
<version>3.4.1</version>
2626
</dependency>
2727
<dependency>
2828
<groupId>org.apache.hadoop</groupId>
2929
<artifactId>hadoop-hdfs-rbf</artifactId>
30-
<version>3.4.0</version>
30+
<version>3.4.1</version>
3131
</dependency>
3232
<dependency>
3333
<groupId>org.apache.hadoop</groupId>
3434
<artifactId>hadoop-hdfs-rbf</artifactId>
35-
<version>3.4.0</version>
35+
<version>3.4.1</version>
3636
<type>test-jar</type>
3737
</dependency>
3838
<dependency>
3939
<groupId>org.apache.hadoop</groupId>
4040
<artifactId>hadoop-federation-balance</artifactId>
41-
<version>3.4.0</version>
41+
<version>3.4.1</version>
4242
</dependency>
4343
<dependency>
4444
<groupId>junit</groupId>

rust/minidfs/src/main/java/main/Main.java

+3-2
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ public static void main(String args[]) throws Exception {
4545
new File("target/test/delegation_token").delete();
4646

4747
Configuration conf = new Configuration();
48+
conf.set("dfs.blocksize", "16777216"); // 16 MiB instead of 128 MiB
4849
if (flags.contains("security")) {
4950
kdc = new MiniKdc(MiniKdc.createConf(), new File("target/test/kdc"));
5051
kdc.setTransport("UDP");
@@ -181,7 +182,7 @@ public static void main(String args[]) throws Exception {
181182

182183
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
183184
reader.readLine();
184-
185+
185186
if (dfs != null) {
186187
dfs.close();
187188
}
@@ -215,4 +216,4 @@ public static MiniDFSNNTopology generateTopology(Set<String> flags, Configuratio
215216
}
216217
return nnTopology;
217218
}
218-
}
219+
}

rust/src/client.rs

+38-70
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
use std::collections::{HashMap, VecDeque};
2-
use std::path::{Path, PathBuf};
32
use std::sync::Arc;
43

54
use futures::stream::BoxStream;
@@ -86,34 +85,29 @@ impl WriteOptions {
8685

8786
#[derive(Debug, Clone)]
8887
struct MountLink {
89-
viewfs_path: PathBuf,
90-
hdfs_path: PathBuf,
88+
viewfs_path: String,
89+
hdfs_path: String,
9190
protocol: Arc<NamenodeProtocol>,
9291
}
9392

9493
impl MountLink {
9594
fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
9695
// We should never have an empty path, we always want things mounted at root ("/") by default.
9796
Self {
98-
viewfs_path: PathBuf::from(if viewfs_path.is_empty() {
99-
"/"
100-
} else {
101-
viewfs_path
102-
}),
103-
hdfs_path: PathBuf::from(if hdfs_path.is_empty() { "/" } else { hdfs_path }),
97+
viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
98+
hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
10499
protocol,
105100
}
106101
}
107102
/// Convert a viewfs path into a name service path if it matches this link
108-
fn resolve(&self, path: &Path) -> Option<PathBuf> {
109-
if let Ok(relative_path) = path.strip_prefix(&self.viewfs_path) {
110-
if relative_path.components().count() == 0 {
111-
Some(self.hdfs_path.clone())
112-
} else {
113-
Some(self.hdfs_path.join(relative_path))
114-
}
103+
fn resolve(&self, path: &str) -> Option<String> {
104+
// Make sure we don't partially match the last component. It either needs to be an exact
105+
// match to a viewfs path, or needs to match with a trailing slash
106+
if path == self.viewfs_path {
107+
Some(self.hdfs_path.clone())
115108
} else {
116-
None
109+
path.strip_prefix(&format!("{}/", self.viewfs_path))
110+
.map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
117111
}
118112
}
119113
}
@@ -126,20 +120,12 @@ struct MountTable {
126120

127121
impl MountTable {
128122
fn resolve(&self, src: &str) -> (&MountLink, String) {
129-
let path = Path::new(src);
130123
for link in self.mounts.iter() {
131-
if let Some(resolved) = link.resolve(path) {
132-
return (link, resolved.to_string_lossy().into());
124+
if let Some(resolved) = link.resolve(src) {
125+
return (link, resolved);
133126
}
134127
}
135-
(
136-
&self.fallback,
137-
self.fallback
138-
.resolve(path)
139-
.unwrap()
140-
.to_string_lossy()
141-
.into(),
142-
)
128+
(&self.fallback, self.fallback.resolve(src).unwrap())
143129
}
144130
}
145131

@@ -246,7 +232,7 @@ impl Client {
246232

247233
if let Some(fallback) = fallback {
248234
// Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
249-
mounts.sort_by_key(|m| m.viewfs_path.components().count());
235+
mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
250236
mounts.reverse();
251237

252238
Ok(MountTable { mounts, fallback })
@@ -719,19 +705,16 @@ pub struct FileStatus {
719705

720706
impl FileStatus {
721707
fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
722-
let mut path = PathBuf::from(base_path);
723-
if let Ok(relative_path) = std::str::from_utf8(&value.path) {
724-
if !relative_path.is_empty() {
725-
path.push(relative_path)
726-
}
708+
let mut path = base_path.trim_end_matches("/").to_string();
709+
let relative_path = std::str::from_utf8(&value.path).unwrap();
710+
if !relative_path.is_empty() {
711+
path.push('/');
712+
path.push_str(relative_path);
727713
}
728714

729715
FileStatus {
730716
isdir: value.file_type() == FileType::IsDir,
731-
path: path
732-
.to_str()
733-
.map(|x| x.to_string())
734-
.unwrap_or(String::new()),
717+
path,
735718
length: value.length as usize,
736719
permission: value.permission.perm as u16,
737720
owner: value.owner,
@@ -769,10 +752,7 @@ impl From<ContentSummaryProto> for ContentSummary {
769752

770753
#[cfg(test)]
771754
mod test {
772-
use std::{
773-
path::{Path, PathBuf},
774-
sync::Arc,
775-
};
755+
use std::sync::Arc;
776756

777757
use url::Url;
778758

@@ -837,34 +817,22 @@ mod test {
837817
let protocol = create_protocol("hdfs://127.0.0.1:9000");
838818
let link = MountLink::new("/view", "/hdfs", protocol);
839819

840-
assert_eq!(
841-
link.resolve(Path::new("/view/dir/file")).unwrap(),
842-
PathBuf::from("/hdfs/dir/file")
843-
);
844-
assert_eq!(
845-
link.resolve(Path::new("/view")).unwrap(),
846-
PathBuf::from("/hdfs")
847-
);
848-
assert!(link.resolve(Path::new("/hdfs/path")).is_none());
820+
assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
821+
assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
822+
assert!(link.resolve("/hdfs/path").is_none());
849823
}
850824

851825
#[test]
852826
fn test_fallback_link() {
853827
let protocol = create_protocol("hdfs://127.0.0.1:9000");
854-
let link = MountLink::new("", "/hdfs", protocol);
828+
let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));
855829

856-
assert_eq!(
857-
link.resolve(Path::new("/path/to/file")).unwrap(),
858-
PathBuf::from("/hdfs/path/to/file")
859-
);
860-
assert_eq!(
861-
link.resolve(Path::new("/")).unwrap(),
862-
PathBuf::from("/hdfs")
863-
);
864-
assert_eq!(
865-
link.resolve(Path::new("/hdfs/path")).unwrap(),
866-
PathBuf::from("/hdfs/hdfs/path")
867-
);
830+
assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
831+
assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
832+
assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");
833+
834+
let link = MountLink::new("", "", protocol);
835+
assert_eq!(link.resolve("/").unwrap(), "/");
868836
}
869837

870838
#[test]
@@ -893,25 +861,25 @@ mod test {
893861

894862
// Exact mount path resolves to the exact HDFS path
895863
let (link, resolved) = mount_table.resolve("/mount1");
896-
assert_eq!(link.viewfs_path, Path::new("/mount1"));
864+
assert_eq!(link.viewfs_path, "/mount1");
897865
assert_eq!(resolved, "/path1/nested");
898866

899867
// Trailing slash is treated the same
900868
let (link, resolved) = mount_table.resolve("/mount1/");
901-
assert_eq!(link.viewfs_path, Path::new("/mount1"));
902-
assert_eq!(resolved, "/path1/nested");
869+
assert_eq!(link.viewfs_path, "/mount1");
870+
assert_eq!(resolved, "/path1/nested/");
903871

904872
// Doesn't do partial matches on a directory name
905873
let (link, resolved) = mount_table.resolve("/mount12");
906-
assert_eq!(link.viewfs_path, Path::new("/"));
874+
assert_eq!(link.viewfs_path, "");
907875
assert_eq!(resolved, "/path4/mount12");
908876

909877
let (link, resolved) = mount_table.resolve("/mount3/file");
910-
assert_eq!(link.viewfs_path, Path::new("/"));
878+
assert_eq!(link.viewfs_path, "");
911879
assert_eq!(resolved, "/path4/mount3/file");
912880

913881
let (link, resolved) = mount_table.resolve("/mount3/nested/file");
914-
assert_eq!(link.viewfs_path, Path::new("/mount3/nested"));
882+
assert_eq!(link.viewfs_path, "/mount3/nested");
915883
assert_eq!(resolved, "/path3/file");
916884
}
917885
}

rust/src/common/config.rs

+6-1
Original file line number | Diff line number | Diff line change
@@ -188,6 +188,10 @@ impl Configuration {
188188

189189
#[cfg(test)]
190190
mod test {
191+
use std::net::IpAddr;
192+
193+
use dns_lookup::lookup_addr;
194+
191195
use crate::common::config::DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN;
192196

193197
use super::{
@@ -282,8 +286,9 @@ mod test {
282286
};
283287

284288
let urls = config.get_urls_for_nameservice("test").unwrap();
289+
let fqdn = lookup_addr(&IpAddr::from([127, 0, 0, 1])).unwrap();
285290
assert_eq!(urls.len(), 1, "{:?}", urls);
286-
assert_eq!(urls[0], "localhost:9000");
291+
assert_eq!(urls[0], format!("{}:9000", fqdn));
287292

288293
config.map.insert(
289294
format!("{}.{}", DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN, "test"),

rust/src/security/user.rs

+5-14
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ use prost::Message;
55
use std::env;
66
use std::fs;
77
use std::io;
8+
use std::path::Path;
89
use std::path::PathBuf;
910

1011
use whoami::username;
@@ -180,12 +181,12 @@ pub struct Token {
180181
impl Token {
181182
fn load_tokens() -> Vec<Self> {
182183
match env::var(HADOOP_TOKEN_FILE_LOCATION).map(PathBuf::from) {
183-
Ok(path) if path.exists() => Self::read_token_file(path).ok().unwrap_or_default(),
184+
Ok(path) if path.exists() => Self::read_token_file(&path).ok().unwrap_or_default(),
184185
_ => Vec::new(),
185186
}
186187
}
187188

188-
fn read_token_file(path: PathBuf) -> std::io::Result<Vec<Self>> {
189+
fn read_token_file(path: &Path) -> std::io::Result<Vec<Self>> {
189190
let mut content = Bytes::from(fs::read(path)?);
190191

191192
let magic = content.copy_to_bytes(4);
@@ -425,12 +426,7 @@ mod tests {
425426
.unwrap();
426427
token_file.flush().unwrap();
427428

428-
env::set_var(
429-
HADOOP_TOKEN_FILE_LOCATION,
430-
token_file.path().to_str().unwrap(),
431-
);
432-
433-
let tokens = Token::load_tokens();
429+
let tokens = Token::read_token_file(token_file.path()).unwrap();
434430

435431
assert_eq!(tokens.len(), 1);
436432
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
@@ -455,12 +451,7 @@ mod tests {
455451
.unwrap();
456452
token_file.flush().unwrap();
457453

458-
env::set_var(
459-
HADOOP_TOKEN_FILE_LOCATION,
460-
token_file.path().to_str().unwrap(),
461-
);
462-
463-
let tokens = Token::load_tokens();
454+
let tokens = Token::read_token_file(token_file.path()).unwrap();
464455

465456
assert_eq!(tokens.len(), 1);
466457
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");

0 commit comments

Comments
 (0)