Skip to content

Commit 3415f48

Browse files
committed
Separating file handles for RO and RW operations
1 parent 8c5d91f commit 3415f48

6 files changed

+198
-109
lines changed

Dir.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (dir *DirINode) Create(ctx context.Context, req *fuse.CreateRequest, resp *
235235

236236
loginfo("Creating a new file", Fields{Operation: Create, Path: dir.AbsolutePathForChild(req.Name), Mode: req.Mode, Flags: req.Flags})
237237
file := dir.NodeFromAttrs(Attrs{Name: req.Name, Mode: req.Mode}).(*FileINode)
238-
handle, err := NewFileHandle(file, false, req.Flags)
238+
handle, err := file.NewFileHandle(false, req.Flags)
239239
if err != nil {
240240
logerror("File creation failed", Fields{Operation: Create, Path: dir.AbsolutePathForChild(req.Name), Mode: req.Mode, Flags: req.Flags, Error: err})
241241
//TODO remove the entry from the cache

File.go

+114-2
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@ package main
44

55
import (
66
"fmt"
7+
"io"
8+
"io/ioutil"
9+
"math/rand"
710
"os"
811
"os/user"
912
"path"
1013
"sync"
14+
"syscall"
1115
"time"
1216

1317
"bazil.org/fuse"
1418
"bazil.org/fuse/fs"
1519
"golang.org/x/net/context"
20+
"golang.org/x/sys/unix"
1621
)
1722

1823
type FileINode struct {
@@ -22,7 +27,7 @@ type FileINode struct {
2227

2328
activeHandles []*FileHandle // list of opened file handles
2429
fileMutex sync.Mutex // mutex for activeHandles
25-
handle *os.File // handle to the temp file in staging dir
30+
handle FileProxy // handle to the temp file in staging dir
2631
}
2732

2833
// Verify that *File implements necesary FUSE interfaces
@@ -56,7 +61,7 @@ func (file *FileINode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fu
5661
defer file.fileMutex.Unlock()
5762

5863
logdebug("Opening file", Fields{Operation: Open, Path: file.AbsolutePath(), Flags: req.Flags})
59-
handle, err := NewFileHandle(file, true, req.Flags)
64+
handle, err := file.NewFileHandle(true, req.Flags)
6065
if err != nil {
6166
return nil, err
6267
}
@@ -181,3 +186,110 @@ func (file *FileINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, re
181186
func (file *FileINode) countActiveHandles() int {
182187
return len(file.activeHandles)
183188
}
189+
190+
func (file *FileINode) createStagingFile(operation string, existsInDFS bool) error {
191+
if file.handle != nil {
192+
return nil // there is already an active handle.
193+
}
194+
195+
//create staging file
196+
absPath := file.AbsolutePath()
197+
hdfsAccessor := file.FileSystem.HdfsAccessor
198+
if !existsInDFS { // it is a new file so create it in the DFS
199+
w, err := hdfsAccessor.CreateFile(absPath, file.Attrs.Mode, false)
200+
if err != nil {
201+
logerror("Failed to create file in DFS", file.logInfo(Fields{Operation: operation, Error: err}))
202+
return err
203+
}
204+
loginfo("Created an empty file in DFS", file.logInfo(Fields{Operation: operation}))
205+
w.Close()
206+
} else {
207+
// Request to write to existing file
208+
_, err := hdfsAccessor.Stat(absPath)
209+
if err != nil {
210+
logerror("Failed to stat file in DFS", file.logInfo(Fields{Operation: operation, Error: err}))
211+
return syscall.ENOENT
212+
}
213+
}
214+
215+
stagingFile, err := ioutil.TempFile(stagingDir, "stage")
216+
if err != nil {
217+
logerror("Failed to create staging file", file.logInfo(Fields{Operation: operation, Error: err}))
218+
return err
219+
}
220+
os.Remove(stagingFile.Name())
221+
loginfo("Created staging file", file.logInfo(Fields{Operation: operation, TmpFile: stagingFile.Name()}))
222+
223+
if existsInDFS {
224+
if err := file.downloadToStaging(stagingFile, operation); err != nil {
225+
return err
226+
}
227+
}
228+
file.handle = &LocalFileProxy{localFile: stagingFile}
229+
return nil
230+
}
231+
232+
func (file *FileINode) downloadToStaging(stagingFile *os.File, operation string) error {
233+
hdfsAccessor := file.FileSystem.HdfsAccessor
234+
absPath := file.AbsolutePath()
235+
236+
reader, err := hdfsAccessor.OpenRead(absPath)
237+
if err != nil {
238+
logerror("Failed to open file in DFS", file.logInfo(Fields{Operation: operation, Error: err}))
239+
// TODO remove the staging file if there are no more active handles
240+
return err
241+
}
242+
243+
nc, err := io.Copy(stagingFile, reader)
244+
if err != nil {
245+
logerror("Failed to copy content to staging file", file.logInfo(Fields{Operation: operation, Error: err}))
246+
return err
247+
}
248+
reader.Close()
249+
loginfo(fmt.Sprintf("Downloaded a copy to stating dir. %d bytes copied", nc), file.logInfo(Fields{Operation: operation}))
250+
return nil
251+
}
252+
253+
// Creates new file handle
254+
func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*FileHandle, error) {
255+
operation := Create
256+
if existsInDFS {
257+
operation = Open
258+
}
259+
260+
fh := &FileHandle{File: file, fileFlags: flags, fhID: int64(rand.Uint64())}
261+
if err := file.checkDiskSpace(); err != nil {
262+
return nil, err
263+
}
264+
265+
if err := file.createStagingFile(operation, existsInDFS); err != nil {
266+
return nil, err
267+
}
268+
269+
loginfo("Opened file", fh.logInfo(Fields{Operation: operation, Flags: fh.fileFlags}))
270+
return fh, nil
271+
}
272+
273+
func (file *FileINode) checkDiskSpace() error {
274+
var stat unix.Statfs_t
275+
wd, err := os.Getwd()
276+
if err != nil {
277+
return err
278+
}
279+
unix.Statfs(wd, &stat)
280+
// Available blocks * size per block = available space in bytes
281+
bytesAvailable := stat.Bavail * uint64(stat.Bsize)
282+
if bytesAvailable < 64*1024*1024 {
283+
return syscall.ENOSPC
284+
} else {
285+
return nil
286+
}
287+
}
288+
289+
func (file *FileINode) logInfo(fields Fields) Fields {
290+
f := Fields{Path: file.AbsolutePath()}
291+
for k, e := range fields {
292+
f[k] = e
293+
}
294+
return f
295+
}

FileProxy.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package main
2+
3+
type FileProxy interface {
4+
Truncate(size int64) error
5+
WriteAt(b []byte, off int64) (n int, err error)
6+
ReadAt(b []byte, off int64) (n int, err error)
7+
Seek(offset int64, whence int) (ret int64, err error)
8+
Read(b []byte) (n int, err error)
9+
Close() error
10+
Sync() error
11+
}

HopsFileHandle.go

-106
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,12 @@
33
package main
44

55
import (
6-
"fmt"
76
"io"
8-
"io/ioutil"
9-
"math/rand"
10-
"os"
117
"sync"
12-
"syscall"
138

149
"bazil.org/fuse"
1510
"bazil.org/fuse/fs"
1611
"golang.org/x/net/context"
17-
"golang.org/x/sys/unix"
1812
)
1913

2014
// Represends a handle to an open file
@@ -35,90 +29,6 @@ var _ fs.HandleWriter = (*FileHandle)(nil)
3529
var _ fs.NodeFsyncer = (*FileHandle)(nil)
3630
var _ fs.HandleFlusher = (*FileHandle)(nil)
3731

38-
func (fh *FileHandle) createStagingFile(operation string, existsInDFS bool) error {
39-
if fh.File.handle != nil {
40-
return nil // there is already an active handle.
41-
}
42-
43-
//create staging file
44-
absPath := fh.File.AbsolutePath()
45-
hdfsAccessor := fh.File.FileSystem.HdfsAccessor
46-
if !existsInDFS { // it is a new file so create it in the DFS
47-
w, err := hdfsAccessor.CreateFile(absPath, fh.File.Attrs.Mode, false)
48-
if err != nil {
49-
logerror("Failed to create file in DFS", fh.logInfo(Fields{Operation: operation, Error: err}))
50-
return err
51-
}
52-
loginfo("Created an empty file in DFS", fh.logInfo(Fields{Operation: operation}))
53-
w.Close()
54-
} else {
55-
// Request to write to existing file
56-
_, err := hdfsAccessor.Stat(absPath)
57-
if err != nil {
58-
logerror("Failed to stat file in DFS", fh.logInfo(Fields{Operation: operation, Error: err}))
59-
return syscall.ENOENT
60-
}
61-
}
62-
63-
stagingFile, err := ioutil.TempFile(stagingDir, "stage")
64-
if err != nil {
65-
logerror("Failed to create staging file", fh.logInfo(Fields{Operation: operation, Error: err}))
66-
return err
67-
}
68-
os.Remove(stagingFile.Name())
69-
loginfo("Created staging file", fh.logInfo(Fields{Operation: operation, TmpFile: stagingFile.Name()}))
70-
71-
if existsInDFS {
72-
if err := fh.downloadToStaging(stagingFile, operation); err != nil {
73-
return err
74-
}
75-
}
76-
fh.File.handle = stagingFile
77-
return nil
78-
}
79-
80-
func (fh *FileHandle) downloadToStaging(stagingFile *os.File, operation string) error {
81-
hdfsAccessor := fh.File.FileSystem.HdfsAccessor
82-
absPath := fh.File.AbsolutePath()
83-
84-
reader, err := hdfsAccessor.OpenRead(absPath)
85-
if err != nil {
86-
logerror("Failed to open file in DFS", fh.logInfo(Fields{Operation: operation, Error: err}))
87-
// TODO remove the staging file if there are no more active handles
88-
return err
89-
}
90-
91-
nc, err := io.Copy(stagingFile, reader)
92-
if err != nil {
93-
logerror("Failed to copy content to staging file", fh.logInfo(Fields{Operation: operation, Error: err}))
94-
return err
95-
}
96-
reader.Close()
97-
loginfo(fmt.Sprintf("Downloaded a copy to stating dir. %d bytes copied", nc), fh.logInfo(Fields{Operation: operation}))
98-
return nil
99-
}
100-
101-
// Creates new file handle
102-
func NewFileHandle(file *FileINode, existsInDFS bool, flags fuse.OpenFlags) (*FileHandle, error) {
103-
104-
operation := Create
105-
if existsInDFS {
106-
operation = Open
107-
}
108-
109-
fh := &FileHandle{File: file, fileFlags: flags, fhID: int64(rand.Uint64())}
110-
if err := checkDiskSpace(); err != nil {
111-
return nil, err
112-
}
113-
114-
if err := fh.createStagingFile(operation, existsInDFS); err != nil {
115-
return nil, err
116-
}
117-
118-
loginfo("Opened file", fh.logInfo(Fields{Operation: operation, Flags: fh.fileFlags}))
119-
return fh, nil
120-
}
121-
12232
func (fh *FileHandle) isWriteable() bool {
12333
if fh.fileFlags.IsWriteOnly() || fh.fileFlags.IsReadWrite() {
12434
return true
@@ -136,22 +46,6 @@ func (fh *FileHandle) Truncate(size int64) error {
13646
return nil
13747
}
13848

139-
func checkDiskSpace() error {
140-
var stat unix.Statfs_t
141-
wd, err := os.Getwd()
142-
if err != nil {
143-
return err
144-
}
145-
unix.Statfs(wd, &stat)
146-
// Available blocks * size per block = available space in bytes
147-
bytesAvailable := stat.Bavail * uint64(stat.Bsize)
148-
if bytesAvailable < 64*1024*1024 {
149-
return syscall.ENOSPC
150-
} else {
151-
return nil
152-
}
153-
}
154-
15549
// Returns attributes of the file associated with this handle
15650
func (fh *FileHandle) Attr(ctx context.Context, a *fuse.Attr) error {
15751
fh.Mutex.Lock()

LocalFileProxy.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package main
2+
3+
import "os"
4+
5+
type LocalFileProxy struct {
6+
localFile *os.File // handle to the temp file in staging dir
7+
}
8+
9+
var _ FileProxy = (*LocalFileProxy)(nil)
10+
11+
func (p *LocalFileProxy) Truncate(size int64) error {
12+
return p.localFile.Truncate(size)
13+
}
14+
15+
func (p *LocalFileProxy) WriteAt(b []byte, off int64) (n int, err error) {
16+
return p.localFile.WriteAt(b, off)
17+
}
18+
19+
func (p *LocalFileProxy) ReadAt(b []byte, off int64) (n int, err error) {
20+
return p.localFile.ReadAt(b, off)
21+
}
22+
23+
func (p *LocalFileProxy) Seek(offset int64, whence int) (ret int64, err error) {
24+
return p.localFile.Seek(offset, whence)
25+
}
26+
27+
func (p *LocalFileProxy) Read(b []byte) (n int, err error) {
28+
return p.localFile.Read(b)
29+
}
30+
31+
func (p *LocalFileProxy) Close() error {
32+
return p.localFile.Close()
33+
}
34+
35+
func (p *LocalFileProxy) Sync() error {
36+
return p.localFile.Sync()
37+
}

RemoteFileProxy.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package main
2+
3+
type RemoteFileProxy struct {
4+
}
5+
6+
var _ FileProxy = (*RemoteFileProxy)(nil)
7+
8+
func (p *RemoteFileProxy) Truncate(size int64) error {
9+
logfatal("Not implemented yet", nil)
10+
return nil
11+
}
12+
func (p *RemoteFileProxy) WriteAt(b []byte, off int64) (n int, err error) {
13+
logfatal("Not implemented yet", nil)
14+
return 0, nil
15+
}
16+
func (p *RemoteFileProxy) ReadAt(b []byte, off int64) (n int, err error) {
17+
logfatal("Not implemented yet", nil)
18+
return 0, nil
19+
}
20+
func (p *RemoteFileProxy) Seek(offset int64, whence int) (ret int64, err error) {
21+
logfatal("Not implemented yet", nil)
22+
return 0, nil
23+
}
24+
func (p *RemoteFileProxy) Read(b []byte) (n int, err error) {
25+
logfatal("Not implemented yet", nil)
26+
return 0, nil
27+
}
28+
func (p *RemoteFileProxy) Close() error {
29+
logfatal("Not implemented yet", nil)
30+
return nil
31+
}
32+
func (p *RemoteFileProxy) Sync() error {
33+
logfatal("Not implemented yet", nil)
34+
return nil
35+
}

0 commit comments

Comments
 (0)