Skip to content

Commit 8bf5054

Browse files
authored
Merge pull request #43 from smkniazi/GH-41
Gh 41
2 parents 3da3372 + 2a8522c commit 8bf5054

11 files changed

+213
-132
lines changed

Attrs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type FsInfo struct {
3131
}
3232

3333
// Converts Attrs datastructure into FUSE represnetation
34-
func (attrs *Attrs) Attr(a *fuse.Attr) error {
34+
func (attrs *Attrs) ConvertAttrToFuse(a *fuse.Attr) error {
3535
a.Inode = attrs.Inode
3636
a.Mode = attrs.Mode
3737
if (a.Mode & os.ModeDir) == 0 {

Dir.go

+32-70
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@ package main
55
import (
66
"fmt"
77
"os"
8-
"os/user"
98
"path"
9+
"path/filepath"
1010
"strings"
1111
"sync"
1212
"time"
1313

1414
"bazil.org/fuse"
1515
"bazil.org/fuse/fs"
1616
"golang.org/x/net/context"
17-
"logicalclocks.com/hopsfs-mount/ugcache"
1817
)
1918

2019
// Encapsulates state and operations for directory node on the HDFS file system
@@ -49,7 +48,7 @@ func (dir *DirINode) AbsolutePathForChild(name string) string {
4948
if path != "/" {
5049
path = path + "/"
5150
}
52-
return path + name
51+
return path + filepath.Base(name)
5352
}
5453

5554
// Responds on FUSE request to get directory attributes
@@ -63,7 +62,7 @@ func (dir *DirINode) Attr(ctx context.Context, a *fuse.Attr) error {
6362
}
6463

6564
}
66-
return dir.Attrs.Attr(a)
65+
return dir.Attrs.ConvertAttrToFuse(a)
6766
}
6867

6968
func (dir *DirINode) EntriesGet(name string) *fs.Node {
@@ -229,15 +228,16 @@ func (dir *DirINode) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node
229228
}
230229
logdebug("mkdir successful", Fields{Operation: Mkdir, Path: path.Join(dir.AbsolutePath(), req.Name)})
231230

232-
err = dir.changeOwnership(dir.AbsolutePathForChild(req.Name), req.Uid, req.Gid)
231+
err = ChownOp(&dir.Attrs, dir.FileSystem, dir.AbsolutePathForChild(req.Name), req.Uid, req.Gid)
233232
if err != nil {
234-
logwarn("Unable to change ownership of new dir", Fields{Operation: Create, Path: dir.AbsolutePathForChild(req.Name), UID: req.Uid, GID: req.Gid})
233+
logwarn("Unable to change ownership of new dir", Fields{Operation: Create, Path: dir.AbsolutePathForChild(req.Name),
234+
UID: req.Uid, GID: req.Gid, Error: err})
235235
//unable to change the ownership of the directory. so delete it as the operation as a whole failed
236236
dir.FileSystem.getDFSConnector().Remove(dir.AbsolutePathForChild(req.Name))
237237
return nil, err
238238
}
239239

240-
return dir.NodeFromAttrs(Attrs{Name: req.Name, Mode: req.Mode | os.ModeDir}), nil
240+
return dir.NodeFromAttrs(Attrs{Name: req.Name, Mode: req.Mode | os.ModeDir, Uid: req.Uid, Gid: req.Gid}), nil
241241
}
242242

243243
// Responds on FUSE Create request
@@ -253,36 +253,24 @@ func (dir *DirINode) Create(ctx context.Context, req *fuse.CreateRequest, resp *
253253
//TODO remove the entry from the cache
254254
return nil, nil, err
255255
}
256-
file.AddHandle(handle)
257256

258-
err = dir.changeOwnership(dir.AbsolutePathForChild(req.Name), req.Uid, req.Gid)
257+
file.AddHandle(handle)
258+
err = ChownOp(&dir.Attrs, dir.FileSystem, dir.AbsolutePathForChild(req.Name), req.Uid, req.Gid)
259259
if err != nil {
260-
logwarn("Unable to change ownership of new file", Fields{Operation: Create, Path: dir.AbsolutePathForChild(req.Name), UID: req.Uid, GID: req.Gid})
260+
logwarn("Unable to change ownership of new file", Fields{Operation: Create, Path: dir.AbsolutePathForChild(req.Name),
261+
UID: req.Uid, GID: req.Gid, Error: err})
261262
//unable to change the ownership of the file. so delete it as the operation as a whole failed
262263
dir.FileSystem.getDFSConnector().Remove(dir.AbsolutePathForChild(req.Name))
263264
return nil, nil, err
264265
}
265266

266-
return file, handle, nil
267-
}
268-
269-
func (dir *DirINode) changeOwnership(path string, uid uint32, gid uint32) error {
270-
if hadoopUserID != uid { // the file is created by an other user, so change the ownership information
271-
user := ugcache.LookupUserName(uid)
272-
if user == "" {
273-
logwarn("Unable to find user information", Fields{Operation: Chown, Path: path, UID: uid, GID: gid})
274-
}
275-
group := ugcache.LookupGroupName(gid)
276-
if group == "" {
277-
logwarn("Unable to find group information", Fields{Operation: Chown, Path: path, UID: uid, GID: gid})
278-
}
279-
err := dir.FileSystem.getDFSConnector().Chown(path, user, group)
280-
if err != nil {
281-
return err
282-
}
283-
loginfo("Updated ownership", Fields{Operation: Create, Path: path, User: user, Group: group})
267+
//update the attributes of the file now
268+
err = dir.LookupAttrs(file.Attrs.Name, &file.Attrs)
269+
if err != nil {
270+
return nil, nil, err
284271
}
285-
return nil
272+
273+
return file, handle, nil
286274
}
287275

288276
// Responds on FUSE Remove request
@@ -332,59 +320,33 @@ func (dir *DirINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp
332320
dir.lockMutex()
333321
defer dir.unlockMutex()
334322

335-
// Get the filepath, so chmod in hdfs can work
323+
if req.Valid.Size() {
324+
return fmt.Errorf("unsupported operation. Can not set size of a directory")
325+
}
326+
336327
path := dir.AbsolutePath()
337-
var err error
338328

339329
if req.Valid.Mode() {
340-
loginfo("Setting attributes", Fields{Operation: Chmod, Path: path, Mode: req.Mode})
341-
(func() {
342-
err = dir.FileSystem.getDFSConnector().Chmod(path, req.Mode)
343-
if err != nil {
344-
return
345-
}
346-
})()
347-
348-
if err != nil {
349-
logerror("Failed to set attributes", Fields{Operation: Chmod, Path: path, Mode: req.Mode, Error: err})
350-
} else {
351-
dir.Attrs.Mode = req.Mode
330+
if err := ChmodOp(&dir.Attrs, dir.FileSystem, path, req, resp); err != nil {
331+
logwarn("Setattr (chmod) failed. ", Fields{Operation: Chmod, Path: path, Mode: req.Mode})
332+
return err
352333
}
353334
}
354335

355-
if req.Valid.Uid() {
356-
u, err := user.LookupId(fmt.Sprint(req.Uid))
357-
owner := fmt.Sprint(req.Uid)
358-
group := fmt.Sprint(req.Gid)
359-
if err != nil {
360-
logerror(fmt.Sprintf("Chown: username for uid %d not found, use uid/gid instead", req.Uid),
361-
Fields{Operation: Chown, Path: path, User: u, UID: owner, GID: group, Error: err})
362-
} else {
363-
owner = u.Username
364-
group = owner // hardcoded the group same as owner until LookupGroupId available
336+
if req.Valid.Uid() || req.Valid.Gid() {
337+
if err := SetAttrChownOp(&dir.Attrs, dir.FileSystem, path, req, resp); err != nil {
338+
logwarn("Setattr (chown/chgrp )failed", Fields{Operation: Chmod, Path: path, UID: req.Uid, GID: req.Gid})
339+
return err
365340
}
341+
}
366342

367-
loginfo("Setting attributes", Fields{Operation: Chown, Path: path, User: u, UID: owner, GID: group})
368-
(func() {
369-
err = dir.FileSystem.getDFSConnector().Chown(path, owner, group)
370-
if err != nil {
371-
return
372-
}
373-
})()
374-
375-
if err != nil {
376-
logerror("Failed to set attributes", Fields{Operation: Chown, Path: path, User: u, UID: owner, GID: group, Error: err})
377-
} else {
378-
dir.Attrs.Uid = req.Uid
379-
dir.Attrs.Gid = req.Gid
380-
}
343+
if err := UpdateTS(&dir.Attrs, dir.FileSystem, path, req, resp); err != nil {
344+
return err
381345
}
382346

383-
return err
347+
return nil
384348
}
385349

386-
var dirLockTime time.Time = time.Time{}
387-
388350
func (dir *DirINode) lockMutex() {
389351
dir.mutex.Lock()
390352
}

Dir_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -131,32 +131,36 @@ func TestLookupWithFiltering(t *testing.T) {
131131

132132
// Testing Mkdir
133133
func TestMkdir(t *testing.T) {
134+
dir := "/foo"
134135
mockCtrl := gomock.NewController(t)
135136
mockClock := &MockClock{}
136137
hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
138+
hdfsAccessor.EXPECT().Chown(dir, gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
137139
fs, _ := NewFileSystem([]HdfsAccessor{hdfsAccessor}, "/", []string{"foo", "bar"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
138140
root, _ := fs.Root()
139-
hdfsAccessor.EXPECT().Mkdir("/foo", os.FileMode(0757)|os.ModeDir).Return(nil)
141+
hdfsAccessor.EXPECT().Mkdir(dir, os.FileMode(0757)|os.ModeDir).Return(nil)
140142
node, err := root.(*DirINode).Mkdir(nil, &fuse.MkdirRequest{Name: "foo", Mode: os.FileMode(0757) | os.ModeDir})
141143
assert.Nil(t, err)
142144
assert.Equal(t, "foo", node.(*DirINode).Attrs.Name)
143145
}
144146

145147
// Testing Chmod and Chown
146148
func TestSetattr(t *testing.T) {
149+
dir := "/foo"
147150
mockCtrl := gomock.NewController(t)
148151
mockClock := &MockClock{}
149152
hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
153+
hdfsAccessor.EXPECT().Chown(dir, gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
150154
fs, _ := NewFileSystem([]HdfsAccessor{hdfsAccessor}, "/", []string{"foo", "bar"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
151155
root, _ := fs.Root()
152-
hdfsAccessor.EXPECT().Mkdir("/foo", os.FileMode(0757)|os.ModeDir).Return(nil)
156+
hdfsAccessor.EXPECT().Mkdir(dir, os.FileMode(0757)|os.ModeDir).Return(nil)
153157
node, _ := root.(*DirINode).Mkdir(nil, &fuse.MkdirRequest{Name: "foo", Mode: os.FileMode(0757) | os.ModeDir})
154-
hdfsAccessor.EXPECT().Chmod("/foo", os.FileMode(0777)).Return(nil)
158+
hdfsAccessor.EXPECT().Chmod(dir, os.FileMode(0777)).Return(nil).AnyTimes()
155159
err := node.(*DirINode).Setattr(nil, &fuse.SetattrRequest{Mode: os.FileMode(0777), Valid: fuse.SetattrMode}, &fuse.SetattrResponse{})
156160
assert.Nil(t, err)
157161
assert.Equal(t, os.FileMode(0777), node.(*DirINode).Attrs.Mode)
158162

159-
hdfsAccessor.EXPECT().Chown("/foo", "root", "root").Return(nil)
163+
hdfsAccessor.EXPECT().Chown(dir, "root", gomock.Any()).Return(nil).AnyTimes()
160164
err = node.(*DirINode).Setattr(nil, &fuse.SetattrRequest{Uid: 0, Valid: fuse.SetattrUid}, &fuse.SetattrResponse{})
161165
assert.Nil(t, err)
162166
assert.Equal(t, uint32(0), node.(*DirINode).Attrs.Uid)

File.go

+36-51
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"io/ioutil"
99
"math/rand"
1010
"os"
11-
"os/user"
1211
"path"
1312
"sync"
1413
"syscall"
@@ -50,13 +49,29 @@ func (file *FileINode) Attr(ctx context.Context, a *fuse.Attr) error {
5049
file.lockFile()
5150
defer file.unlockFile()
5251

53-
if file.FileSystem.Clock.Now().After(file.Attrs.Expires) {
54-
err := file.Parent.LookupAttrs(file.Attrs.Name, &file.Attrs)
52+
// if the file is open for writing then update the file length and mtime
53+
// from the straging file.
54+
// Otherwise read the stats from the cache if it is valid.
55+
56+
if lrwfp, ok := file.handle.(*LocalRWFileProxy); ok {
57+
fileInfo, err := lrwfp.localFile.Stat()
5558
if err != nil {
59+
logwarn("stat failed on staging file", Fields{Operation: Stat, Path: file.AbsolutePath(), Error: err})
5660
return err
5761
}
62+
// update the local cache
63+
file.Attrs.Size = uint64(fileInfo.Size())
64+
file.Attrs.Mtime = fileInfo.ModTime()
65+
} else {
66+
if file.FileSystem.Clock.Now().After(file.Attrs.Expires) {
67+
err := file.Parent.LookupAttrs(file.Attrs.Name, &file.Attrs)
68+
if err != nil {
69+
return err
70+
}
71+
}
5872
}
59-
return file.Attrs.Attr(a)
73+
return file.Attrs.ConvertAttrToFuse(a)
74+
6075
}
6176

6277
// Responds to the FUSE file open request (creates new file handle)
@@ -111,7 +126,6 @@ func (file *FileINode) RemoveHandle(handle *FileHandle) {
111126
//close the staging file if it is the last handle
112127
if len(file.activeHandles) == 0 {
113128
file.closeStaging()
114-
logdebug("Staging file is closed.", file.logInfo(Fields{Operation: Close}))
115129
} else {
116130
logtrace("Staging file is not closed.", file.logInfo(Fields{Operation: Close}))
117131
}
@@ -156,67 +170,37 @@ func (file *FileINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, re
156170
defer file.unlockFile()
157171

158172
if req.Valid.Size() {
159-
var retErr error
173+
var err error = nil
160174
for _, handle := range file.activeHandles {
161-
if handle.dataChanged() { // to only write enabled handles
162-
err := handle.Truncate(int64(req.Size))
163-
if err != nil {
164-
retErr = err
165-
}
175+
err := handle.Truncate(int64(req.Size))
176+
if err != nil {
177+
err = err
166178
}
179+
resp.Attr.Size = req.Size
180+
file.Attrs.Size = req.Size
167181
}
168-
return retErr
182+
return err
169183
}
170184

171-
// Get the filepath, so chmod in hdfs can work
172185
path := file.AbsolutePath()
173-
var err error
174186

175187
if req.Valid.Mode() {
176-
loginfo("Setting attributes", Fields{Operation: Chmod, Path: path, Mode: req.Mode})
177-
(func() {
178-
err = file.FileSystem.getDFSConnector().Chmod(path, req.Mode)
179-
if err != nil {
180-
return
181-
}
182-
})()
183-
184-
if err != nil {
185-
logerror("Failed to set attributes", Fields{Operation: Chmod, Path: path, Mode: req.Mode, Error: err})
186-
} else {
187-
file.Attrs.Mode = req.Mode
188+
if err := ChmodOp(&file.Attrs, file.FileSystem, path, req, resp); err != nil {
189+
return err
188190
}
189191
}
190192

191-
if req.Valid.Uid() {
192-
u, err := user.LookupId(fmt.Sprint(req.Uid))
193-
owner := fmt.Sprint(req.Uid)
194-
group := fmt.Sprint(req.Gid)
195-
if err != nil {
196-
logerror(fmt.Sprintf("Chown: username for uid %d not found, use uid/gid instead", req.Uid),
197-
Fields{Operation: Chown, Path: path, User: u, UID: owner, GID: group, Error: err})
198-
} else {
199-
owner = u.Username
200-
group = owner // hardcoded the group same as owner
193+
if req.Valid.Uid() || req.Valid.Gid() {
194+
if err := SetAttrChownOp(&file.Attrs, file.FileSystem, path, req, resp); err != nil {
195+
return err
201196
}
197+
}
202198

203-
loginfo("Setting attributes", Fields{Operation: Chown, Path: path, User: u, UID: owner, GID: group})
204-
(func() {
205-
err = file.FileSystem.getDFSConnector().Chown(path, fmt.Sprint(req.Uid), fmt.Sprint(req.Gid))
206-
if err != nil {
207-
return
208-
}
209-
})()
210-
211-
if err != nil {
212-
logerror("Failed to set attributes", Fields{Operation: Chown, Path: path, User: u, UID: owner, GID: group, Error: err})
213-
} else {
214-
file.Attrs.Uid = req.Uid
215-
file.Attrs.Gid = req.Gid
216-
}
199+
if err := UpdateTS(&file.Attrs, file.FileSystem, path, req, resp); err != nil {
200+
return err
217201
}
218202

219-
return err
203+
return nil
220204
}
221205

222206
func (file *FileINode) countActiveHandles() int {
@@ -326,6 +310,7 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F
326310
loginfo("Opened file, RO handle", fh.logInfo(Fields{Operation: operation, Flags: fh.fileFlags}))
327311
}
328312
}
313+
329314
return fh, nil
330315
}
331316

FileHandleWriter_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func TestReadWriteFile(t *testing.T) {
2727
hdfswriter.EXPECT().Close().Return(nil).AnyTimes()
2828
hdfsAccessor.EXPECT().CreateFile(fileName, os.FileMode(0757), gomock.Any()).Return(hdfswriter, nil).AnyTimes()
2929
hdfsAccessor.EXPECT().Stat(fileName).Return(Attrs{Name: fileName}, nil).AnyTimes()
30+
hdfsAccessor.EXPECT().Chown(fileName, gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
3031
hdfswriter.EXPECT().Close().Return(nil).AnyTimes()
3132

3233
root, _ := fs.Root()
@@ -64,14 +65,14 @@ func TestFaultTolerantWriteFile(t *testing.T) {
6465
hdfswriter := NewMockHdfsWriter(mockCtrl)
6566

6667
hdfswriter.EXPECT().Close().Return(nil).AnyTimes()
67-
hdfsAccessor.EXPECT().Stat(fileName).Return(Attrs{Name: fileName}, nil).AnyTimes()
68+
hdfsAccessor.EXPECT().Stat(fileName).Return(Attrs{Name: fileName, Mode: os.FileMode(0757)}, nil).AnyTimes()
69+
hdfsAccessor.EXPECT().Chown(fileName, gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
6870
hdfswriter.EXPECT().Close().Return(nil).AnyTimes()
6971

7072
hdfsAccessor.EXPECT().StatFs().Return(FsInfo{capacity: uint64(100), used: uint64(20), remaining: uint64(80)}, nil).AnyTimes()
7173
hdfsAccessor.EXPECT().Remove("/testWriteFile_1").Return(nil).AnyTimes()
7274
hdfsAccessor.EXPECT().CreateFile(fileName, os.FileMode(0757), gomock.Any()).DoAndReturn(func(path string,
7375
mode os.FileMode, overwrite bool) (HdfsWriter, error) {
74-
// fmt.Println("-.....> ")
7576
return hdfswriter, nil
7677
}).AnyTimes()
7778

0 commit comments

Comments
 (0)