Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix path handling in windows #202

Merged
merged 35 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
3edf23c
Create testfile with client rather than CLI and drop block size and t…
Kimahriman Jan 22, 2025
0710aa6
Keep it for EC tests actually
Kimahriman Jan 22, 2025
c57ca93
Add simple windows test
Kimahriman Jan 22, 2025
9b52cdb
Output minidfs stderr
Kimahriman Jan 23, 2025
0baadd5
Try non-ha
Kimahriman Jan 24, 2025
051150e
Try more stuff
Kimahriman Jan 24, 2025
336d7b5
Fix
Kimahriman Jan 24, 2025
fd2a352
Try again
Kimahriman Jan 24, 2025
bcdea98
Again
Kimahriman Jan 24, 2025
cef2a36
Again
Kimahriman Jan 24, 2025
da47d2f
Try setting env var
Kimahriman Jan 24, 2025
e026a77
typo
Kimahriman Jan 24, 2025
9e44464
Fix env var
Kimahriman Jan 24, 2025
6ea711d
again
Kimahriman Jan 24, 2025
7c1f7d8
So close
Kimahriman Jan 24, 2025
d77dac8
Look at bin too
Kimahriman Jan 24, 2025
69a9768
Fake winutils
Kimahriman Jan 25, 2025
1bb27bb
Try real winutils
Kimahriman Jan 25, 2025
a71b0fa
Fix var
Kimahriman Jan 25, 2025
98db2b1
Update rust-test.yml
Kimahriman Jan 25, 2025
e022eff
More logging
Kimahriman Jan 25, 2025
34ddaeb
Update rust-test.yml
Kimahriman Jan 25, 2025
7c41f1b
Add some prints
Kimahriman Jan 25, 2025
04e756f
Update Main.java
Kimahriman Jan 25, 2025
0f71493
More logging
Kimahriman Jan 26, 2025
b40a5d6
One more log
Kimahriman Jan 26, 2025
d665a21
ubuntu also
Kimahriman Jan 26, 2025
398714f
Try match hadoop version
Kimahriman Jan 26, 2025
b0ee276
Different
Kimahriman Jan 26, 2025
1c7cdfe
Try to print failing key
Kimahriman Jan 26, 2025
207ccad
Run non integration tests for all OS
Kimahriman Jan 26, 2025
7f5609f
Stop using Path
Kimahriman Jan 27, 2025
0d22fe7
Cleanup and shrink ec test size
Kimahriman Jan 27, 2025
50d6d39
Try to fix tests
Kimahriman Jan 27, 2025
8c7876a
Final cleanup
Kimahriman Jan 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions .github/workflows/rust-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ jobs:
- name: Check all features
run: cargo check --all-targets --features integration-test,benchmark

test:
- name: Run unit tests
run: cargo test -p hdfs-native --lib

test-integration:
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
# - macos-latest
# - windows-latest
runs-on: ${{ matrix.os }}
env:
# Disable full debug symbol generation to speed up CI build and keep memory down
Expand Down Expand Up @@ -88,9 +89,9 @@ jobs:

- name: Download Hadoop
run: |
wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
tar -xf hadoop-3.4.0.tar.gz -C $GITHUB_WORKSPACE
echo "$GITHUB_WORKSPACE/hadoop-3.4.0/bin" >> $GITHUB_PATH
wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1-lean.tar.gz
tar -xf hadoop-3.4.1-lean.tar.gz -C $GITHUB_WORKSPACE
echo "$GITHUB_WORKSPACE/hadoop-3.4.1/bin" >> $GITHUB_PATH

- name: Run tests
run: cargo test --features integration-test
1 change: 1 addition & 0 deletions rust/examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[cfg(feature = "integration-test")]
use std::collections::HashSet;

#[cfg(feature = "integration-test")]
Expand Down
10 changes: 5 additions & 5 deletions rust/minidfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
Expand All @@ -22,23 +22,23 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-rbf</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-federation-balance</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
5 changes: 3 additions & 2 deletions rust/minidfs/src/main/java/main/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static void main(String args[]) throws Exception {
new File("target/test/delegation_token").delete();

Configuration conf = new Configuration();
conf.set("dfs.blocksize", "16777216"); // 16 MiB instead of 128 MiB
if (flags.contains("security")) {
kdc = new MiniKdc(MiniKdc.createConf(), new File("target/test/kdc"));
kdc.setTransport("UDP");
Expand Down Expand Up @@ -181,7 +182,7 @@ public static void main(String args[]) throws Exception {

BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
reader.readLine();

if (dfs != null) {
dfs.close();
}
Expand Down Expand Up @@ -215,4 +216,4 @@ public static MiniDFSNNTopology generateTopology(Set<String> flags, Configuratio
}
return nnTopology;
}
}
}
108 changes: 38 additions & 70 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use futures::stream::BoxStream;
Expand Down Expand Up @@ -86,34 +85,29 @@ impl WriteOptions {

#[derive(Debug, Clone)]
struct MountLink {
viewfs_path: PathBuf,
hdfs_path: PathBuf,
viewfs_path: String,
hdfs_path: String,
protocol: Arc<NamenodeProtocol>,
}

impl MountLink {
fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
// We should never have an empty path, we always want things mounted at root ("/") by default.
Self {
viewfs_path: PathBuf::from(if viewfs_path.is_empty() {
"/"
} else {
viewfs_path
}),
hdfs_path: PathBuf::from(if hdfs_path.is_empty() { "/" } else { hdfs_path }),
viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
protocol,
}
}
/// Convert a viewfs path into a name service path if it matches this link
fn resolve(&self, path: &Path) -> Option<PathBuf> {
if let Ok(relative_path) = path.strip_prefix(&self.viewfs_path) {
if relative_path.components().count() == 0 {
Some(self.hdfs_path.clone())
} else {
Some(self.hdfs_path.join(relative_path))
}
fn resolve(&self, path: &str) -> Option<String> {
// Make sure we don't partially match the last component. It either needs to be an exact
// match to a viewfs path, or needs to match with a trailing slash
if path == self.viewfs_path {
Some(self.hdfs_path.clone())
} else {
None
path.strip_prefix(&format!("{}/", self.viewfs_path))
.map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
}
}
}
Expand All @@ -126,20 +120,12 @@ struct MountTable {

impl MountTable {
fn resolve(&self, src: &str) -> (&MountLink, String) {
let path = Path::new(src);
for link in self.mounts.iter() {
if let Some(resolved) = link.resolve(path) {
return (link, resolved.to_string_lossy().into());
if let Some(resolved) = link.resolve(src) {
return (link, resolved);
}
}
(
&self.fallback,
self.fallback
.resolve(path)
.unwrap()
.to_string_lossy()
.into(),
)
(&self.fallback, self.fallback.resolve(src).unwrap())
}
}

Expand Down Expand Up @@ -246,7 +232,7 @@ impl Client {

if let Some(fallback) = fallback {
// Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
mounts.sort_by_key(|m| m.viewfs_path.components().count());
mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
mounts.reverse();

Ok(MountTable { mounts, fallback })
Expand Down Expand Up @@ -719,19 +705,16 @@ pub struct FileStatus {

impl FileStatus {
fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
let mut path = PathBuf::from(base_path);
if let Ok(relative_path) = std::str::from_utf8(&value.path) {
if !relative_path.is_empty() {
path.push(relative_path)
}
let mut path = base_path.trim_end_matches("/").to_string();
let relative_path = std::str::from_utf8(&value.path).unwrap();
if !relative_path.is_empty() {
path.push('/');
path.push_str(relative_path);
}

FileStatus {
isdir: value.file_type() == FileType::IsDir,
path: path
.to_str()
.map(|x| x.to_string())
.unwrap_or(String::new()),
path,
length: value.length as usize,
permission: value.permission.perm as u16,
owner: value.owner,
Expand Down Expand Up @@ -769,10 +752,7 @@ impl From<ContentSummaryProto> for ContentSummary {

#[cfg(test)]
mod test {
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use std::sync::Arc;

use url::Url;

Expand Down Expand Up @@ -837,34 +817,22 @@ mod test {
let protocol = create_protocol("hdfs://127.0.0.1:9000");
let link = MountLink::new("/view", "/hdfs", protocol);

assert_eq!(
link.resolve(Path::new("/view/dir/file")).unwrap(),
PathBuf::from("/hdfs/dir/file")
);
assert_eq!(
link.resolve(Path::new("/view")).unwrap(),
PathBuf::from("/hdfs")
);
assert!(link.resolve(Path::new("/hdfs/path")).is_none());
assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
assert!(link.resolve("/hdfs/path").is_none());
}

#[test]
fn test_fallback_link() {
let protocol = create_protocol("hdfs://127.0.0.1:9000");
let link = MountLink::new("", "/hdfs", protocol);
let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

assert_eq!(
link.resolve(Path::new("/path/to/file")).unwrap(),
PathBuf::from("/hdfs/path/to/file")
);
assert_eq!(
link.resolve(Path::new("/")).unwrap(),
PathBuf::from("/hdfs")
);
assert_eq!(
link.resolve(Path::new("/hdfs/path")).unwrap(),
PathBuf::from("/hdfs/hdfs/path")
);
assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

let link = MountLink::new("", "", protocol);
assert_eq!(link.resolve("/").unwrap(), "/");
}

#[test]
Expand Down Expand Up @@ -893,25 +861,25 @@ mod test {

// Exact mount path resolves to the exact HDFS path
let (link, resolved) = mount_table.resolve("/mount1");
assert_eq!(link.viewfs_path, Path::new("/mount1"));
assert_eq!(link.viewfs_path, "/mount1");
assert_eq!(resolved, "/path1/nested");

// Trailing slash is treated the same
let (link, resolved) = mount_table.resolve("/mount1/");
assert_eq!(link.viewfs_path, Path::new("/mount1"));
assert_eq!(resolved, "/path1/nested");
assert_eq!(link.viewfs_path, "/mount1");
assert_eq!(resolved, "/path1/nested/");

// Doesn't do partial matches on a directory name
let (link, resolved) = mount_table.resolve("/mount12");
assert_eq!(link.viewfs_path, Path::new("/"));
assert_eq!(link.viewfs_path, "");
assert_eq!(resolved, "/path4/mount12");

let (link, resolved) = mount_table.resolve("/mount3/file");
assert_eq!(link.viewfs_path, Path::new("/"));
assert_eq!(link.viewfs_path, "");
assert_eq!(resolved, "/path4/mount3/file");

let (link, resolved) = mount_table.resolve("/mount3/nested/file");
assert_eq!(link.viewfs_path, Path::new("/mount3/nested"));
assert_eq!(link.viewfs_path, "/mount3/nested");
assert_eq!(resolved, "/path3/file");
}
}
7 changes: 6 additions & 1 deletion rust/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ impl Configuration {

#[cfg(test)]
mod test {
use std::net::IpAddr;

use dns_lookup::lookup_addr;

use crate::common::config::DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN;

use super::{
Expand Down Expand Up @@ -282,8 +286,9 @@ mod test {
};

let urls = config.get_urls_for_nameservice("test").unwrap();
let fqdn = lookup_addr(&IpAddr::from([127, 0, 0, 1])).unwrap();
assert_eq!(urls.len(), 1, "{:?}", urls);
assert_eq!(urls[0], "localhost:9000");
assert_eq!(urls[0], format!("{}:9000", fqdn));

config.map.insert(
format!("{}.{}", DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN, "test"),
Expand Down
19 changes: 5 additions & 14 deletions rust/src/security/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use prost::Message;
use std::env;
use std::fs;
use std::io;
use std::path::Path;
use std::path::PathBuf;

use whoami::username;
Expand Down Expand Up @@ -180,12 +181,12 @@ pub struct Token {
impl Token {
fn load_tokens() -> Vec<Self> {
match env::var(HADOOP_TOKEN_FILE_LOCATION).map(PathBuf::from) {
Ok(path) if path.exists() => Self::read_token_file(path).ok().unwrap_or_default(),
Ok(path) if path.exists() => Self::read_token_file(&path).ok().unwrap_or_default(),
_ => Vec::new(),
}
}

fn read_token_file(path: PathBuf) -> std::io::Result<Vec<Self>> {
fn read_token_file(path: &Path) -> std::io::Result<Vec<Self>> {
let mut content = Bytes::from(fs::read(path)?);

let magic = content.copy_to_bytes(4);
Expand Down Expand Up @@ -425,12 +426,7 @@ mod tests {
.unwrap();
token_file.flush().unwrap();

env::set_var(
HADOOP_TOKEN_FILE_LOCATION,
token_file.path().to_str().unwrap(),
);

let tokens = Token::load_tokens();
let tokens = Token::read_token_file(token_file.path()).unwrap();

assert_eq!(tokens.len(), 1);
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
Expand All @@ -455,12 +451,7 @@ mod tests {
.unwrap();
token_file.flush().unwrap();

env::set_var(
HADOOP_TOKEN_FILE_LOCATION,
token_file.path().to_str().unwrap(),
);

let tokens = Token::load_tokens();
let tokens = Token::read_token_file(token_file.path()).unwrap();

assert_eq!(tokens.len(), 1);
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
Expand Down
Loading
Loading