diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml index 50038ed..faca365 100644 --- a/.github/workflows/python-test.yml +++ b/.github/workflows/python-test.yml @@ -13,21 +13,6 @@ defaults: working-directory: ./python jobs: - lint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Setup Python - uses: actions/setup-python@v4 - with: - python-version: 3.12 - - name: Install dev dependencies - run: | - pip3 install -r requirements-dev.txt - - name: Type checking - run: | - mypy python - test: strategy: fail-fast: false @@ -68,12 +53,18 @@ jobs: sccache: 'true' container: 'off' working-directory: ./python - args: --features kerberos + args: --features kerberos --extras devel - - name: Install dev dependencies and run tests + - name: Run lints + run: | + source .venv/bin/activate + mypy + isort . --check + black . --check + + - name: Run tests run: | source .venv/bin/activate - pip3 install -r requirements-dev.txt pytest build: diff --git a/python/README.md b/python/README.md index a4718d9..1e3bf3b 100644 --- a/python/README.md +++ b/python/README.md @@ -22,7 +22,6 @@ The same requirements apply as the Rust tests, requiring Java, Maven, Hadoop, an python3 -m venv .venv source .venv/bin/activate pip3 install maturin -pip3 install -r requirements-dev.txt -maturin develop +maturin develop -E devel pytest ``` \ No newline at end of file diff --git a/python/tests/conftest.py b/python/conftest.py similarity index 65% rename from python/tests/conftest.py rename to python/conftest.py index df7aa4f..03d4783 100644 --- a/python/tests/conftest.py +++ b/python/conftest.py @@ -1,8 +1,14 @@ - import subprocess +import urllib +import urllib.parse + +import fsspec import pytest -@pytest.fixture +from hdfs_native.fsspec import HdfsFileSystem + + +@pytest.fixture(scope="module") def minidfs(): child = subprocess.Popen( [ @@ -19,7 +25,7 @@ def minidfs(): stderr=subprocess.DEVNULL, universal_newlines=True, encoding="utf8", - 
bufsize=0 + bufsize=0, ) output = child.stdout.readline().strip() @@ -31,3 +37,9 @@ def minidfs(): child.communicate(input="\n", timeout=30) except: child.kill() + + +@pytest.fixture(scope="module") +def fs(minidfs: str) -> HdfsFileSystem: + url = urllib.parse.urlparse(minidfs) + return fsspec.filesystem(url.scheme, host=url.hostname, port=url.port) diff --git a/python/python/hdfs_native/__init__.py b/python/hdfs_native/__init__.py similarity index 78% rename from python/python/hdfs_native/__init__.py rename to python/hdfs_native/__init__.py index a49887f..4d7daef 100644 --- a/python/python/hdfs_native/__init__.py +++ b/python/hdfs_native/__init__.py @@ -1,42 +1,74 @@ import io -from typing import Iterator, Optional -from typing_extensions import Buffer +import os +from typing import Dict, Iterator, Optional + +# For some reason mypy doesn't think this exists +from typing_extensions import Buffer # type: ignore from ._internal import * + class FileReader(io.RawIOBase): - + def __init__(self, inner: "RawFileReader"): self.inner = inner def __len__(self) -> int: return self.inner.file_length() - + def __enter__(self): # Don't need to do anything special here return self - + def __exit__(self, *_args): # Future updates could close the file manually here if that would help clean things up pass + @property + def size(self): + return len(self) + + def seek(self, offset: int, whence=os.SEEK_SET) -> int: + """Seek to `offset` relative to `whence` and return the new position""" + if whence == os.SEEK_SET: + pos = offset + elif whence == os.SEEK_CUR: + pos = self.tell() + offset + elif whence == os.SEEK_END: + pos = self.inner.file_length() + offset + else: + raise ValueError(f"Unsupported whence {whence}") + # The native reader panics when seeking past the end of the file, so + # out-of-range positions are rejected here with a regular exception + if pos < 0 or pos > self.inner.file_length(): + raise ValueError(f"Cannot seek to position {pos}") + self.inner.seek(pos) + return pos + + def seekable(self): + return True + + def tell(self) -> int: + return self.inner.tell() + def readable(self) -> bool: return True def read(self, size: int = -1) -> bytes: """Read up to `size` bytes from the file, or all content if -1""" return self.inner.read(size) - 
def readall(self) -> bytes: return self.read() def read_range(self, offset: int, len: int) -> bytes: """Read `len` bytes from the file starting at `offset`. Doesn't affect the position in the file""" return self.inner.read_range(offset, len) - + def close(self) -> None: pass + class FileWriter(io.RawIOBase): def __init__(self, inner: "RawFileWriter"): @@ -55,14 +81,15 @@ def close(self) -> None: def __enter__(self) -> "FileWriter": return self - + def __exit__(self, *_args): self.close() + class Client: - def __init__(self, url: str): - self.inner = RawClient(url) + def __init__(self, url: str, config: Optional[Dict[str, str]] = None): + self.inner = RawClient(url, config) def get_file_info(self, path: str) -> "FileStatus": """Gets the file status for the file at `path`""" @@ -79,7 +106,7 @@ def read(self, path: str) -> FileReader: def create(self, path: str, write_options: WriteOptions) -> FileWriter: """Creates a new file and opens it for writing at `path`""" return FileWriter(self.inner.create(path, write_options)) - + def append(self, path: str) -> FileWriter: """Opens an existing file to append to at `path`""" return FileWriter(self.inner.append(path)) @@ -107,15 +134,20 @@ def delete(self, path: str, recursive: bool) -> bool: is a non-empty directory, this will fail. """ return self.inner.delete(path, recursive) - + def set_times(self, path: str, mtime: int, atime: int) -> None: """ Changes the modification time and access time of the file at `path` to `mtime` and `atime`, respectively. 
""" return self.inner.set_times(path, mtime, atime) - def set_owner(self, path: str, owner: Optional[str] = None, group: Optional[str] = None) -> None: + def set_owner( + self, + path: str, + owner: Optional[str] = None, + group: Optional[str] = None, + ) -> None: """ Sets the owner and/or group for the file at `path` """ - return self.inner.set_owner(path, owner, group) \ No newline at end of file + return self.inner.set_owner(path, owner, group) diff --git a/python/python/hdfs_native/_internal.pyi b/python/hdfs_native/_internal.pyi similarity index 74% rename from python/python/hdfs_native/_internal.pyi rename to python/hdfs_native/_internal.pyi index 653b60f..670a053 100644 --- a/python/python/hdfs_native/_internal.pyi +++ b/python/hdfs_native/_internal.pyi @@ -1,6 +1,7 @@ -from typing import Iterator, Optional -from typing_extensions import Buffer +from typing import Dict, Iterator, Optional +# For some reason mypy doesn't think this exists +from typing_extensions import Buffer # type: ignore class FileStatus: path: str @@ -23,6 +24,12 @@ class RawFileReader: def file_length(self) -> int: """Returns the size of the file""" + def seek(self, pos: int) -> None: + """Sets the cursor to the given position""" + + def tell(self) -> int: + """Returns the current cursor position in the file""" + def read(self, len: int) -> bytes: """Reads `len` bytes from the file, advancing the position in the file""" @@ -37,24 +44,19 @@ class RawFileWriter: """Closes the file and saves the final metadata to the NameNode""" class RawClient: - def __init__(self, url: str) -> None: ... - + def __init__(self, url: str, config: Optional[Dict[str, str]]) -> None: ... def get_file_info(self, path: str) -> FileStatus: ... - def list_status(self, path: str, recursive: bool) -> Iterator[FileStatus]: ... - def read(self, path: str) -> RawFileReader: ... - def create(self, path: str, write_options: WriteOptions) -> RawFileWriter: ... - def append(self, path: str) -> RawFileWriter: ... 
- def mkdirs(self, path: str, permission: int, create_parent: bool) -> None: ... - def rename(self, src: str, dst: str, overwrite: bool) -> None: ... - def delete(self, path: str, recursive: bool) -> bool: ... - def set_times(self, path: str, mtime: int, atime: int) -> None: ... - - def set_owner(self, path: str, owner: Optional[str], group: Optional[str]) -> None: ... \ No newline at end of file + def set_owner( + self, + path: str, + owner: Optional[str], + group: Optional[str], + ) -> None: ... diff --git a/python/hdfs_native/fsspec.py b/python/hdfs_native/fsspec.py new file mode 100644 index 0000000..a30dd4a --- /dev/null +++ b/python/hdfs_native/fsspec.py @@ -0,0 +1,163 @@ +import secrets +import shutil +import time +import urllib.parse +from contextlib import suppress +from datetime import datetime +from typing import TYPE_CHECKING, Dict, List, Optional, Union + +from fsspec import AbstractFileSystem +from fsspec.utils import tokenize + +from . import Client, WriteOptions + +if TYPE_CHECKING: + from . 
import FileStatus + + +class HdfsFileSystem(AbstractFileSystem): + root_marker = "/" + + def __init__(self, host: str, port: Optional[int] = None, *args, **storage_options): + super().__init__(host, port, *args, **storage_options) + self.host = host + self.port = port + url = f"{self.protocol}://{host}" + if port: + url += f":{port}" + self.client = Client(url) + + @property + def fsid(self): + return f"hdfs_native_{tokenize(self.protocol, self.host, self.port)}" + + @classmethod + def _strip_protocol(cls, path: str) -> str: + url = urllib.parse.urlparse(path) + return url.path or cls.root_marker + + def unstrip_protocol(self, name: str) -> str: + path = self._strip_protocol(name) + + url = f"{self.protocol}://{self.host}" + if self.port: + url += f":{self.port}" + + return f"{url}{path}" + + @staticmethod + def _get_kwargs_from_urls(path): + url = urllib.parse.urlparse(path) + return {"host": url.hostname, "port": url.port} + + def _convert_file_status(self, file_status: "FileStatus") -> Dict: + return { + "name": file_status.path, + "size": file_status.length, + "type": "directory" if file_status.isdir else "file", + "permission": file_status.permission, + "owner": file_status.owner, + "group": file_status.group, + "modification_time": file_status.modification_time, + "access_time": file_status.access_time, + } + + def info(self, path, **_kwargs) -> Dict: + file_status = self.client.get_file_info(self._strip_protocol(path)) + return self._convert_file_status(file_status) + + def exists(self, path, **_kwargs): + try: + self.info(path) + return True + except FileNotFoundError: + return False + + def ls(self, path: str, detail=True, **kwargs) -> List[Union[str, Dict]]: + listing = self.client.list_status(self._strip_protocol(path), False) + if detail: + return [self._convert_file_status(status) for status in listing] + else: + return [status.path for status in listing] + + def touch(self, path: str, truncate=True, **kwargs): + path = self._strip_protocol(path) + if 
truncate or not self.exists(path): + with self.open(path, "wb", **kwargs): + pass + else: + now = int(time.time() * 1000) + self.client.set_times(path, now, now) + + def mkdir(self, path: str, create_parents=True, **kwargs): + self.client.mkdirs( + self._strip_protocol(path), + kwargs.get("permission", 0o755), + create_parents, + ) + + def makedirs(self, path: str, exist_ok=False): + path = self._strip_protocol(path) + if not exist_ok and self.exists(path): + raise FileExistsError("File or directory already exists") + + return self.mkdir(path, create_parents=True) + + def mv(self, path1: str, path2: str, **kwargs): + self.client.rename( + self._strip_protocol(path1), + self._strip_protocol(path2), + kwargs.get("overwrite", False), + ) + + def cp_file(self, path1, path2, **kwargs): + with self._open(self._strip_protocol(path1), "rb") as lstream: + tmp_fname = f"{self._strip_protocol(path2)}.tmp.{secrets.token_hex(6)}" + try: + with self.open(tmp_fname, "wb") as rstream: + shutil.copyfileobj(lstream, rstream) + self.mv(tmp_fname, path2) + except BaseException: # noqa + with suppress(FileNotFoundError): + self.rm_file(tmp_fname) # best-effort temp file cleanup + raise + + def rmdir(self, path: str) -> None: + self.client.delete(self._strip_protocol(path), False) + + def rm(self, path: str, recursive=False, maxdepth: Optional[int] = None) -> None: + if maxdepth is not None: + raise NotImplementedError("maxdepth is not supported") + self.client.delete(self._strip_protocol(path), recursive) + + def rm_file(self, path: str): + self.rm(self._strip_protocol(path)) + + def modified(self, path: str): + file_info = self.client.get_file_info(self._strip_protocol(path)) + return datetime.fromtimestamp(file_info.modification_time / 1000) # ms -> s + + def _open( + self, + path: str, + mode="rb", + overwrite=True, + replication: Optional[int] = None, + block_size: Optional[int] = None, + **_kwargs, + ): + path = self._strip_protocol(path) + if mode == "rb": + return self.client.read(path) + elif mode == "wb": + 
write_options = WriteOptions() + write_options.overwrite = overwrite + if replication: + write_options.replication = replication + if block_size: + write_options.block_size = block_size + return self.client.create(path, write_options=write_options) + elif mode == "ab": + return self.client.append(path) + else: + raise ValueError(f"Mode {mode} is not supported") diff --git a/python/pyproject.toml b/python/pyproject.toml index 111abcc..de78ccc 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -13,20 +13,43 @@ classifiers = [ "Programming Language :: Python :: Implementation :: PyPy", ] dependencies = [ - "typing-extensions" + "typing-extensions", + "fsspec" +] + + +[project.optional-dependencies] +devel = [ + "mypy~=1.8.0", + "ruff~=0.4.8", + "pytest~=7.4", + "black~=24.4", + "isort~=5.13" ] [project.urls] repository = "https://github.com/Kimahriman/hdfs-native" +[project.entry-points."fsspec.specs"] +hdfs = "hdfs_native.fsspec.HdfsFileSystem" +viewfs = "hdfs_native.fsspec.HdfsFileSystem" + [tool.maturin] features = ["pyo3/extension-module"] -python-source = "python" module-name = "hdfs_native._internal" [tool.mypy] files = [ - "python/**/*.py", - "python/**/*.pyi", - "tests/**/*.py" + "hdfs_native/**/*.py", + "hdfs_native/**/*.pyi", +] + +[[tool.mypy.overrides]] +module = "fsspec.*" +ignore_missing_imports = true + +[tool.pytest.ini_options] +testpaths = [ + "tests", + "hdfs_native", ] diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt deleted file mode 100644 index c7c403c..0000000 --- a/python/requirements-dev.txt +++ /dev/null @@ -1,2 +0,0 @@ -mypy ~= 1.4 -pytest ~= 7.4 \ No newline at end of file diff --git a/python/src/error.rs b/python/src/error.rs index b44a083..87b240f 100644 --- a/python/src/error.rs +++ b/python/src/error.rs @@ -13,6 +13,7 @@ impl From for PyErr { fn from(value: PythonHdfsError) -> Self { match value.0 { HdfsError::IOError(err) => PyIOError::new_err(err), + HdfsError::AlreadyExists(path) => 
PyFileExistsError::new_err(path), HdfsError::FileNotFound(path) => PyFileNotFoundError::new_err(path), HdfsError::IsADirectoryError(path) => PyIsADirectoryError::new_err(path), HdfsError::UnsupportedFeature(feat) => PyNotImplementedError::new_err(feat), diff --git a/python/src/lib.rs b/python/src/lib.rs index e442cf4..cb7f431 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use std::collections::HashMap; use std::sync::Arc; use ::hdfs_native::file::{FileReader, FileWriter}; @@ -79,6 +80,14 @@ impl RawFileReader { self.inner.file_length() } + pub fn seek(&mut self, pos: usize) { + self.inner.seek(pos); + } + + pub fn tell(&self) -> usize { + self.inner.tell() + } + pub fn read(&mut self, len: i64) -> PyHdfsResult> { let read_len = if len < 0 { self.inner.remaining() @@ -168,13 +177,14 @@ struct RawClient { #[pymethods] impl RawClient { #[new] - #[pyo3(signature = (url))] - pub fn new(url: &str) -> PyResult { + #[pyo3(signature = (url, config))] + pub fn new(url: &str, config: Option>) -> PyResult { // Initialize logging, ignore errors if this is called multiple times let _ = env_logger::try_init(); Ok(RawClient { - inner: Client::new(url).map_err(PythonHdfsError::from)?, + inner: Client::new_with_config(url, config.unwrap_or_default()) + .map_err(PythonHdfsError::from)?, rt: Arc::new( tokio::runtime::Runtime::new() .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, diff --git a/python/tests/test_fsspec.py b/python/tests/test_fsspec.py new file mode 100644 index 0000000..152fc9f --- /dev/null +++ b/python/tests/test_fsspec.py @@ -0,0 +1,87 @@ +import urllib.parse + +import fsspec +import pytest + +from hdfs_native.fsspec import HdfsFileSystem + + +def test_dirs(fs: HdfsFileSystem): + fs.mkdir("/testdir") + assert fs.info("/testdir")["type"] == "directory" + + try: + fs.makedirs("/testdir", exist_ok=False) + assert False, '"/testdir" already exists, should fail' + except FileExistsError: # a bare except would also swallow the AssertionError + pass + + fs.makedirs("/testdir", 
exist_ok=True) + + fs.mkdir("/testdir/nested/dir") + assert fs.info("/testdir/nested/dir")["type"] == "directory" + + # The parent doesn't exist, so this must raise. pytest.raises (unlike a bare + # try/assert False/except) actually fails the test when nothing is raised. + # NOTE(review): narrow Exception once the concrete error type is confirmed. + with pytest.raises(Exception): + fs.mkdir("/testdir/nested2/dir", create_parents=False) + + with pytest.raises(RuntimeError): + fs.rm("/testdir", recursive=False) + + fs.rm("/testdir", recursive=True) + + assert not fs.exists("/testdir") + + +def test_io(fs: HdfsFileSystem): + with fs.open("/test", mode="wb") as file: + file.write(b"hello there") + + with fs.open("/test", mode="rb") as file: + data = file.read() + assert data == b"hello there" + + fs.write_bytes("/test2", b"hello again") + assert fs.read_bytes("/test2") == b"hello again" + assert fs.read_bytes("/test2", start=1) == b"ello again" + assert fs.read_bytes("/test2", end=-1) == b"hello agai" + + fs.mv("/test2", "/test3") + assert fs.read_text("/test3") == "hello again" + assert not fs.exists("/test2") + + fs.rm("/test") + fs.rm("/test3") + + +def test_listing(fs: HdfsFileSystem): + fs.mkdir("/testdir") + + fs.touch("/testdir/test1") + fs.touch("/testdir/test2") + + assert fs.ls("/", detail=False) == ["/testdir"] + assert fs.ls("/testdir", detail=False) == ["/testdir/test1", "/testdir/test2"] + + listing = fs.ls("/", detail=True) + assert len(listing) == 1 + assert listing[0]["size"] == 0 + assert listing[0]["name"] == "/testdir" + assert listing[0]["type"] == "directory" + + +def test_parsing(minidfs: str): + with fsspec.open(f"{minidfs}/test", "wb") as f: + f.write(b"hey there") + + url = urllib.parse.urlparse(minidfs) + fs: HdfsFileSystem + urlpath: str + fs, urlpath = fsspec.url_to_fs(f"{minidfs}/path") + assert fs.host == url.hostname + assert fs.port == url.port + assert urlpath == "/path" + + assert fs.unstrip_protocol("/path") == f"{minidfs}/path" diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py index b3b348c..26a6b58 100644 --- a/python/tests/test_integration.py +++ 
b/python/tests/test_integration.py @@ -1,11 +1,13 @@ import io + from hdfs_native import Client, WriteOptions + def test_integration(minidfs: str): client = Client(minidfs) client.create("/testfile", WriteOptions()).close() file_info = client.get_file_info("/testfile") - + assert file_info.path == "/testfile" file_list = list(client.list_status("/", False)) @@ -27,7 +29,7 @@ def test_integration(minidfs: str): data = io.BytesIO() for i in range(0, 32 * 1024 * 1024): - data.write(i.to_bytes(4, 'big')) + data.write(i.to_bytes(4, "big")) file.write(data.getbuffer()) @@ -35,13 +37,13 @@ def test_integration(minidfs: str): data = io.BytesIO(file.read()) for i in range(0, 32 * 1024 * 1024): - assert data.read(4) == i.to_bytes(4, 'big') + assert data.read(4) == i.to_bytes(4, "big") with client.append("/testfile") as file: data = io.BytesIO() for i in range(32 * 1024 * 1024, 33 * 1024 * 1024): - data.write(i.to_bytes(4, 'big')) + data.write(i.to_bytes(4, "big")) file.write(data.getbuffer()) @@ -49,8 +51,14 @@ def test_integration(minidfs: str): data = io.BytesIO(file.read()) for i in range(0, 33 * 1024 * 1024): - assert data.read(4) == i.to_bytes(4, 'big') + assert data.read(4) == i.to_bytes(4, "big") + with client.read("/testfile") as file: + # Skip first two ints + file.seek(8) + expected = 2 + assert file.read(4) == expected.to_bytes(4, "big") + assert file.tell() == 12 mtime = 1717641455 atime = 1717641456 @@ -59,7 +67,6 @@ def test_integration(minidfs: str): assert file_info.modification_time == mtime assert file_info.access_time == atime - client.set_owner("/testfile", "testuser", "testgroup") file_info = client.get_file_info("/testfile") assert file_info.owner == "testuser" diff --git a/rust/src/file.rs b/rust/src/file.rs index 772031d..a785a04 100644 --- a/rust/src/file.rs +++ b/rust/src/file.rs @@ -40,18 +40,33 @@ impl FileReader { } } + /// Returns the total size of the file pub fn file_length(&self) -> usize { self.status.length as usize } + /// Returns the 
remaining bytes left based on the current cursor position. pub fn remaining(&self) -> usize { - if self.position > self.status.length as usize { + if self.position > self.file_length() { 0 } else { - self.status.length as usize - self.position + self.file_length() - self.position } } + /// Sets the cursor to the position. Panics if the position is beyond the end of the file + pub fn seek(&mut self, pos: usize) { + if pos > self.file_length() { + panic!("Cannot seek beyond the end of a file"); + } + self.position = pos; + } + + /// Returns the current cursor position in the file + pub fn tell(&self) -> usize { + self.position + } + /// Read up to `len` bytes into a new [Bytes] object, advancing the internal position in the file. /// An empty [Bytes] object will be returned if the end of the file has been reached. pub async fn read(&mut self, len: usize) -> Result {