-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy path__init__.py
153 lines (115 loc) · 4.95 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import io
import os
from typing import Dict, Iterator, Optional
# For some reason mypy doesn't think this exists
from typing_extensions import Buffer # type: ignore
from ._internal import *
class FileReader(io.RawIOBase):
def __init__(self, inner: "RawFileReader"):
self.inner = inner
def __len__(self) -> int:
return self.inner.file_length()
def __enter__(self):
# Don't need to do anything special here
return self
def __exit__(self, *_args):
# Future updates could close the file manually here if that would help clean things up
pass
@property
def size(self):
return len(self)
def seek(self, offset: int, whence=os.SEEK_SET):
"""Seek to `offset` relative to `whence`"""
if whence == os.SEEK_SET:
self.inner.seek(offset)
elif whence == os.SEEK_CUR:
self.inner.seek(self.tell() + offset)
elif whence == os.SEEK_END:
self.inner.seek(self.inner.file_length() + offset)
else:
raise ValueError(f"Unsupported whence {whence}")
def seekable(self):
return True
def tell(self) -> int:
return self.inner.tell()
def readable(self) -> bool:
return True
def read(self, size: int = -1) -> bytes:
"""Read up to `size` bytes from the file, or all content if -1"""
return self.inner.read(size)
def readall(self) -> bytes:
return self.read()
def read_range(self, offset: int, len: int) -> bytes:
"""Read `len` bytes from the file starting at `offset`. Doesn't affect the position in the file"""
return self.inner.read_range(offset, len)
def close(self) -> None:
pass
class FileWriter(io.RawIOBase):
def __init__(self, inner: "RawFileWriter"):
self.inner = inner
def writable(self) -> bool:
return True
def write(self, buf: Buffer) -> int:
"""Writes `buf` to the file. Always writes all bytes"""
return self.inner.write(buf)
def close(self) -> None:
"""Closes the file and saves the final metadata to the NameNode"""
self.inner.close()
def __enter__(self) -> "FileWriter":
return self
def __exit__(self, *_args):
self.close()
class Client:
def __init__(self, url: str, config: Optional[Dict[str, str]] = None):
self.inner = RawClient(url, config)
def get_file_info(self, path: str) -> "FileStatus":
"""Gets the file status for the file at `path`"""
return self.inner.get_file_info(path)
def list_status(self, path: str, recursive: bool) -> Iterator["FileStatus"]:
"""Gets the status of files rooted at `path`. If `recursive` is true, lists all files recursively."""
return self.inner.list_status(path, recursive)
def read(self, path: str) -> FileReader:
"""Opens a file for reading at `path`"""
return FileReader(self.inner.read(path))
def create(self, path: str, write_options: WriteOptions) -> FileWriter:
"""Creates a new file and opens it for writing at `path`"""
return FileWriter(self.inner.create(path, write_options))
def append(self, path: str) -> FileWriter:
"""Opens an existing file to append to at `path`"""
return FileWriter(self.inner.append(path))
def mkdirs(self, path: str, permission: int, create_parent: bool) -> None:
"""
Creates a directory at `path` with unix permissions `permission`. If `create_parent` is true,
any parent directories that don't exist will also be created. Otherwise this will fail if
all parent directories don't already exist.
"""
return self.inner.mkdirs(path, permission, create_parent)
def rename(self, src: str, dst: str, overwrite: bool) -> None:
"""
Moves a file or directory from `src` to `dst`. If `overwrite` is True, the destination will be
overriden if it already exists, otherwise the operation will fail if the destination
exists.
"""
return self.inner.rename(src, dst, overwrite)
def delete(self, path: str, recursive: bool) -> bool:
"""
Deletes a file or directory at `path`. If `recursive` is True and the target is a directory,
this will delete all contents underneath the directory. If `recursive` is False and the target
is a non-empty directory, this will fail.
"""
return self.inner.delete(path, recursive)
def set_times(self, path: str, mtime: int, atime: int) -> None:
"""
Changes the modification time and access time of the file at `path` to `mtime` and `atime`, respectively.
"""
return self.inner.set_times(path, mtime, atime)
def set_owner(
self,
path: str,
owner: Optional[str] = None,
group: Optional[str] = None,
) -> None:
"""
Sets the owner and/or group for the file at `path`
"""
return self.inner.set_owner(path, owner, group)