Skip to content

Commit 8617932

Browse files
authored
ACL support (#160)
1 parent 1a49df1 commit 8617932

File tree

10 files changed

+822
-219
lines changed

10 files changed

+822
-219
lines changed

README.md

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# Native Rust HDFS client
2-
This is a proof-of-concept HDFS client written natively in Rust. All other clients I have found in any other language are simply wrappers around libhdfs and require all the same Java dependencies, so I wanted to see if I could write one from scratch given that HDFS isn't really changing very often anymore. Several basic features are working, however it is not nearly as robust and the real HDFS client.
3-
4-
What this is not trying to do is implement all HDFS client/FileSystem interfaces, just things involving reading and writing data.
2+
This is an experimental HDFS client written natively in Rust. Several basic features are working; however, it is not nearly as robust as the real HDFS client.
53

64
## Supported HDFS features
75
Here is a list of currently supported and unsupported but possible future features.
@@ -12,10 +10,15 @@ Here is a list of currently supported and unsupported but possible future featur
1210
- [x] Writing
1311
- [x] Rename
1412
- [x] Delete
13+
- [x] Basic Permissions and ownership
14+
- [x] ACLs
15+
- [x] Content summary
16+
- [x] Set replication
17+
- [x] Set timestamps
1518

1619
### HDFS Features
1720
- [x] Name Services
18-
- [ ] Observer reads (state ID tracking is supported, but needs improvements on tracking Observer/Active NameNode)
21+
- [x] Observer reads
1922
- [x] ViewFS
2023
- [x] Router based federation
2124
- [x] Erasure coded reads and writes

python/hdfs_native/__init__.py

+54-6
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
import io
22
import os
3-
from typing import TYPE_CHECKING, Dict, Iterator, Optional
3+
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional
44

55
# For some reason mypy doesn't think this exists
66
from typing_extensions import Buffer # type: ignore
77

8-
from ._internal import RawClient, WriteOptions
8+
from ._internal import (
9+
AclEntry,
10+
AclStatus,
11+
ContentSummary,
12+
FileStatus,
13+
RawClient,
14+
WriteOptions,
15+
)
916

1017
if TYPE_CHECKING:
11-
from ._internal import ContentSummary, FileStatus, RawFileReader, RawFileWriter
18+
from ._internal import (
19+
RawFileReader,
20+
RawFileWriter,
21+
)
1222

1323

1424
class FileReader(io.RawIOBase):
@@ -95,11 +105,11 @@ def __init__(
95105
):
96106
self.inner = RawClient(url, config)
97107

98-
def get_file_info(self, path: str) -> "FileStatus":
108+
def get_file_info(self, path: str) -> FileStatus:
99109
"""Gets the file status for the file at `path`"""
100110
return self.inner.get_file_info(path)
101111

102-
def list_status(self, path: str, recursive: bool = False) -> Iterator["FileStatus"]:
112+
def list_status(self, path: str, recursive: bool = False) -> Iterator[FileStatus]:
103113
"""Gets the status of files rooted at `path`. If `recursive` is true, lists all files recursively."""
104114
return self.inner.list_status(path, recursive)
105115

@@ -181,8 +191,46 @@ def set_replication(self, path: str, replication: int) -> bool:
181191
"""
182192
return self.inner.set_replication(path, replication)
183193

184-
def get_content_summary(self, path: str) -> "ContentSummary":
194+
def get_content_summary(self, path: str) -> ContentSummary:
185195
"""
186196
Gets a content summary for `path`
187197
"""
188198
return self.inner.get_content_summary(path)
199+
200+
def modify_acl_entries(self, path: str, entries: List[AclEntry]) -> None:
    """
    Update the ACL of the file or directory at `path` with `entries`.
    Entries that already exist on `path` are kept.
    """
    raw_client = self.inner
    return raw_client.modify_acl_entries(path, entries)
205+
206+
def remove_acl_entries(self, path: str, entries: List[AclEntry]) -> None:
    """
    Remove the given ACL `entries` from the file or directory at `path`.
    """
    raw_client = self.inner
    return raw_client.remove_acl_entries(path, entries)
211+
212+
def remove_default_acl(self, path: str) -> None:
    """
    Remove every default ACL from the file or directory at `path`.
    """
    raw_client = self.inner
    return raw_client.remove_default_acl(path)
217+
218+
def remove_acl(self, path: str) -> None:
    """
    Remove every ACL entry from the file or directory at `path`.
    """
    raw_client = self.inner
    return raw_client.remove_acl(path)
223+
224+
def set_acl(self, path: str, entries: List[AclEntry]) -> None:
    """
    Replace all ACL entries of the file or directory at `path` with `entries`.
    When `entries` contains only access ACLs the default ACLs are maintained,
    and when it contains only default ACLs the access ACLs are maintained.
    """
    raw_client = self.inner
    return raw_client.set_acl(path, entries)
231+
232+
def get_acl_status(self, path: str) -> AclStatus:
    """
    Fetch the ACL status of the file or directory at `path`.
    """
    raw_client = self.inner
    return raw_client.get_acl_status(path)

python/hdfs_native/_internal.pyi

+33-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, Iterator, Optional
1+
from typing import Dict, Iterator, List, Literal, Optional
22

33
# For some reason mypy doesn't think this exists
44
from typing_extensions import Buffer # type: ignore
@@ -23,6 +23,31 @@ class ContentSummary:
2323
space_consumed: int
2424
space_quota: int
2525

26+
AclEntryType = Literal["user", "group", "mask", "other"]
27+
AclEntryScope = Literal["access", "default"]
28+
FsAction = Literal["---", "--x", "-w-", "-wx", "r--", "r-x", "rw-", "rwx"]
29+
30+
class AclEntry:
    """Type stub for the `AclEntry` class exported by the native extension."""

    # Each attribute is restricted to the string literals of its alias.
    type: AclEntryType
    scope: AclEntryScope
    permissions: FsAction
    # Optional; the constructor defaults it to None.
    name: Optional[str]

    def __init__(
        self,
        type: AclEntryType,
        scope: AclEntryScope,
        permissions: FsAction,
        name: Optional[str] = None,
    ) -> None: ...
43+
44+
class AclStatus:
    """Type stub for the ACL status returned by `RawClient.get_acl_status`."""

    owner: str
    group: str
    sticky: bool
    entries: List[AclEntry]
    permission: int
50+
2651
class WriteOptions:
2752
block_size: Optional[int]
2853
replication: Optional[int]
@@ -85,4 +110,10 @@ class RawClient:
85110
) -> None: ...
86111
def set_permission(self, path: str, permission: int) -> None: ...
87112
def set_replication(self, path: str, replication: int) -> bool: ...
88-
def get_content_summary(self, path) -> ContentSummary: ...
113+
def get_content_summary(self, path: str) -> ContentSummary: ...
114+
def modify_acl_entries(self, path: str, entries: List[AclEntry]) -> None: ...
115+
def remove_acl_entries(self, path: str, entries: List[AclEntry]) -> None: ...
116+
def remove_default_acl(self, path: str) -> None: ...
117+
def remove_acl(self, path: str) -> None: ...
118+
def set_acl(self, path: str, entries: List[AclEntry]) -> None: ...
119+
def get_acl_status(self, path: str) -> AclStatus: ...

python/src/lib.rs

+141
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use ::hdfs_native::{
99
Client,
1010
};
1111
use bytes::Bytes;
12+
use hdfs_native::acl::{AclEntry, AclStatus};
1213
use hdfs_native::client::ContentSummary;
1314
use pyo3::{exceptions::PyRuntimeError, prelude::*};
1415
use tokio::runtime::Runtime;
@@ -131,6 +132,93 @@ impl PyContentSummary {
131132
}
132133
}
133134

135+
#[pyclass(get_all, frozen, name = "AclStatus")]
136+
struct PyAclStatus {
137+
owner: String,
138+
group: String,
139+
sticky: bool,
140+
entries: Vec<PyAclEntry>,
141+
permission: u16,
142+
}
143+
144+
impl From<AclStatus> for PyAclStatus {
145+
fn from(value: AclStatus) -> Self {
146+
Self {
147+
owner: value.owner,
148+
group: value.group,
149+
sticky: value.sticky,
150+
entries: value.entries.into_iter().map(PyAclEntry::from).collect(),
151+
permission: value.permission,
152+
}
153+
}
154+
}
155+
156+
#[pymethods]
157+
impl PyAclStatus {
158+
/// Return a dataclass-esque format for the repr
159+
fn __repr__(&self) -> String {
160+
format!("AclStatus(owner='{}')", self.owner)
161+
}
162+
}
163+
164+
#[pyclass(get_all, set_all, name = "AclEntry")]
165+
#[derive(Clone, Default)]
166+
struct PyAclEntry {
167+
r#type: String,
168+
scope: String,
169+
permissions: String,
170+
name: Option<String>,
171+
}
172+
173+
impl From<AclEntry> for PyAclEntry {
174+
fn from(value: AclEntry) -> Self {
175+
Self {
176+
r#type: value.r#type.to_string(),
177+
scope: value.scope.to_string(),
178+
permissions: value.permissions.to_string(),
179+
name: value.name,
180+
}
181+
}
182+
}
183+
184+
impl From<PyAclEntry> for AclEntry {
185+
fn from(value: PyAclEntry) -> Self {
186+
Self {
187+
r#type: value.r#type.into(),
188+
scope: value.scope.into(),
189+
permissions: value.permissions.into(),
190+
name: value.name,
191+
}
192+
}
193+
}
194+
195+
impl FromIterator<PyAclEntry> for Vec<AclEntry> {
196+
fn from_iter<T: IntoIterator<Item = PyAclEntry>>(iter: T) -> Self {
197+
iter.into_iter().map(AclEntry::from).collect()
198+
}
199+
}
200+
201+
#[pymethods]
202+
impl PyAclEntry {
203+
#[new]
204+
pub fn new(r#type: String, scope: String, permissions: String, name: Option<String>) -> Self {
205+
Self {
206+
r#type,
207+
scope,
208+
permissions,
209+
name,
210+
}
211+
}
212+
213+
/// Return a dataclass-esque format for the repr
214+
fn __repr__(&self) -> String {
215+
format!(
216+
"AclEntry(type='{}', scope='{}', permissions='{}', name='{:?}')",
217+
self.r#type, self.scope, self.permissions, self.name
218+
)
219+
}
220+
}
221+
134222
#[pyclass]
135223
struct RawFileReader {
136224
inner: FileReader,
@@ -400,12 +488,65 @@ impl RawClient {
400488
.allow_threads(|| self.rt.block_on(self.inner.get_content_summary(path)))?
401489
.into())
402490
}
491+
492+
pub fn modify_acl_entries(
493+
&self,
494+
path: &str,
495+
acl_spec: Vec<PyAclEntry>,
496+
py: Python,
497+
) -> PyHdfsResult<()> {
498+
Ok(py.allow_threads(|| {
499+
self.rt.block_on(
500+
self.inner
501+
.modify_acl_entries(path, acl_spec.into_iter().collect()),
502+
)
503+
})?)
504+
}
505+
506+
pub fn remove_acl_entries(
507+
&self,
508+
path: &str,
509+
acl_spec: Vec<PyAclEntry>,
510+
py: Python,
511+
) -> PyHdfsResult<()> {
512+
Ok(py.allow_threads(|| {
513+
self.rt.block_on(
514+
self.inner
515+
.remove_acl_entries(path, acl_spec.into_iter().collect()),
516+
)
517+
})?)
518+
}
519+
520+
pub fn remove_default_acl(&self, path: &str, py: Python) -> PyHdfsResult<()> {
    // Run the blocking call with the GIL released.
    let outcome = py.allow_threads(|| self.rt.block_on(self.inner.remove_default_acl(path)));
    Ok(outcome?)
}
523+
524+
pub fn remove_acl(&self, path: &str, py: Python) -> PyHdfsResult<()> {
    // Run the blocking call with the GIL released.
    let outcome = py.allow_threads(|| self.rt.block_on(self.inner.remove_acl(path)));
    Ok(outcome?)
}
527+
528+
pub fn set_acl(&self, path: &str, acl_spec: Vec<PyAclEntry>, py: Python) -> PyHdfsResult<()> {
    // Run the blocking call with the GIL released.
    let outcome = py.allow_threads(|| {
        let entries = acl_spec.into_iter().collect();
        self.rt.block_on(self.inner.set_acl(path, entries))
    });
    Ok(outcome?)
}
534+
535+
pub fn get_acl_status(&self, path: &str, py: Python) -> PyHdfsResult<PyAclStatus> {
536+
Ok(py
537+
.allow_threads(|| self.rt.block_on(self.inner.get_acl_status(path)))?
538+
.into())
539+
}
403540
}
404541

405542
/// A Python module implemented in Rust.
406543
#[pymodule]
407544
fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
408545
m.add_class::<RawClient>()?;
546+
m.add_class::<PyFileStatus>()?;
547+
m.add_class::<PyContentSummary>()?;
409548
m.add_class::<PyWriteOptions>()?;
549+
m.add_class::<PyAclEntry>()?;
550+
m.add_class::<PyAclStatus>()?;
410551
Ok(())
411552
}

python/tests/test_integration.py

+32-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import io
22

3-
from hdfs_native import Client, WriteOptions
3+
from hdfs_native import AclEntry, Client, WriteOptions
44

55

66
def test_integration(client: Client):
@@ -115,3 +115,34 @@ def test_write_options(client: Client):
115115
assert file_info.length == 0
116116
assert file_info.permission == 0o700
117117
assert file_info.blocksize == 1024 * 1024
118+
119+
120+
def test_acls(client: Client):
    """Check ACL modification and removal on a file, then on a directory.

    Both paths created here are deleted again so nothing is left behind
    for other tests.
    """
    client.create("/test").close()

    acl_status = client.get_acl_status("/test")
    assert len(acl_status.entries) == 0

    client.modify_acl_entries("/test", [AclEntry("user", "access", "r-x", "testuser")])
    # Should be 2 entries now, a default group entry gets added as well
    acl_status = client.get_acl_status("/test")
    assert len(acl_status.entries) == 2

    client.remove_acl("/test")
    acl_status = client.get_acl_status("/test")
    assert len(acl_status.entries) == 0

    client.delete("/test")

    client.mkdirs("/testdir")

    client.modify_acl_entries(
        "/testdir", [AclEntry("user", "default", "rwx", "testuser")]
    )
    # 4 other defaults get added automatically
    acl_status = client.get_acl_status("/testdir")
    assert len(acl_status.entries) == 5

    client.remove_default_acl("/testdir")
    acl_status = client.get_acl_status("/testdir")
    assert len(acl_status.entries) == 0

    # Remove the directory as well, mirroring the cleanup of "/test" above.
    client.delete("/testdir")

0 commit comments

Comments
 (0)