1
- from contextlib import suppress
2
- from datetime import datetime
3
1
import secrets
4
2
import shutil
5
3
import time
4
+ from contextlib import suppress
5
+ from datetime import datetime
6
6
from typing import TYPE_CHECKING , Dict , List , Optional , Union
7
+
7
8
from fsspec import AbstractFileSystem
8
9
from fsspec .utils import tokenize
9
10
12
13
if TYPE_CHECKING :
13
14
from . import FileStatus
14
15
16
+
15
17
class HdfsFileSystem (AbstractFileSystem ):
16
18
def __init__(self, host: str, port: Optional[int] = None, *args, **storage_options):
    """Create a filesystem bound to the given HDFS host.

    Args:
        host: Host name used to build the client URL.
        port: Optional port; appended to the URL only when truthy.
        *args: Extra positional arguments forwarded to the base class.
        **storage_options: Extra keyword options forwarded to the base class.
    """
    super().__init__(host, port, *args, **storage_options)
    self.host, self.port = host, port
    # A missing (or zero) port leaves the URL as "<protocol>://<host>".
    authority = f"{host}:{port}" if port else host
    self.client = Client(f"{self.protocol}://{authority}")
24
26
25
27
@property
def fsid(self):
    """Identifier string derived from the protocol, host and port.

    The three values are passed to ``tokenize`` and the result is prefixed
    with ``hdfs_native_``.
    """
    token = tokenize(self.protocol, self.host, self.port)
    return f"hdfs_native_{token}"
28
30
29
- def _convert_file_status (self , file_status : ' FileStatus' ) -> Dict :
31
+ def _convert_file_status (self , file_status : " FileStatus" ) -> Dict :
30
32
return {
31
- ' name' : file_status .path ,
32
- ' size' : file_status .length ,
33
- ' type' : ' directory' if file_status .isdir else ' file' ,
34
- ' permission' : file_status .permission ,
35
- ' owner' : file_status .owner ,
36
- ' group' : file_status .group ,
37
- ' modification_time' : file_status .modification_time ,
38
- ' access_time' : file_status .access_time
33
+ " name" : file_status .path ,
34
+ " size" : file_status .length ,
35
+ " type" : " directory" if file_status .isdir else " file" ,
36
+ " permission" : file_status .permission ,
37
+ " owner" : file_status .owner ,
38
+ " group" : file_status .group ,
39
+ " modification_time" : file_status .modification_time ,
40
+ " access_time" : file_status .access_time ,
39
41
}
40
42
41
43
def info(self, path, **_kwargs) -> Dict:
    """Return the info dictionary for ``path``.

    The status is fetched with ``client.get_file_info`` and converted by
    ``_convert_file_status``. Extra keyword arguments are accepted and ignored.
    """
    return self._convert_file_status(self.client.get_file_info(path))
44
-
46
+
45
47
def exists (self , path , ** _kwargs ):
46
48
try :
47
49
self .info (path )
@@ -58,23 +60,23 @@ def ls(self, path: str, detail=True, **kwargs) -> List[Union[str, Dict]]:
58
60
59
61
def touch(self, path: str, truncate=True, **kwargs):
    """Create an empty file at ``path`` or refresh its timestamps.

    Args:
        path: Target file path.
        truncate: When true, always (re)open the file in ``"wb"`` mode.
        **kwargs: Forwarded to ``self.open`` when the file is written.
    """
    if not truncate and self.exists(path):
        # Existing file, no truncation requested: set both times passed to
        # ``set_times`` to the current time in milliseconds.
        stamp = int(time.time() * 1000)
        self.client.set_times(path, stamp, stamp)
        return
    # Opening for write and closing immediately leaves an empty file.
    with self.open(path, "wb", **kwargs):
        pass
66
68
67
69
def mkdir(self, path: str, create_parents=True, **kwargs):
    """Create the directory ``path`` through ``client.mkdirs``.

    Args:
        path: Directory to create.
        create_parents: Passed through to the client as the third argument.
        **kwargs: Only ``permission`` is read; it defaults to ``0o755``.
    """
    permission = kwargs.get("permission", 0o755)
    self.client.mkdirs(path, permission, create_parents)
69
71
70
72
def makedirs(self, path: str, exist_ok=False):
    """Create ``path`` together with any missing parents.

    Args:
        path: Directory to create.
        exist_ok: When false, an already existing ``path`` is an error.

    Returns:
        Whatever ``self.mkdir`` returns.

    Raises:
        FileExistsError: If ``path`` exists and ``exist_ok`` is false.
    """
    # The existence check is skipped entirely when exist_ok is set.
    if exist_ok or not self.exists(path):
        return self.mkdir(path, create_parents=True)
    raise FileExistsError("File or directory already exists")
75
77
76
78
def mv(self, path1: str, path2: str, **kwargs):
    """Rename ``path1`` to ``path2`` through ``client.rename``.

    Args:
        path1: Source path.
        path2: Destination path.
        **kwargs: Only ``overwrite`` is read; it defaults to ``False``.
    """
    overwrite = kwargs.get("overwrite", False)
    self.client.rename(path1, path2, overwrite)
78
80
79
81
def cp_file (self , path1 , path2 , ** kwargs ):
80
82
with self ._open (path1 , "rb" ) as lstream :
@@ -93,7 +95,7 @@ def rmdir(self, path: str) -> None:
93
95
94
96
def rm(self, path: str, recursive=False, maxdepth: Optional[int] = None) -> None:
    """Delete ``path`` through ``client.delete``.

    Args:
        path: Path to delete.
        recursive: Passed through to the client as the second argument.
        maxdepth: Must be ``None``; depth-limited removal is not implemented.

    Raises:
        NotImplementedError: If ``maxdepth`` is anything other than ``None``.
    """
    if maxdepth is None:
        self.client.delete(path, recursive)
        return
    raise NotImplementedError("maxdepth is not supported")
98
100
99
101
def rm_file (self , path : str ):
@@ -110,19 +112,19 @@ def _open(
110
112
overwrite = True ,
111
113
replication : Optional [int ] = None ,
112
114
block_size : Optional [int ] = None ,
113
- ** _kwargs
115
+ ** _kwargs ,
114
116
):
115
- if mode == 'rb' :
117
+ if mode == "rb" :
116
118
return self .client .read (path )
117
- elif mode == 'wb' :
119
+ elif mode == "wb" :
118
120
write_options = WriteOptions ()
119
121
write_options .overwrite = overwrite
120
122
if replication :
121
123
write_options .replication = replication
122
124
if block_size :
123
125
write_options .block_size = block_size
124
126
return self .client .create (path , write_options = write_options )
125
- elif mode == 'ab' :
127
+ elif mode == "ab" :
126
128
return self .client .append (path )
127
129
else :
128
- raise ValueError (f' Mode { mode } is not supported' )
130
+ raise ValueError (f" Mode { mode } is not supported" )
0 commit comments