Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create fsspec file system #120

Merged
merged 4 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions .github/workflows/python-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,6 @@ defaults:
working-directory: ./python

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: 3.12
- name: Install dev dependencies
run: |
pip3 install -r requirements-dev.txt
- name: Type checking
run: |
mypy python

test:
strategy:
fail-fast: false
Expand Down Expand Up @@ -68,12 +53,18 @@ jobs:
sccache: 'true'
container: 'off'
working-directory: ./python
args: --features kerberos
args: --features kerberos --extras devel

- name: Install dev dependencies and run tests
- name: Run lints
run: |
source .venv/bin/activate
mypy
isort . --check
black . --check

- name: Run tests
run: |
source .venv/bin/activate
pip3 install -r requirements-dev.txt
pytest

build:
Expand Down
3 changes: 1 addition & 2 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ The same requirements apply as the Rust tests, requiring Java, Maven, Hadoop, an
python3 -m venv .venv
source .venv/bin/activate
pip3 install maturin
pip3 install -r requirements-dev.txt
maturin develop
maturin develop -E devel
pytest
```
18 changes: 15 additions & 3 deletions python/tests/conftest.py → python/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@

import subprocess
import urllib
import urllib.parse

import fsspec
import pytest

@pytest.fixture
from hdfs_native.fsspec import HdfsFileSystem


@pytest.fixture(scope="module")
def minidfs():
child = subprocess.Popen(
[
Expand All @@ -19,7 +25,7 @@ def minidfs():
stderr=subprocess.DEVNULL,
universal_newlines=True,
encoding="utf8",
bufsize=0
bufsize=0,
)

output = child.stdout.readline().strip()
Expand All @@ -31,3 +37,9 @@ def minidfs():
child.communicate(input="\n", timeout=30)
except:
child.kill()


@pytest.fixture(scope="module")
def fs(minidfs: str) -> HdfsFileSystem:
url = urllib.parse.urlparse(minidfs)
return fsspec.filesystem(url.scheme, host=url.hostname, port=url.port)
Original file line number Diff line number Diff line change
@@ -1,42 +1,68 @@
import io
from typing import Iterator, Optional
from typing_extensions import Buffer
import os
from typing import Dict, Iterator, Optional

# For some reason mypy doesn't think this exists
from typing_extensions import Buffer # type: ignore

from ._internal import *


class FileReader(io.RawIOBase):

def __init__(self, inner: "RawFileReader"):
self.inner = inner

def __len__(self) -> int:
return self.inner.file_length()

def __enter__(self):
# Don't need to do anything special here
return self

def __exit__(self, *_args):
# Future updates could close the file manually here if that would help clean things up
pass

@property
def size(self):
return len(self)

def seek(self, offset: int, whence=os.SEEK_SET):
"""Seek to `offset` relative to `whence`"""
if whence == os.SEEK_SET:
self.inner.seek(offset)
elif whence == os.SEEK_CUR:
self.inner.seek(self.tell() + offset)
elif whence == os.SEEK_END:
self.inner.seek(self.inner.file_length() + offset)
else:
raise ValueError(f"Unsupported whence {whence}")

def seekable(self):
return True

def tell(self) -> int:
return self.inner.tell()

def readable(self) -> bool:
return True

def read(self, size: int = -1) -> bytes:
"""Read up to `size` bytes from the file, or all content if -1"""
return self.inner.read(size)

def readall(self) -> bytes:
return self.read()

def read_range(self, offset: int, len: int) -> bytes:
"""Read `len` bytes from the file starting at `offset`. Doesn't affect the position in the file"""
return self.inner.read_range(offset, len)

def close(self) -> None:
pass


class FileWriter(io.RawIOBase):

def __init__(self, inner: "RawFileWriter"):
Expand All @@ -55,14 +81,15 @@ def close(self) -> None:

def __enter__(self) -> "FileWriter":
return self

def __exit__(self, *_args):
self.close()


class Client:

def __init__(self, url: str):
self.inner = RawClient(url)
def __init__(self, url: str, config: Optional[Dict[str, str]] = None):
self.inner = RawClient(url, config)

def get_file_info(self, path: str) -> "FileStatus":
"""Gets the file status for the file at `path`"""
Expand All @@ -79,7 +106,7 @@ def read(self, path: str) -> FileReader:
def create(self, path: str, write_options: WriteOptions) -> FileWriter:
"""Creates a new file and opens it for writing at `path`"""
return FileWriter(self.inner.create(path, write_options))

def append(self, path: str) -> FileWriter:
"""Opens an existing file to append to at `path`"""
return FileWriter(self.inner.append(path))
Expand Down Expand Up @@ -107,15 +134,20 @@ def delete(self, path: str, recursive: bool) -> bool:
is a non-empty directory, this will fail.
"""
return self.inner.delete(path, recursive)

def set_times(self, path: str, mtime: int, atime: int) -> None:
"""
Changes the modification time and access time of the file at `path` to `mtime` and `atime`, respectively.
"""
return self.inner.set_times(path, mtime, atime)

def set_owner(self, path: str, owner: Optional[str] = None, group: Optional[str] = None) -> None:
def set_owner(
self,
path: str,
owner: Optional[str] = None,
group: Optional[str] = None,
) -> None:
"""
Sets the owner and/or group for the file at `path`
"""
return self.inner.set_owner(path, owner, group)
return self.inner.set_owner(path, owner, group)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Iterator, Optional
from typing_extensions import Buffer
from typing import Dict, Iterator, Optional

# For some reason mypy doesn't think this exists
from typing_extensions import Buffer # type: ignore

class FileStatus:
path: str
Expand All @@ -23,6 +24,12 @@ class RawFileReader:
def file_length(self) -> int:
"""Returns the size of the file"""

def seek(self, pos: int) -> None:
"""Sets the cursor to the given position"""

def tell(self) -> int:
"""Returns the current cursor position in the file"""

def read(self, len: int) -> bytes:
"""Reads `len` bytes from the file, advancing the position in the file"""

Expand All @@ -37,24 +44,19 @@ class RawFileWriter:
"""Closes the file and saves the final metadata to the NameNode"""

class RawClient:
def __init__(self, url: str) -> None: ...

def __init__(self, url: str, config: Optional[Dict[str, str]]) -> None: ...
def get_file_info(self, path: str) -> FileStatus: ...

def list_status(self, path: str, recursive: bool) -> Iterator[FileStatus]: ...

def read(self, path: str) -> RawFileReader: ...

def create(self, path: str, write_options: WriteOptions) -> RawFileWriter: ...

def append(self, path: str) -> RawFileWriter: ...

def mkdirs(self, path: str, permission: int, create_parent: bool) -> None: ...

def rename(self, src: str, dst: str, overwrite: bool) -> None: ...

def delete(self, path: str, recursive: bool) -> bool: ...

def set_times(self, path: str, mtime: int, atime: int) -> None: ...

def set_owner(self, path: str, owner: Optional[str], group: Optional[str]) -> None: ...
def set_owner(
self,
path: str,
owner: Optional[str],
group: Optional[str],
) -> None: ...
Loading
Loading