Skip to content

Commit b63e211

Browse files
committed
Tests passing but the performance degraded further
1 parent d857a9f commit b63e211

8 files changed

+189
-61
lines changed

File.go

+74-16
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ func (file *FileINode) AbsolutePath() string {
4747

4848
// Responds to the FUSE file attribute request
4949
func (file *FileINode) Attr(ctx context.Context, a *fuse.Attr) error {
50+
file.fileMutex.Lock()
51+
defer file.fileMutex.Unlock()
5052
if file.FileSystem.Clock.Now().After(file.Attrs.Expires) {
5153
err := file.Parent.LookupAttrs(file.Attrs.Name, &file.Attrs)
5254
if err != nil {
@@ -100,6 +102,8 @@ func (file *FileINode) RemoveHandle(handle *FileHandle) {
100102
// Responds to the FUSE Fsync request
101103
func (file *FileINode) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
102104
loginfo(fmt.Sprintf("Dispatching fsync request to all open handles: %d", len(file.activeHandles)), Fields{Operation: Fsync})
105+
file.fileMutex.Lock()
106+
defer file.fileMutex.Unlock()
103107
var retErr error
104108
for _, handle := range file.activeHandles {
105109
err := handle.Fsync(ctx, req)
@@ -123,7 +127,7 @@ func (file *FileINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, re
123127
if req.Valid.Size() {
124128
var retErr error
125129
for _, handle := range file.activeHandles {
126-
if handle.isWriteable() { // to only write enabled handles
130+
if handle.dataChanged() { // to only write enabled handles
127131
err := handle.Truncate(int64(req.Size))
128132
if err != nil {
129133
retErr = err
@@ -188,9 +192,9 @@ func (file *FileINode) countActiveHandles() int {
188192
return len(file.activeHandles)
189193
}
190194

191-
func (file *FileINode) createStagingFile(operation string, existsInDFS bool) error {
195+
func (file *FileINode) createStagingFile(operation string, existsInDFS bool) (*os.File, error) {
192196
if file.handle != nil {
193-
return nil // there is already an active handle.
197+
return nil, nil // there is already an active handle.
194198
}
195199

196200
//create staging file
@@ -200,7 +204,7 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) err
200204
w, err := hdfsAccessor.CreateFile(absPath, file.Attrs.Mode, false)
201205
if err != nil {
202206
logerror("Failed to create file in DFS", file.logInfo(Fields{Operation: operation, Error: err}))
203-
return err
207+
return nil, err
204208
}
205209
loginfo("Created an empty file in DFS", file.logInfo(Fields{Operation: operation}))
206210
w.Close()
@@ -209,25 +213,24 @@ func (file *FileINode) createStagingFile(operation string, existsInDFS bool) err
209213
_, err := hdfsAccessor.Stat(absPath)
210214
if err != nil {
211215
logerror("Failed to stat file in DFS", file.logInfo(Fields{Operation: operation, Error: err}))
212-
return syscall.ENOENT
216+
return nil, syscall.ENOENT
213217
}
214218
}
215219

216220
stagingFile, err := ioutil.TempFile(stagingDir, "stage")
217221
if err != nil {
218222
logerror("Failed to create staging file", file.logInfo(Fields{Operation: operation, Error: err}))
219-
return err
223+
return nil, err
220224
}
221225
os.Remove(stagingFile.Name())
222226
loginfo("Created staging file", file.logInfo(Fields{Operation: operation, TmpFile: stagingFile.Name()}))
223227

224228
if existsInDFS {
225229
if err := file.downloadToStaging(stagingFile, operation); err != nil {
226-
return err
230+
return nil, err
227231
}
228232
}
229-
file.handle = &LocalFileProxy{localFile: stagingFile, file: file}
230-
return nil
233+
return stagingFile, nil
231234
}
232235

233236
func (file *FileINode) downloadToStaging(stagingFile *os.File, operation string) error {
@@ -256,22 +259,77 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F
256259
file.lockFileHandle()
257260
defer file.unLockFileHandle()
258261

262+
fh := &FileHandle{File: file, fileFlags: flags, fhID: int64(rand.Uint64())}
259263
operation := Create
260264
if existsInDFS {
261265
operation = Open
262266
}
263267

264-
fh := &FileHandle{File: file, fileFlags: flags, fhID: int64(rand.Uint64())}
265-
if err := file.checkDiskSpace(); err != nil {
266-
return nil, err
268+
if operation == Create {
269+
// there must be no existing file handles for create operation
270+
if file.handle != nil {
271+
logpanic("Unexpected file state during creation", file.logInfo(Fields{Flags: flags}))
272+
}
273+
if err := file.checkDiskSpace(); err != nil {
274+
return nil, err
275+
}
276+
stagingFile, err := file.createStagingFile(operation, existsInDFS)
277+
if err != nil {
278+
return nil, err
279+
}
280+
fh.File.handle = &LocalRWFileProxy{localFile: stagingFile, file: file}
281+
loginfo("Opened file, RW handle", fh.logInfo(Fields{Operation: operation, Flags: fh.fileFlags}))
282+
} else {
283+
if file.handle != nil {
284+
fh.File.handle = file.handle
285+
loginfo("Opened file, Returning existing handle", fh.logInfo(Fields{Operation: operation, Flags: fh.fileFlags}))
286+
} else {
287+
// we alway open the file in RO mode. when the client writes to the file
288+
// then we upgrade the handle. However, if the file is already opened in
289+
// in RW state then we use the existing RW handle
290+
// if file.handle
291+
reader, _ := file.FileSystem.HdfsAccessor.OpenRead(file.AbsolutePath())
292+
fh.File.handle = &RemoteROFileProxy{hdfsReader: reader, file: file}
293+
loginfo("Opened file, RO handle", fh.logInfo(Fields{Operation: operation, Flags: fh.fileFlags}))
294+
}
267295
}
296+
return fh, nil
297+
}
268298

269-
if err := file.createStagingFile(operation, existsInDFS); err != nil {
270-
return nil, err
299+
// changes RO file handle to RW
300+
func (file *FileINode) upgradeHandleForWriting() error {
301+
file.lockFileHandle()
302+
defer file.unLockFileHandle()
303+
304+
var upgrade = false
305+
if _, ok := file.handle.(*LocalRWFileProxy); ok {
306+
upgrade = false
307+
} else if _, ok := file.handle.(*RemoteROFileProxy); ok {
308+
upgrade = true
309+
} else {
310+
logpanic("Unrecognized remote file proxy", nil)
271311
}
272312

273-
loginfo("Opened file", fh.logInfo(Fields{Operation: operation, Flags: fh.fileFlags}))
274-
return fh, nil
313+
if !upgrade {
314+
return nil
315+
} else {
316+
remoteROFileProxy, _ := file.handle.(*RemoteROFileProxy)
317+
remoteROFileProxy.hdfsReader.Close() // close this read only handle
318+
file.handle = nil
319+
320+
if err := file.checkDiskSpace(); err != nil {
321+
return err
322+
}
323+
324+
stagingFile, err := file.createStagingFile("Open", true)
325+
if err != nil {
326+
return err
327+
}
328+
329+
file.handle = &LocalRWFileProxy{localFile: stagingFile, file: file}
330+
loginfo("Open handle upgrade to support RW ", file.logInfo(Fields{Operation: "Open"}))
331+
return nil
332+
}
275333
}
276334

277335
func (file *FileINode) checkDiskSpace() error {

FileHandleWriter_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestFaultTolerantWriteFile(t *testing.T) {
9191
assert.Equal(t, writeHandle.totalBytesWritten, int64(11))
9292

9393
binaryData := make([]byte, 65536)
94-
writeHandle.File.handle.Seek(0, 0)
94+
writeHandle.File.handle.SeekToStart()
9595
nr, _ := writeHandle.File.handle.Read(binaryData)
9696
binaryData = binaryData[:nr]
9797

FileProxy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ type FileProxy interface {
44
Truncate(size int64) error
55
WriteAt(b []byte, off int64) (n int, err error)
66
ReadAt(b []byte, off int64) (n int, err error)
7-
Seek(offset int64, whence int) (ret int64, err error)
7+
SeekToStart() (err error)
88
Read(b []byte) (n int, err error)
99
Close() error
1010
Sync() error

FileSystemOperations_test.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,33 @@ func TestSimple(t *testing.T) {
2020
withMount(t, "/", func(mountPoint string, hdfsAccessor HdfsAccessor) {
2121
//create a file, make sure that use and group information is correct
2222
testFile := filepath.Join(mountPoint, "somefile")
23+
os.Remove(testFile)
24+
2325
loginfo(fmt.Sprintf("New file: %s", testFile), nil)
2426
createFile(t, testFile, "some data")
2527
fi, _ := os.Stat(testFile)
2628
fstat := fi.Sys().(*syscall.Stat_t)
2729
grupInfo, _ := user.LookupGroupId(fmt.Sprintf("%d", fstat.Gid))
2830
userInfo, _ := user.LookupId(fmt.Sprintf("%d", fstat.Uid))
29-
loginfo(fmt.Sprintf("New file: %s, User %s, Gropu %s", testFile, userInfo.Name, grupInfo.Name), nil)
31+
loginfo(fmt.Sprintf("---> New file: %s, User %s, Gropu %s", testFile, userInfo.Name, grupInfo.Name), nil)
32+
33+
loginfo("---> Reopening the file to write some more data", nil)
34+
// append some more data
35+
c, err := os.OpenFile(testFile, os.O_APPEND, 0600)
36+
if err != nil {
37+
t.Errorf("Reopening the file failed. File: %s. Error: %v", testFile, err)
38+
}
39+
c.WriteString("some more data")
40+
c.Close()
41+
42+
loginfo("---> Reopening the file to read all the data", nil)
43+
// read all the data again
44+
c, _ = os.OpenFile(testFile, os.O_RDWR, 0600)
45+
buffer := make([]byte, 1024)
46+
c.Read(buffer)
47+
c.Close()
48+
logdebug(fmt.Sprintf("Data Read. %s", buffer), nil)
49+
3050
os.Remove(testFile)
3151
})
3252
}

HopsFileHandle.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ var _ fs.HandleWriter = (*FileHandle)(nil)
2929
var _ fs.NodeFsyncer = (*FileHandle)(nil)
3030
var _ fs.HandleFlusher = (*FileHandle)(nil)
3131

32-
func (fh *FileHandle) isWriteable() bool {
33-
if fh.fileFlags.IsWriteOnly() || fh.fileFlags.IsReadWrite() {
32+
func (fh *FileHandle) dataChanged() bool {
33+
if fh.totalBytesWritten > 0 {
3434
return true
3535
} else {
3636
return false
@@ -66,14 +66,13 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
6666
if err != nil {
6767
if err == io.EOF {
6868
// EOF isn't a error, reporting successful read to FUSE
69-
logdebug("Finished reading from staging file. EOF", fh.logInfo(Fields{Operation: Read, Bytes: nr}))
69+
logdebug("Completed reading", fh.logInfo(Fields{Operation: Read, Error: err, Bytes: nr}))
7070
return nil
7171
} else {
72-
logerror("Failed to read from staging file", fh.logInfo(Fields{Operation: Read, Error: err, Bytes: nr}))
72+
logerror("Failed to read", fh.logInfo(Fields{Operation: Read, Error: err, Bytes: nr}))
7373
return err
7474
}
7575
}
76-
logdebug("Read from staging file", fh.logInfo(Fields{Operation: Read, Bytes: nr, ReqOffset: req.Offset}))
7776
return err
7877
}
7978

@@ -82,6 +81,9 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
8281
fh.Mutex.Lock()
8382
defer fh.Mutex.Unlock()
8483

84+
// as an optimization the file is initially opened in readonly mode
85+
fh.File.upgradeHandleForWriting()
86+
8587
nw, err := fh.File.handle.WriteAt(req.Data, req.Offset)
8688
resp.Size = nw
8789
fh.totalBytesWritten += int64(nw)
@@ -123,9 +125,9 @@ func (fh *FileHandle) FlushAttempt(operation string) error {
123125
}
124126

125127
//open the file for reading and upload to DFS
126-
offset, err := fh.File.handle.Seek(0, 0)
127-
if err != nil || offset != 0 {
128-
logerror("Unable to seek to the begenning of the temp file", fh.logInfo(Fields{Operation: operation, Offset: offset, Error: err}))
128+
err = fh.File.handle.SeekToStart()
129+
if err != nil {
130+
logerror("Unable to seek to the begenning of the temp file", fh.logInfo(Fields{Operation: operation, Error: err}))
129131
return err
130132
}
131133

@@ -164,7 +166,7 @@ func (fh *FileHandle) FlushAttempt(operation string) error {
164166
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
165167
fh.Mutex.Lock()
166168
defer fh.Mutex.Unlock()
167-
if fh.isWriteable() {
169+
if fh.dataChanged() {
168170
loginfo("Flush file", fh.logInfo(Fields{Operation: Flush}))
169171
return fh.copyToDFS(Flush)
170172
} else {
@@ -176,7 +178,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
176178
func (fh *FileHandle) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
177179
fh.Mutex.Lock()
178180
defer fh.Mutex.Unlock()
179-
if fh.isWriteable() {
181+
if fh.dataChanged() {
180182
loginfo("Fsync file", fh.logInfo(Fields{Operation: Fsync}))
181183
return fh.copyToDFS(Fsync)
182184
} else {

LocalFileProxy.go

+14-11
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,53 @@ package main
22

33
import "os"
44

5-
type LocalFileProxy struct {
5+
type LocalRWFileProxy struct {
66
localFile *os.File // handle to the temp file in staging dir
77
file *FileINode
88
}
99

10-
var _ FileProxy = (*LocalFileProxy)(nil)
10+
var _ FileProxy = (*LocalRWFileProxy)(nil)
1111

12-
func (p *LocalFileProxy) Truncate(size int64) error {
12+
func (p *LocalRWFileProxy) Truncate(size int64) error {
1313
p.file.lockFileHandle()
1414
defer p.file.unLockFileHandle()
1515
return p.localFile.Truncate(size)
1616
}
1717

18-
func (p *LocalFileProxy) WriteAt(b []byte, off int64) (n int, err error) {
18+
func (p *LocalRWFileProxy) WriteAt(b []byte, off int64) (n int, err error) {
1919
p.file.lockFileHandle()
2020
defer p.file.unLockFileHandle()
2121
return p.localFile.WriteAt(b, off)
2222
}
2323

24-
func (p *LocalFileProxy) ReadAt(b []byte, off int64) (n int, err error) {
24+
func (p *LocalRWFileProxy) ReadAt(b []byte, off int64) (n int, err error) {
2525
p.file.lockFileHandle()
2626
defer p.file.unLockFileHandle()
27-
return p.localFile.ReadAt(b, off)
27+
n, err = p.localFile.ReadAt(b, off)
28+
logdebug("LocalFileProxy ReadAt", p.file.logInfo(Fields{Operation: Read, Bytes: n, Error: err, Offset: off}))
29+
return
2830
}
2931

30-
func (p *LocalFileProxy) Seek(offset int64, whence int) (ret int64, err error) {
32+
func (p *LocalRWFileProxy) SeekToStart() (err error) {
3133
p.file.lockFileHandle()
3234
defer p.file.unLockFileHandle()
33-
return p.localFile.Seek(offset, whence)
35+
_, err = p.localFile.Seek(0, 0)
36+
return
3437
}
3538

36-
func (p *LocalFileProxy) Read(b []byte) (n int, err error) {
39+
func (p *LocalRWFileProxy) Read(b []byte) (n int, err error) {
3740
p.file.lockFileHandle()
3841
defer p.file.unLockFileHandle()
3942
return p.localFile.Read(b)
4043
}
4144

42-
func (p *LocalFileProxy) Close() error {
45+
func (p *LocalRWFileProxy) Close() error {
4346
p.file.lockFileHandle()
4447
defer p.file.unLockFileHandle()
4548
return p.localFile.Close()
4649
}
4750

48-
func (p *LocalFileProxy) Sync() error {
51+
func (p *LocalRWFileProxy) Sync() error {
4952
p.file.lockFileHandle()
5053
defer p.file.unLockFileHandle()
5154
return p.localFile.Sync()

Log.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ const (
6666
var ReportCaller = true
6767

6868
func init() {
69-
initLogger("fatal", false, "")
69+
initLogger("trace", false, "")
7070
}
7171

7272
func initLogger(l string, reportCaller bool, lfile string) {

0 commit comments

Comments
 (0)