@@ -17,24 +17,24 @@ import (
17
17
)
18
18
19
19
// Encapsulates state and operations for directory node on the HDFS file system
20
- type Dir struct {
21
- FileSystem * FileSystem // Pointer to the owning filesystem
22
- Attrs Attrs // Cached attributes of the directory, TODO: add TTL
23
- Parent * Dir // Pointer to the parent directory (allows computing fully-qualified paths on demand)
24
- Entries map [string ]* fs.Node // Cahed directory entries
25
- EntriesMutex sync.Mutex // Used to protect Entries
20
+ type DirINode struct {
21
+ FileSystem * FileSystem // Pointer to the owning filesystem
22
+ Attrs Attrs // Cached attributes of the directory, TODO: add TTL
23
+ Parent * DirINode // Pointer to the parent directory (allows computing fully-qualified paths on demand)
24
+ Entries map [string ]* fs.Node // Cahed directory entries
25
+ mutex sync.Mutex // One read or write operation on a directory at a time
26
26
}
27
27
28
28
// Verify that *Dir implements necesary FUSE interfaces
29
- var _ fs.Node = (* Dir )(nil )
30
- var _ fs.HandleReadDirAller = (* Dir )(nil )
31
- var _ fs.NodeStringLookuper = (* Dir )(nil )
32
- var _ fs.NodeMkdirer = (* Dir )(nil )
33
- var _ fs.NodeRemover = (* Dir )(nil )
34
- var _ fs.NodeRenamer = (* Dir )(nil )
29
+ var _ fs.Node = (* DirINode )(nil )
30
+ var _ fs.HandleReadDirAller = (* DirINode )(nil )
31
+ var _ fs.NodeStringLookuper = (* DirINode )(nil )
32
+ var _ fs.NodeMkdirer = (* DirINode )(nil )
33
+ var _ fs.NodeRemover = (* DirINode )(nil )
34
+ var _ fs.NodeRenamer = (* DirINode )(nil )
35
35
36
36
// Returns absolute path of the dir in HDFS namespace
37
- func (dir * Dir ) AbsolutePath () string {
37
+ func (dir * DirINode ) AbsolutePath () string {
38
38
if dir .Parent == nil {
39
39
return dir .FileSystem .SrcDir
40
40
} else {
@@ -43,7 +43,7 @@ func (dir *Dir) AbsolutePath() string {
43
43
}
44
44
45
45
// Returns absolute path of the child item of this directory
46
- func (dir * Dir ) AbsolutePathForChild (name string ) string {
46
+ func (dir * DirINode ) AbsolutePathForChild (name string ) string {
47
47
path := dir .AbsolutePath ()
48
48
if path != "/" {
49
49
path = path + "/"
@@ -52,7 +52,9 @@ func (dir *Dir) AbsolutePathForChild(name string) string {
52
52
}
53
53
54
54
// Responds on FUSE request to get directory attributes
55
- func (dir * Dir ) Attr (ctx context.Context , a * fuse.Attr ) error {
55
+ func (dir * DirINode ) Attr (ctx context.Context , a * fuse.Attr ) error {
56
+ dir .lockMutex ()
57
+ defer dir .unlockMutex ()
56
58
if dir .Parent != nil && dir .FileSystem .Clock .Now ().After (dir .Attrs .Expires ) {
57
59
err := dir .Parent .LookupAttrs (dir .Attrs .Name , & dir .Attrs )
58
60
if err != nil {
@@ -63,54 +65,47 @@ func (dir *Dir) Attr(ctx context.Context, a *fuse.Attr) error {
63
65
return dir .Attrs .Attr (a )
64
66
}
65
67
66
- func (dir * Dir ) EntriesGet (name string ) * fs.Node {
67
- dir .EntriesMutex .Lock ()
68
- defer dir .EntriesMutex .Unlock ()
68
+ func (dir * DirINode ) EntriesGet (name string ) * fs.Node {
69
69
if dir .Entries == nil {
70
70
dir .Entries = make (map [string ]* fs.Node )
71
71
return nil
72
72
}
73
73
return dir .Entries [name ]
74
74
}
75
75
76
- func (dir * Dir ) EntriesSet (name string , node * fs.Node ) {
77
- dir .EntriesMutex .Lock ()
78
- defer dir .EntriesMutex .Unlock ()
79
-
76
+ func (dir * DirINode ) EntriesSet (name string , node * fs.Node ) {
80
77
if dir .Entries == nil {
81
78
dir .Entries = make (map [string ]* fs.Node )
82
79
}
83
80
84
81
dir .Entries [name ] = node
85
82
}
86
83
87
- func (dir * Dir ) EntriesUpdate (name string , attr Attrs ) {
88
- dir .EntriesMutex .Lock ()
89
- defer dir .EntriesMutex .Unlock ()
90
-
84
+ func (dir * DirINode ) EntriesUpdate (name string , attr Attrs ) {
91
85
if dir .Entries == nil {
92
86
dir .Entries = make (map [string ]* fs.Node )
93
87
}
94
88
95
89
if node , ok := dir .Entries [name ]; ok {
96
- if fnode , ok := (* node ).(* File ); ok {
90
+ if fnode , ok := (* node ).(* FileINode ); ok {
97
91
fnode .Attrs = attr
98
- } else if dnode , ok := (* node ).(* Dir ); ok {
92
+ } else if dnode , ok := (* node ).(* DirINode ); ok {
99
93
dnode .Attrs = attr
100
94
}
101
95
}
102
96
}
103
97
104
- func (dir * Dir ) EntriesRemove (name string ) {
105
- dir .EntriesMutex .Lock ()
106
- defer dir .EntriesMutex .Unlock ()
98
+ func (dir * DirINode ) EntriesRemove (name string ) {
107
99
if dir .Entries != nil {
108
100
delete (dir .Entries , name )
109
101
}
110
102
}
111
103
112
104
// Responds on FUSE request to lookup the directory
113
- func (dir * Dir ) Lookup (ctx context.Context , name string ) (fs.Node , error ) {
105
+ func (dir * DirINode ) Lookup (ctx context.Context , name string ) (fs.Node , error ) {
106
+ dir .lockMutex ()
107
+ defer dir .unlockMutex ()
108
+
114
109
if ! dir .FileSystem .IsPathAllowed (dir .AbsolutePathForChild (name )) {
115
110
return nil , fuse .ENOENT
116
111
}
@@ -126,7 +121,7 @@ func (dir *Dir) Lookup(ctx context.Context, name string) (fs.Node, error) {
126
121
if err != nil {
127
122
return nil , err
128
123
}
129
- zipFile , ok := zipFileNode .(* File )
124
+ zipFile , ok := zipFileNode .(* FileINode )
130
125
if ! ok {
131
126
return nil , fuse .ENOENT
132
127
}
@@ -146,11 +141,14 @@ func (dir *Dir) Lookup(ctx context.Context, name string) (fs.Node, error) {
146
141
}
147
142
148
143
// Responds on FUSE request to read directory
149
- func (dir * Dir ) ReadDirAll (ctx context.Context ) ([]fuse.Dirent , error ) {
144
+ func (dir * DirINode ) ReadDirAll (ctx context.Context ) ([]fuse.Dirent , error ) {
145
+ dir .lockMutex ()
146
+ defer dir .unlockMutex ()
147
+
150
148
absolutePath := dir .AbsolutePath ()
151
149
loginfo ("Read directory" , Fields {Operation : ReadDir , Path : absolutePath })
152
150
153
- allAttrs , err := dir .FileSystem .HdfsAccessor .ReadDir (absolutePath )
151
+ allAttrs , err := dir .FileSystem .getDFSConnector () .ReadDir (absolutePath )
154
152
if err != nil {
155
153
logwarn ("Failed to list DFS directory" , Fields {Operation : ReadDir , Path : absolutePath , Error : err })
156
154
return nil , err
@@ -184,12 +182,12 @@ func (dir *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
184
182
}
185
183
186
184
// Creates typed node (Dir or File) from the attributes
187
- func (dir * Dir ) NodeFromAttrs (attrs Attrs ) fs.Node {
185
+ func (dir * DirINode ) NodeFromAttrs (attrs Attrs ) fs.Node {
188
186
var node fs.Node
189
187
if (attrs .Mode & os .ModeDir ) == 0 {
190
- node = & File {FileSystem : dir .FileSystem , Parent : dir , Attrs : attrs }
188
+ node = & FileINode {FileSystem : dir .FileSystem , Parent : dir , Attrs : attrs }
191
189
} else {
192
- node = & Dir {FileSystem : dir .FileSystem , Parent : dir , Attrs : attrs }
190
+ node = & DirINode {FileSystem : dir .FileSystem , Parent : dir , Attrs : attrs }
193
191
}
194
192
195
193
if n := dir .EntriesGet (attrs .Name ); n != nil {
@@ -202,9 +200,10 @@ func (dir *Dir) NodeFromAttrs(attrs Attrs) fs.Node {
202
200
}
203
201
204
202
// Performs Stat() query on the backend
205
- func (dir * Dir ) LookupAttrs (name string , attrs * Attrs ) error {
203
+ func (dir * DirINode ) LookupAttrs (name string , attrs * Attrs ) error {
204
+
206
205
var err error
207
- * attrs , err = dir .FileSystem .HdfsAccessor .Stat (path .Join (dir .AbsolutePath (), name ))
206
+ * attrs , err = dir .FileSystem .getDFSConnector () .Stat (path .Join (dir .AbsolutePath (), name ))
208
207
if err != nil {
209
208
// It is a warning as each time new file write tries to stat if the file exists
210
209
loginfo ("Stat failed" , Fields {Operation : Stat , Path : path .Join (dir .AbsolutePath (), name ), Error : err })
@@ -218,72 +217,87 @@ func (dir *Dir) LookupAttrs(name string, attrs *Attrs) error {
218
217
}
219
218
220
219
// Responds on FUSE Mkdir request
221
- func (dir * Dir ) Mkdir (ctx context.Context , req * fuse.MkdirRequest ) (fs.Node , error ) {
222
- err := dir .FileSystem .HdfsAccessor .Mkdir (dir .AbsolutePathForChild (req .Name ), req .Mode )
220
+ func (dir * DirINode ) Mkdir (ctx context.Context , req * fuse.MkdirRequest ) (fs.Node , error ) {
221
+ dir .lockMutex ()
222
+ defer dir .unlockMutex ()
223
+
224
+ err := dir .FileSystem .getDFSConnector ().Mkdir (dir .AbsolutePathForChild (req .Name ), req .Mode )
223
225
if err != nil {
224
226
return nil , err
225
227
}
226
228
return dir .NodeFromAttrs (Attrs {Name : req .Name , Mode : req .Mode | os .ModeDir }), nil
227
229
}
228
230
229
231
// Responds on FUSE Create request
230
- func (dir * Dir ) Create (ctx context.Context , req * fuse.CreateRequest , resp * fuse.CreateResponse ) (fs.Node , fs.Handle , error ) {
231
- loginfo ("Creating a new file" , Fields {Operation : Create , Path : dir .AbsolutePathForChild (req .Name ), Mode : req .Mode , Flags : req .Flags })
232
+ func (dir * DirINode ) Create (ctx context.Context , req * fuse.CreateRequest , resp * fuse.CreateResponse ) (fs.Node , fs.Handle , error ) {
233
+ dir .lockMutex ()
234
+ defer dir .unlockMutex ()
232
235
233
- file := dir .NodeFromAttrs (Attrs {Name : req .Name , Mode : req .Mode }).(* File )
234
- handle , err := NewFileHandle (file , false , req .Flags )
236
+ loginfo ("Creating a new file" , Fields {Operation : Create , Path : dir .AbsolutePathForChild (req .Name ), Mode : req .Mode , Flags : req .Flags })
237
+ file := dir .NodeFromAttrs (Attrs {Name : req .Name , Mode : req .Mode }).(* FileINode )
238
+ handle , err := file .NewFileHandle (false , req .Flags )
235
239
if err != nil {
236
240
logerror ("File creation failed" , Fields {Operation : Create , Path : dir .AbsolutePathForChild (req .Name ), Mode : req .Mode , Flags : req .Flags , Error : err })
241
+ //TODO remove the entry from the cache
237
242
return nil , nil , err
238
243
}
239
244
file .AddHandle (handle )
240
245
return file , handle , nil
241
246
}
242
247
243
248
// Responds on FUSE Remove request
244
- func (dir * Dir ) Remove (ctx context.Context , req * fuse.RemoveRequest ) error {
249
+ func (dir * DirINode ) Remove (ctx context.Context , req * fuse.RemoveRequest ) error {
250
+ dir .lockMutex ()
251
+ defer dir .unlockMutex ()
252
+
245
253
path := dir .AbsolutePathForChild (req .Name )
246
254
loginfo ("Removing path" , Fields {Operation : Remove , Path : path })
247
- err := dir .FileSystem .HdfsAccessor .Remove (path )
255
+ err := dir .FileSystem .getDFSConnector () .Remove (path )
248
256
if err == nil {
249
257
dir .EntriesRemove (req .Name )
250
258
} else {
251
- logerror ("Failed to remove path" , Fields {Operation : Remove , Path : path , Error : err })
259
+ logwarn ("Failed to remove path" , Fields {Operation : Remove , Path : path , Error : err })
252
260
}
253
261
return err
254
262
}
255
263
256
264
// Responds on FUSE Rename request
257
- func (dir * Dir ) Rename (ctx context.Context , req * fuse.RenameRequest , newDir fs.Node ) error {
265
+ func (dir * DirINode ) Rename (ctx context.Context , req * fuse.RenameRequest , newDir fs.Node ) error {
266
+ dir .lockMutex ()
267
+ defer dir .unlockMutex ()
268
+
258
269
oldPath := dir .AbsolutePathForChild (req .OldName )
259
- newPath := newDir .(* Dir ).AbsolutePathForChild (req .NewName )
270
+ newPath := newDir .(* DirINode ).AbsolutePathForChild (req .NewName )
260
271
loginfo ("Renaming to " + newPath , Fields {Operation : Rename , Path : oldPath })
261
- err := dir .FileSystem .HdfsAccessor .Rename (oldPath , newPath )
272
+ err := dir .FileSystem .getDFSConnector () .Rename (oldPath , newPath )
262
273
if err == nil {
263
274
// Upon successful rename, updating in-memory representation of the file entry
264
275
if node := dir .EntriesGet (req .OldName ); node != nil {
265
- if fnode , ok := (* node ).(* File ); ok {
276
+ if fnode , ok := (* node ).(* FileINode ); ok {
266
277
fnode .Attrs .Name = req .NewName
267
- } else if dnode , ok := (* node ).(* Dir ); ok {
278
+ } else if dnode , ok := (* node ).(* DirINode ); ok {
268
279
dnode .Attrs .Name = req .NewName
269
280
}
270
281
dir .EntriesRemove (req .OldName )
271
- newDir .(* Dir ).EntriesSet (req .NewName , node )
282
+ newDir .(* DirINode ).EntriesSet (req .NewName , node )
272
283
}
273
284
}
274
285
return err
275
286
}
276
287
277
288
// Responds on FUSE Chmod request
278
- func (dir * Dir ) Setattr (ctx context.Context , req * fuse.SetattrRequest , resp * fuse.SetattrResponse ) error {
289
+ func (dir * DirINode ) Setattr (ctx context.Context , req * fuse.SetattrRequest , resp * fuse.SetattrResponse ) error {
290
+ dir .lockMutex ()
291
+ defer dir .unlockMutex ()
292
+
279
293
// Get the filepath, so chmod in hdfs can work
280
294
path := dir .AbsolutePath ()
281
295
var err error
282
296
283
297
if req .Valid .Mode () {
284
298
loginfo ("Setting attributes" , Fields {Operation : Chmod , Path : path , Mode : req .Mode })
285
299
(func () {
286
- err = dir .FileSystem .HdfsAccessor .Chmod (path , req .Mode )
300
+ err = dir .FileSystem .getDFSConnector () .Chmod (path , req .Mode )
287
301
if err != nil {
288
302
return
289
303
}
@@ -310,7 +324,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
310
324
311
325
loginfo ("Setting attributes" , Fields {Operation : Chown , Path : path , User : u , UID : owner , GID : group })
312
326
(func () {
313
- err = dir .FileSystem .HdfsAccessor .Chown (path , owner , group )
327
+ err = dir .FileSystem .getDFSConnector () .Chown (path , owner , group )
314
328
if err != nil {
315
329
return
316
330
}
@@ -326,3 +340,13 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
326
340
327
341
return err
328
342
}
343
+
344
+ var dirLockTime time.Time = time.Time {}
345
+
346
+ func (dir * DirINode ) lockMutex () {
347
+ dir .mutex .Lock ()
348
+ }
349
+
350
+ func (dir * DirINode ) unlockMutex () {
351
+ dir .mutex .Unlock ()
352
+ }
0 commit comments