Remove need for hadoop binary #82

Closed · wants to merge 2 commits
2 changes: 1 addition & 1 deletion README.md
@@ -64,7 +64,7 @@ cargo build --features token,kerberos
An object_store implementation for HDFS is provided in the [hdfs-native-object-store](./crates/hdfs-native-object-store/) crate.

## Running tests
-The tests are mostly integration tests that utilize a small Java application in `crates/hdfs-native/minidfs/` that runs a custom `MiniDFSCluster`. To run the tests, you need to have Java, Maven, Hadoop binaries, and Kerberos tools available and on your path. Any Java version between 8 and 17 should work.
+The tests are mostly integration tests that utilize a small Java application in `crates/hdfs-native/minidfs/` that runs a custom `MiniDFSCluster`. To run the tests, you need to have Java, Maven, and Kerberos tools available and on your path. Any Java version between 8 and 17 should work.

```bash
cargo test -p hdfs-native --features token,kerberos,integration-test
13 changes: 13 additions & 0 deletions crates/hdfs-native/minidfs/src/main/java/main/Main.java
@@ -9,6 +9,8 @@
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -34,6 +36,8 @@

public class Main {

+static int TEST_FILE_INTS = 64 * 1024 * 1024;
+
public static void main(String args[]) throws Exception {
Set<String> flags = new HashSet<>();
for (String arg : args) {
@@ -162,6 +166,15 @@ public static void main(String args[]) throws Exception {

hdfsConf.writeXml(new FileOutputStream("target/test/core-site.xml"));

+if (flags.contains("testfile")) {
+    FileSystem fs = FileSystem.get(hdfsConf);
+    FSDataOutputStream os = fs.create(new Path("/testfile"));
+    for (int i=0; i < TEST_FILE_INTS; i++) {
+        os.writeInt(i);
+    }
+    os.close();
+}
+
System.out.println("Ready!");
if (flags.contains("security")) {
System.out.println(kdc.getKrb5conf().toPath().toString());
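
A note on the file this block produces: `TEST_FILE_INTS` is 64 * 1024 * 1024 and `writeInt` emits four bytes per int, so `/testfile` comes out at 256 MiB, the same size and contents the deleted Rust `setup` below used to upload via the `hadoop` binary. Java's `DataOutputStream.writeInt` is big-endian, which is why the old Rust code used `i.to_be_bytes()` and why any verification has to decode big-endian too. A minimal sketch of such a check (a hypothetical helper, not part of this PR), using only the `bytes` crate:

```rust
use bytes::Buf;

/// Hypothetical check mirroring the deleted setup(): the Java side writes
/// 0..TEST_FILE_INTS via DataOutputStream.writeInt (big-endian), so decoding
/// the file as big-endian i32s must recover the same ascending sequence.
const TEST_FILE_INTS: usize = 64 * 1024 * 1024;

fn verify_test_file(mut buf: impl Buf) {
    assert_eq!(buf.remaining(), TEST_FILE_INTS * 4); // 4 bytes per int = 256 MiB
    for i in 0..TEST_FILE_INTS as i32 {
        assert_eq!(buf.get_i32(), i); // Buf::get_i32 reads big-endian
    }
}
```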
2 changes: 2 additions & 0 deletions crates/hdfs-native/src/minidfs.rs
@@ -9,6 +9,7 @@ use which::which;

#[derive(PartialEq, Eq, Hash, Debug)]
pub enum DfsFeatures {
+TESTFILE,
SECURITY,
TOKEN,
PRIVACY,
@@ -21,6 +22,7 @@ pub enum DfsFeatures {
impl DfsFeatures {
pub fn as_str(&self) -> &str {
match self {
+DfsFeatures::TESTFILE => "testfile",
DfsFeatures::EC => "ec",
DfsFeatures::HA => "ha",
DfsFeatures::VIEWFS => "viewfs",
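
The string arm is the contract with the Java side: `Main` collects its command-line `args` into a `flags` set and gates the new block on `flags.contains("testfile")`, so `DfsFeatures::as_str()` must produce exactly that literal. A rough sketch of the assumed hand-off (hypothetical, not the actual `MiniDfs` launch code, and the classpath here is made up):

```rust
use std::collections::HashSet;
use std::process::{Child, Command};

use hdfs_native::minidfs::DfsFeatures;

// Sketch of the assumed wiring: each selected feature is forwarded to the
// Java minidfs Main as a plain string argument; Main gathers its args into
// a `flags` set and creates /testfile when it sees "testfile".
fn spawn_minidfs(features: &HashSet<DfsFeatures>) -> std::io::Result<Child> {
    let mut cmd = Command::new("java");
    cmd.arg("-cp").arg("target/minidfs.jar").arg("main.Main"); // hypothetical classpath
    for feature in features {
        cmd.arg(feature.as_str()); // e.g. "testfile", "ha", "security"
    }
    cmd.spawn()
}
```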
37 changes: 0 additions & 37 deletions crates/hdfs-native/tests/common/mod.rs
@@ -1,45 +1,8 @@
#![allow(dead_code)]
use bytes::Buf;
-use std::collections::HashSet;
-use std::io::{BufWriter, Write};
-use std::process::Command;
-use tempfile::NamedTempFile;
-use which::which;

-use hdfs_native::minidfs::{DfsFeatures, MiniDfs};

pub const TEST_FILE_INTS: usize = 64 * 1024 * 1024;

-pub fn setup(features: &HashSet<DfsFeatures>) -> MiniDfs {
-    let hadoop_exc = which("hadoop").expect("Failed to find hadoop executable");
-
-    let dfs = MiniDfs::with_features(features);
-
-    let mut file = NamedTempFile::new_in("target/test").unwrap();
-    {
-        let mut writer = BufWriter::new(file.as_file_mut());
-        for i in 0..TEST_FILE_INTS as i32 {
-            let bytes = i.to_be_bytes();
-            writer.write_all(&bytes).unwrap();
-        }
-        writer.flush().unwrap();
-    }
-
-    let status = Command::new(hadoop_exc)
-        .args([
-            "fs",
-            "-copyFromLocal",
-            "-f",
-            file.path().to_str().unwrap(),
-            &format!("{}/testfile", dfs.url),
-        ])
-        .status()
-        .unwrap();
-    assert!(status.success());
-
-    dfs
-}

pub fn assert_bufs_equal(buf1: &impl Buf, buf2: &impl Buf, message: Option<String>) {
assert_eq!(buf1.chunk().len(), buf2.chunk().len());

39 changes: 21 additions & 18 deletions crates/hdfs-native/tests/test_integration.rs
@@ -3,23 +3,27 @@ mod common;

#[cfg(feature = "integration-test")]
mod test {
-use crate::common::{assert_bufs_equal, setup, TEST_FILE_INTS};
+use crate::common::{assert_bufs_equal, TEST_FILE_INTS};
use bytes::{BufMut, BytesMut};
-use hdfs_native::{client::FileStatus, minidfs::DfsFeatures, Client, Result, WriteOptions};
+use hdfs_native::{
+    client::FileStatus,
+    minidfs::{DfsFeatures, MiniDfs},
+    Client, Result, WriteOptions,
+};
use serial_test::serial;
use std::collections::HashSet;

#[tokio::test]
#[serial]
async fn test_basic() {
-test_with_features(&HashSet::new()).await.unwrap();
+test_with_features(HashSet::new()).await.unwrap();
}

#[tokio::test]
#[serial]
#[cfg(feature = "kerberos")]
async fn test_security_kerberos() {
-test_with_features(&HashSet::from([DfsFeatures::SECURITY]))
+test_with_features(HashSet::from([DfsFeatures::SECURITY]))
.await
.unwrap();
}
@@ -28,7 +32,7 @@ mod test {
#[serial]
#[cfg(feature = "token")]
async fn test_security_token() {
-test_with_features(&HashSet::from([DfsFeatures::SECURITY, DfsFeatures::TOKEN]))
+test_with_features(HashSet::from([DfsFeatures::SECURITY, DfsFeatures::TOKEN]))
.await
.unwrap();
}
@@ -38,7 +42,7 @@ mod test {
#[serial]
#[cfg(feature = "token")]
async fn test_privacy_token() {
-test_with_features(&HashSet::from([
+test_with_features(HashSet::from([
DfsFeatures::SECURITY,
DfsFeatures::TOKEN,
DfsFeatures::PRIVACY,
@@ -51,18 +55,15 @@ mod test {
#[serial]
#[cfg(feature = "kerberos")]
async fn test_privacy_kerberos() {
-test_with_features(&HashSet::from([
-    DfsFeatures::SECURITY,
-    DfsFeatures::PRIVACY,
-]))
-.await
-.unwrap();
+test_with_features(HashSet::from([DfsFeatures::SECURITY, DfsFeatures::PRIVACY]))
+    .await
+    .unwrap();
}

#[tokio::test]
#[serial]
async fn test_basic_ha() {
-test_with_features(&HashSet::from([DfsFeatures::HA]))
+test_with_features(HashSet::from([DfsFeatures::HA]))
.await
.unwrap();
}
@@ -71,7 +72,7 @@ mod test {
#[serial]
#[cfg(feature = "kerberos")]
async fn test_security_privacy_ha() {
-test_with_features(&HashSet::from([
+test_with_features(HashSet::from([
DfsFeatures::SECURITY,
DfsFeatures::PRIVACY,
DfsFeatures::HA,
@@ -84,7 +85,7 @@ mod test {
#[serial]
#[cfg(feature = "token")]
async fn test_security_token_ha() {
-test_with_features(&HashSet::from([
+test_with_features(HashSet::from([
DfsFeatures::SECURITY,
DfsFeatures::TOKEN,
DfsFeatures::HA,
@@ -96,15 +97,17 @@ mod test {
#[tokio::test]
#[serial]
async fn test_rbf() {
-test_with_features(&HashSet::from([DfsFeatures::RBF]))
+test_with_features(HashSet::from([DfsFeatures::RBF]))
.await
.unwrap();
}

-pub async fn test_with_features(features: &HashSet<DfsFeatures>) -> Result<()> {
+pub async fn test_with_features(mut features: HashSet<DfsFeatures>) -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();

-let _dfs = setup(features);
+features.insert(DfsFeatures::TESTFILE);
+
+let _dfs = MiniDfs::with_features(&features);
let client = Client::default();

test_file_info(&client).await?;
9 changes: 6 additions & 3 deletions crates/hdfs-native/tests/test_read.rs
@@ -3,9 +3,12 @@ mod common;

#[cfg(feature = "integration-test")]
mod test {
-use crate::common::{setup, TEST_FILE_INTS};
+use crate::common::TEST_FILE_INTS;
use bytes::Buf;
-use hdfs_native::{minidfs::DfsFeatures, Client, Result};
+use hdfs_native::{
+    minidfs::{DfsFeatures, MiniDfs},
+    Client, Result,
+};
use serial_test::serial;
use std::collections::HashSet;

@@ -14,7 +17,7 @@ mod test {
async fn test_read() -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();

-let _dfs = setup(&HashSet::from([DfsFeatures::HA]));
+let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA, DfsFeatures::TESTFILE]));
let client = Client::default();

// Read the whole file
9 changes: 6 additions & 3 deletions crates/hdfs-native/tests/test_write.rs
@@ -3,9 +3,12 @@ mod common;

#[cfg(feature = "integration-test")]
mod test {
-use crate::common::{assert_bufs_equal, setup};
+use crate::common::assert_bufs_equal;
use bytes::{BufMut, BytesMut};
-use hdfs_native::{minidfs::DfsFeatures, Client, Result, WriteOptions};
+use hdfs_native::{
+    minidfs::{DfsFeatures, MiniDfs},
+    Client, Result, WriteOptions,
+};
use serial_test::serial;
use std::collections::HashSet;

@@ -14,7 +17,7 @@ mod test {
async fn test_write() {
let _ = env_logger::builder().is_test(true).try_init();

-let _dfs = setup(&HashSet::from([DfsFeatures::HA]));
+let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA]));
let client = Client::default();

test_create(&client).await.unwrap();