Skip to content

Commit c47cc62

Browse files
committed
Fixed data races identified by go race detector
1 parent a5f0b96 commit c47cc62

File tree

5 files changed

+109
-73
lines changed

5 files changed

+109
-73
lines changed

Dir.go

+26-16
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func (dir *DirINode) AbsolutePathForChild(name string) string {
5353

5454
// Responds on FUSE request to get directory attributes
5555
func (dir *DirINode) Attr(ctx context.Context, a *fuse.Attr) error {
56-
dir.mutex.Lock()
57-
defer dir.mutex.Unlock()
56+
dir.lockMutex()
57+
defer dir.unlockMutex()
5858
if dir.Parent != nil && dir.FileSystem.Clock.Now().After(dir.Attrs.Expires) {
5959
err := dir.Parent.LookupAttrs(dir.Attrs.Name, &dir.Attrs)
6060
if err != nil {
@@ -103,8 +103,8 @@ func (dir *DirINode) EntriesRemove(name string) {
103103

104104
// Responds on FUSE request to lookup the directory
105105
func (dir *DirINode) Lookup(ctx context.Context, name string) (fs.Node, error) {
106-
dir.mutex.Lock()
107-
defer dir.mutex.Unlock()
106+
dir.lockMutex()
107+
defer dir.unlockMutex()
108108

109109
if !dir.FileSystem.IsPathAllowed(dir.AbsolutePathForChild(name)) {
110110
return nil, fuse.ENOENT
@@ -142,8 +142,8 @@ func (dir *DirINode) Lookup(ctx context.Context, name string) (fs.Node, error) {
142142

143143
// Responds on FUSE request to read directory
144144
func (dir *DirINode) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
145-
dir.mutex.Lock()
146-
defer dir.mutex.Unlock()
145+
dir.lockMutex()
146+
defer dir.unlockMutex()
147147

148148
absolutePath := dir.AbsolutePath()
149149
loginfo("Read directory", Fields{Operation: ReadDir, Path: absolutePath})
@@ -218,8 +218,8 @@ func (dir *DirINode) LookupAttrs(name string, attrs *Attrs) error {
218218

219219
// Responds on FUSE Mkdir request
220220
func (dir *DirINode) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
221-
dir.mutex.Lock()
222-
defer dir.mutex.Unlock()
221+
dir.lockMutex()
222+
defer dir.unlockMutex()
223223

224224
err := dir.FileSystem.getDFSConnector().Mkdir(dir.AbsolutePathForChild(req.Name), req.Mode)
225225
if err != nil {
@@ -230,8 +230,8 @@ func (dir *DirINode) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node
230230

231231
// Responds on FUSE Create request
232232
func (dir *DirINode) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
233-
dir.mutex.Lock()
234-
defer dir.mutex.Unlock()
233+
dir.lockMutex()
234+
defer dir.unlockMutex()
235235

236236
loginfo("Creating a new file", Fields{Operation: Create, Path: dir.AbsolutePathForChild(req.Name), Mode: req.Mode, Flags: req.Flags})
237237
file := dir.NodeFromAttrs(Attrs{Name: req.Name, Mode: req.Mode}).(*FileINode)
@@ -247,8 +247,8 @@ func (dir *DirINode) Create(ctx context.Context, req *fuse.CreateRequest, resp *
247247

248248
// Responds on FUSE Remove request
249249
func (dir *DirINode) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
250-
dir.mutex.Lock()
251-
defer dir.mutex.Unlock()
250+
dir.lockMutex()
251+
defer dir.unlockMutex()
252252

253253
path := dir.AbsolutePathForChild(req.Name)
254254
loginfo("Removing path", Fields{Operation: Remove, Path: path})
@@ -263,8 +263,8 @@ func (dir *DirINode) Remove(ctx context.Context, req *fuse.RemoveRequest) error
263263

264264
// Responds on FUSE Rename request
265265
func (dir *DirINode) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
266-
dir.mutex.Lock()
267-
defer dir.mutex.Unlock()
266+
dir.lockMutex()
267+
defer dir.unlockMutex()
268268

269269
oldPath := dir.AbsolutePathForChild(req.OldName)
270270
newPath := newDir.(*DirINode).AbsolutePathForChild(req.NewName)
@@ -287,8 +287,8 @@ func (dir *DirINode) Rename(ctx context.Context, req *fuse.RenameRequest, newDir
287287

288288
// Responds on FUSE Chmod request
289289
func (dir *DirINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
290-
dir.mutex.Lock()
291-
defer dir.mutex.Unlock()
290+
dir.lockMutex()
291+
defer dir.unlockMutex()
292292

293293
// Get the filepath, so chmod in hdfs can work
294294
path := dir.AbsolutePath()
@@ -340,3 +340,13 @@ func (dir *DirINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp
340340

341341
return err
342342
}
343+
344+
var dirLockTime time.Time = time.Time{}
345+
346+
func (dir *DirINode) lockMutex() {
347+
dir.mutex.Lock()
348+
}
349+
350+
func (dir *DirINode) unlockMutex() {
351+
dir.mutex.Unlock()
352+
}

File.go

+34-16
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ func (file *FileINode) AbsolutePath() string {
4747

4848
// Responds to the FUSE file attribute request
4949
func (file *FileINode) Attr(ctx context.Context, a *fuse.Attr) error {
50-
file.fileMutex.Lock()
51-
defer file.fileMutex.Unlock()
50+
file.lockFile()
51+
defer file.unlockFile()
52+
5253
if file.FileSystem.Clock.Now().After(file.Attrs.Expires) {
5354
err := file.Parent.LookupAttrs(file.Attrs.Name, &file.Attrs)
5455
if err != nil {
@@ -60,8 +61,8 @@ func (file *FileINode) Attr(ctx context.Context, a *fuse.Attr) error {
6061

6162
// Responds to the FUSE file open request (creates new file handle)
6263
func (file *FileINode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
63-
file.fileMutex.Lock()
64-
defer file.fileMutex.Unlock()
64+
file.lockFile()
65+
defer file.unlockFile()
6566

6667
logdebug("Opening file", Fields{Operation: Open, Path: file.AbsolutePath(), Flags: req.Flags})
6768
handle, err := file.NewFileHandle(true, req.Flags)
@@ -75,8 +76,9 @@ func (file *FileINode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fu
7576

7677
// Opens file for reading
7778
func (file *FileINode) OpenRead() (ReadSeekCloser, error) {
78-
file.fileMutex.Lock()
79-
defer file.fileMutex.Unlock()
79+
file.lockFile()
80+
defer file.unlockFile()
81+
8082
handle, err := file.Open(nil, &fuse.OpenRequest{Flags: fuse.OpenReadOnly}, nil)
8183
if err != nil {
8284
return nil, err
@@ -86,11 +88,16 @@ func (file *FileINode) OpenRead() (ReadSeekCloser, error) {
8688

8789
// Registers an opened file handle
8890
func (file *FileINode) AddHandle(handle *FileHandle) {
91+
file.lockFileHandles()
92+
defer file.unlockFileHandles()
8993
file.activeHandles = append(file.activeHandles, handle)
9094
}
9195

9296
// Unregisters an opened file handle
9397
func (file *FileINode) RemoveHandle(handle *FileHandle) {
98+
file.lockFileHandles()
99+
defer file.unlockFileHandles()
100+
94101
for i, h := range file.activeHandles {
95102
if h == handle {
96103
file.activeHandles = append(file.activeHandles[:i], file.activeHandles[i+1:]...)
@@ -102,8 +109,9 @@ func (file *FileINode) RemoveHandle(handle *FileHandle) {
102109
// Responds to the FUSE Fsync request
103110
func (file *FileINode) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
104111
loginfo(fmt.Sprintf("Dispatching fsync request to all open handles: %d", len(file.activeHandles)), Fields{Operation: Fsync})
105-
file.fileMutex.Lock()
106-
defer file.fileMutex.Unlock()
112+
file.lockFile()
113+
defer file.unlockFile()
114+
107115
var retErr error
108116
for _, handle := range file.activeHandles {
109117
err := handle.Fsync(ctx, req)
@@ -121,8 +129,8 @@ func (file *FileINode) InvalidateMetadataCache() {
121129

122130
// Responds on FUSE Chmod request
123131
func (file *FileINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
124-
file.fileMutex.Lock()
125-
defer file.fileMutex.Unlock()
132+
file.lockFile()
133+
defer file.unlockFile()
126134

127135
if req.Valid.Size() {
128136
var retErr error
@@ -189,6 +197,8 @@ func (file *FileINode) Setattr(ctx context.Context, req *fuse.SetattrRequest, re
189197
}
190198

191199
func (file *FileINode) countActiveHandles() int {
200+
file.lockFileHandles()
201+
file.unlockFileHandles()
192202
return len(file.activeHandles)
193203
}
194204

@@ -256,8 +266,8 @@ func (file *FileINode) downloadToStaging(stagingFile *os.File, operation string)
256266

257267
// Creates new file handle
258268
func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*FileHandle, error) {
259-
file.lockFileHandle()
260-
defer file.unLockFileHandle()
269+
file.lockFileHandles()
270+
defer file.unlockFileHandles()
261271

262272
fh := &FileHandle{File: file, fileFlags: flags, fhID: int64(rand.Uint64())}
263273
operation := Create
@@ -298,8 +308,8 @@ func (file *FileINode) NewFileHandle(existsInDFS bool, flags fuse.OpenFlags) (*F
298308

299309
// changes RO file handle to RW
300310
func (file *FileINode) upgradeHandleForWriting() error {
301-
file.lockFileHandle()
302-
defer file.unLockFileHandle()
311+
file.lockFileHandles()
312+
defer file.unlockFileHandles()
303313

304314
var upgrade = false
305315
if _, ok := file.handle.(*LocalRWFileProxy); ok {
@@ -356,10 +366,18 @@ func (file *FileINode) logInfo(fields Fields) Fields {
356366
return f
357367
}
358368

359-
func (file *FileINode) lockFileHandle() {
369+
func (file *FileINode) lockFileHandles() {
360370
file.fileHandleMutex.Lock()
361371
}
362372

363-
func (file *FileINode) unLockFileHandle() {
373+
func (file *FileINode) unlockFileHandles() {
364374
file.fileHandleMutex.Unlock()
365375
}
376+
377+
func (file *FileINode) lockFile() {
378+
file.fileMutex.Lock()
379+
}
380+
381+
func (file *FileINode) unlockFile() {
382+
file.fileMutex.Unlock()
383+
}

HopsFileHandle.go

+21-13
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
// Represends a handle to an open file
1515
type FileHandle struct {
1616
File *FileINode
17-
Mutex sync.Mutex // all operations on the handle are serialized to simplify invariants
17+
mutex sync.Mutex // all operations on the handle are serialized to simplify invariants
1818
fileFlags fuse.OpenFlags // flags used to creat the file
1919
tatalBytesRead int64
2020
totalBytesWritten int64
@@ -48,15 +48,15 @@ func (fh *FileHandle) Truncate(size int64) error {
4848

4949
// Returns attributes of the file associated with this handle
5050
func (fh *FileHandle) Attr(ctx context.Context, a *fuse.Attr) error {
51-
fh.Mutex.Lock()
52-
defer fh.Mutex.Unlock()
51+
fh.lockHandle()
52+
defer fh.unlockHandle()
5353
return fh.File.Attr(ctx, a)
5454
}
5555

5656
// Responds to FUSE Read request
5757
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
58-
fh.Mutex.Lock()
59-
defer fh.Mutex.Unlock()
58+
fh.lockHandle()
59+
defer fh.unlockHandle()
6060

6161
buf := resp.Data[0:req.Size]
6262
nr, err := fh.File.handle.ReadAt(buf, req.Offset)
@@ -78,8 +78,8 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
7878

7979
// Responds to FUSE Write request
8080
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
81-
fh.Mutex.Lock()
82-
defer fh.Mutex.Unlock()
81+
fh.lockHandle()
82+
defer fh.unlockHandle()
8383

8484
// as an optimization the file is initially opened in readonly mode
8585
fh.File.upgradeHandleForWriting()
@@ -164,8 +164,8 @@ func (fh *FileHandle) FlushAttempt(operation string) error {
164164

165165
// Responds to the FUSE Flush request
166166
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
167-
fh.Mutex.Lock()
168-
defer fh.Mutex.Unlock()
167+
fh.lockHandle()
168+
defer fh.unlockHandle()
169169
if fh.dataChanged() {
170170
loginfo("Flush file", fh.logInfo(Fields{Operation: Flush}))
171171
return fh.copyToDFS(Flush)
@@ -176,8 +176,8 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
176176

177177
// Responds to the FUSE Fsync request
178178
func (fh *FileHandle) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
179-
fh.Mutex.Lock()
180-
defer fh.Mutex.Unlock()
179+
fh.lockHandle()
180+
defer fh.unlockHandle()
181181
if fh.dataChanged() {
182182
loginfo("Fsync file", fh.logInfo(Fields{Operation: Fsync}))
183183
return fh.copyToDFS(Fsync)
@@ -188,8 +188,8 @@ func (fh *FileHandle) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
188188

189189
// Closes the handle
190190
func (fh *FileHandle) Release(_ context.Context, _ *fuse.ReleaseRequest) error {
191-
fh.Mutex.Lock()
192-
defer fh.Mutex.Unlock()
191+
fh.lockHandle()
192+
defer fh.unlockHandle()
193193

194194
//close the file handle if it is the last handle
195195
fh.File.InvalidateMetadataCache()
@@ -217,3 +217,11 @@ func (fh *FileHandle) logInfo(fields Fields) Fields {
217217
}
218218
return f
219219
}
220+
221+
func (fh *FileHandle) lockHandle() {
222+
fh.mutex.Lock()
223+
}
224+
225+
func (fh *FileHandle) unlockHandle() {
226+
fh.mutex.Unlock()
227+
}

LocalFileProxy.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -10,46 +10,46 @@ type LocalRWFileProxy struct {
1010
var _ FileProxy = (*LocalRWFileProxy)(nil)
1111

1212
func (p *LocalRWFileProxy) Truncate(size int64) error {
13-
p.file.lockFileHandle()
14-
defer p.file.unLockFileHandle()
13+
p.file.lockFileHandles()
14+
defer p.file.unlockFileHandles()
1515
return p.localFile.Truncate(size)
1616
}
1717

1818
func (p *LocalRWFileProxy) WriteAt(b []byte, off int64) (n int, err error) {
19-
p.file.lockFileHandle()
20-
defer p.file.unLockFileHandle()
19+
p.file.lockFileHandles()
20+
defer p.file.unlockFileHandles()
2121
return p.localFile.WriteAt(b, off)
2222
}
2323

2424
func (p *LocalRWFileProxy) ReadAt(b []byte, off int64) (n int, err error) {
25-
p.file.lockFileHandle()
26-
defer p.file.unLockFileHandle()
25+
p.file.lockFileHandles()
26+
defer p.file.unlockFileHandles()
2727
n, err = p.localFile.ReadAt(b, off)
2828
logdebug("LocalFileProxy ReadAt", p.file.logInfo(Fields{Operation: Read, Bytes: n, Error: err, Offset: off}))
2929
return
3030
}
3131

3232
func (p *LocalRWFileProxy) SeekToStart() (err error) {
33-
p.file.lockFileHandle()
34-
defer p.file.unLockFileHandle()
33+
p.file.lockFileHandles()
34+
defer p.file.unlockFileHandles()
3535
_, err = p.localFile.Seek(0, 0)
3636
return
3737
}
3838

3939
func (p *LocalRWFileProxy) Read(b []byte) (n int, err error) {
40-
p.file.lockFileHandle()
41-
defer p.file.unLockFileHandle()
40+
p.file.lockFileHandles()
41+
defer p.file.unlockFileHandles()
4242
return p.localFile.Read(b)
4343
}
4444

4545
func (p *LocalRWFileProxy) Close() error {
46-
p.file.lockFileHandle()
47-
defer p.file.unLockFileHandle()
46+
p.file.lockFileHandles()
47+
defer p.file.unlockFileHandles()
4848
return p.localFile.Close()
4949
}
5050

5151
func (p *LocalRWFileProxy) Sync() error {
52-
p.file.lockFileHandle()
53-
defer p.file.unLockFileHandle()
52+
p.file.lockFileHandles()
53+
defer p.file.unlockFileHandles()
5454
return p.localFile.Sync()
5555
}

0 commit comments

Comments
 (0)