Skip to content

Commit 6030c6f

Browse files
authoredJun 12, 2024
Add getContentSummary support (#125)
1 parent 20cb34e commit 6030c6f

File tree

9 files changed

+166
-1
lines changed

9 files changed

+166
-1
lines changed
 

‎python/hdfs_native/__init__.py

+6
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,9 @@ def set_replication(self, path: str, replication: int) -> bool:
164164
Sets the replication for file at `path` to `replication`
165165
"""
166166
return self.inner.set_replication(path, replication)
167+
168+
def get_content_summary(self, path: str) -> "ContentSummary":
169+
"""
170+
Gets a content summary for `path`
171+
"""
172+
return self.inner.get_content_summary(path)

‎python/hdfs_native/_internal.pyi

+9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ class FileStatus:
1515
replication: Optional[int]
1616
blocksize: Optional[int]
1717

18+
class ContentSummary:
19+
length: int
20+
file_count: int
21+
directory_count: int
22+
quota: int
23+
space_consumed: int
24+
space_quota: int
25+
1826
class WriteOptions:
1927
block_size: Optional[int]
2028
replication: Optional[int]
@@ -64,3 +72,4 @@ class RawClient:
6472
) -> None: ...
6573
def set_permission(self, path: str, permission: int) -> None: ...
6674
def set_replication(self, path: str, replication: int) -> bool: ...
75+
def get_content_summary(self, path) -> ContentSummary: ...

‎python/hdfs_native/fsspec.py

+17
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,23 @@ def touch(self, path: str, truncate=True, **kwargs):
8989
now = int(time.time() * 1000)
9090
self.client.set_times(path, now, now)
9191

92+
def du(
93+
self,
94+
path: str,
95+
total=True,
96+
maxdepth: Optional[int] = None,
97+
withdirs=False,
98+
**kwargs,
99+
) -> Union[int, Dict[str, int]]:
100+
if total:
101+
if maxdepth is not None:
102+
raise NotImplementedError("maxdepth is not supported with total")
103+
104+
content_summary = self.client.get_content_summary(path)
105+
return content_summary.length
106+
else:
107+
return super().du(path, total, maxdepth, withdirs, **kwargs)
108+
92109
def mkdir(self, path: str, create_parents=True, **kwargs):
93110
self.client.mkdirs(
94111
self._strip_protocol(path),

‎python/src/lib.rs

+31
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use ::hdfs_native::{
99
Client,
1010
};
1111
use bytes::Bytes;
12+
use hdfs_native::client::ContentSummary;
1213
use pyo3::{exceptions::PyRuntimeError, prelude::*};
1314
use tokio::runtime::Runtime;
1415

@@ -72,6 +73,29 @@ impl PyFileStatusIter {
7273
}
7374
}
7475

76+
#[pyclass(get_all, frozen, name = "ContentSummary")]
77+
struct PyContentSummary {
78+
length: u64,
79+
file_count: u64,
80+
directory_count: u64,
81+
quota: u64,
82+
space_consumed: u64,
83+
space_quota: u64,
84+
}
85+
86+
impl From<ContentSummary> for PyContentSummary {
87+
fn from(value: ContentSummary) -> Self {
88+
Self {
89+
length: value.length,
90+
file_count: value.file_count,
91+
directory_count: value.directory_count,
92+
quota: value.quota,
93+
space_consumed: value.space_consumed,
94+
space_quota: value.space_quota,
95+
}
96+
}
97+
}
98+
7599
#[pyclass]
76100
struct RawFileReader {
77101
inner: FileReader,
@@ -278,6 +302,13 @@ impl RawClient {
278302
.rt
279303
.block_on(self.inner.set_replication(path, replication))?)
280304
}
305+
306+
pub fn get_content_summary(&self, path: &str) -> PyHdfsResult<PyContentSummary> {
307+
Ok(self
308+
.rt
309+
.block_on(self.inner.get_content_summary(path))?
310+
.into())
311+
}
281312
}
282313

283314
/// A Python module implemented in Rust.

‎python/tests/test_fsspec.py

+16
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ def test_listing(fs: HdfsFileSystem):
7171
assert listing[0]["name"] == "/testdir"
7272
assert listing[0]["type"] == "directory"
7373

74+
fs.rm("/testdir", True)
75+
7476

7577
def test_parsing(minidfs: str):
7678
with fsspec.open(f"{minidfs}/test", "wb") as f:
@@ -85,3 +87,17 @@ def test_parsing(minidfs: str):
8587
assert urlpath == "/path"
8688

8789
assert fs.unstrip_protocol("/path") == f"{minidfs}/path"
90+
91+
92+
def test_du(fs: HdfsFileSystem):
93+
with fs.open("/test", mode="wb") as file:
94+
file.write(b"hello there")
95+
96+
with fs.open("/test2", mode="wb") as file:
97+
file.write(b"hello again")
98+
99+
assert fs.du("/test") == 11
100+
assert fs.du("/test2") == 11
101+
assert fs.du("/") == 22
102+
103+
assert fs.du("/", total=False) == {"/test": 11, "/test2": 11}

‎python/tests/test_integration.py

+5
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,9 @@ def test_integration(minidfs: str):
9595
file_info = client.get_file_info("/testfile")
9696
assert file_info.replication == 2
9797

98+
content_summary = client.get_content_summary("/")
99+
assert content_summary.file_count == 1
100+
assert content_summary.directory_count == 1
101+
assert content_summary.length == 33 * 1024 * 1024 * 4
102+
98103
client.delete("/testfile", False)

‎rust/src/client.rs

+36-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::hdfs::protocol::NamenodeProtocol;
1414
use crate::hdfs::proxy::NameServiceProxy;
1515
use crate::proto::hdfs::hdfs_file_status_proto::FileType;
1616

17-
use crate::proto::hdfs::HdfsFileStatusProto;
17+
use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
1818

1919
#[derive(Clone)]
2020
pub struct WriteOptions {
@@ -475,6 +475,18 @@ impl Client {
475475

476476
Ok(result)
477477
}
478+
479+
/// Gets a content summary for a file or directory rooted at `path
480+
pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
481+
let (link, resolved_path) = self.mount_table.resolve(path);
482+
let result = link
483+
.protocol
484+
.get_content_summary(&resolved_path)
485+
.await?
486+
.summary;
487+
488+
Ok(result.into())
489+
}
478490
}
479491

480492
impl Default for Client {
@@ -664,6 +676,29 @@ impl FileStatus {
664676
}
665677
}
666678

679+
#[derive(Debug)]
680+
pub struct ContentSummary {
681+
pub length: u64,
682+
pub file_count: u64,
683+
pub directory_count: u64,
684+
pub quota: u64,
685+
pub space_consumed: u64,
686+
pub space_quota: u64,
687+
}
688+
689+
impl From<ContentSummaryProto> for ContentSummary {
690+
fn from(value: ContentSummaryProto) -> Self {
691+
ContentSummary {
692+
length: value.length,
693+
file_count: value.file_count,
694+
directory_count: value.directory_count,
695+
quota: value.quota,
696+
space_consumed: value.space_consumed,
697+
space_quota: value.space_quota,
698+
}
699+
}
700+
}
701+
667702
#[cfg(test)]
668703
mod test {
669704
use std::{

‎rust/src/hdfs/protocol.rs

+23
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,29 @@ impl NamenodeProtocol {
479479
debug!("setReplication response: {:?}", &decoded);
480480
Ok(decoded)
481481
}
482+
483+
pub(crate) async fn get_content_summary(
484+
&self,
485+
path: &str,
486+
) -> Result<hdfs::GetContentSummaryResponseProto> {
487+
let message = hdfs::GetContentSummaryRequestProto {
488+
path: path.to_string(),
489+
};
490+
491+
debug!("getContentSummary request: {:?}", &message);
492+
493+
let response = self
494+
.proxy
495+
.call(
496+
"getContentSummary",
497+
message.encode_length_delimited_to_vec(),
498+
)
499+
.await?;
500+
501+
let decoded = hdfs::GetContentSummaryResponseProto::decode_length_delimited(response)?;
502+
debug!("getContentSummary response: {:?}", &decoded);
503+
Ok(decoded)
504+
}
482505
}
483506

484507
impl Drop for NamenodeProtocol {

‎rust/tests/test_integration.rs

+23
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ mod test {
191191
test_set_owner(&client).await?;
192192
test_set_permission(&client).await?;
193193
test_set_replication(&client).await?;
194+
test_get_content_summary(&client).await?;
194195

195196
Ok(())
196197
}
@@ -426,4 +427,26 @@ mod test {
426427

427428
Ok(())
428429
}
430+
431+
async fn test_get_content_summary(client: &Client) -> Result<()> {
432+
let mut file1 = client.create("/test", WriteOptions::default()).await?;
433+
434+
file1.write(vec![0, 1, 2, 3].into()).await?;
435+
file1.close().await?;
436+
437+
let mut file2 = client.create("/test2", WriteOptions::default()).await?;
438+
439+
file2.write(vec![0, 1, 2, 3, 4, 5].into()).await?;
440+
file2.close().await?;
441+
442+
client.mkdirs("/testdir", 0o755, true).await?;
443+
444+
let content_summary = client.get_content_summary("/").await?;
445+
assert_eq!(content_summary.file_count, 3,);
446+
assert_eq!(content_summary.directory_count, 2);
447+
// Test file plus the two we made above
448+
assert_eq!(content_summary.length, TEST_FILE_INTS as u64 * 4 + 4 + 6);
449+
450+
Ok(())
451+
}
429452
}

0 commit comments

Comments
 (0)