Skip to content

Commit 0b75879

Browse files
authored
Merge pull request #21 from smkniazi/fixing-rpc
Fixing GH-10, GH-16, GH-19
2 parents 0402104 + 32149f1 commit 0b75879

10 files changed

+101
-49
lines changed

FaultTolerantHdfsAccessor.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (fta *FaultTolerantHdfsAccessor) EnsureConnected() error {
2626
op := fta.RetryPolicy.StartOperation()
2727
for {
2828
err := fta.Impl.EnsureConnected()
29-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("Connect: %s", err) {
29+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("Connect: %s", err) {
3030
return err
3131
}
3232
}
@@ -41,7 +41,7 @@ func (fta *FaultTolerantHdfsAccessor) OpenRead(path string) (ReadSeekCloser, err
4141
// wrapping returned HdfsReader with FaultTolerantHdfsReader
4242
return NewFaultTolerantHdfsReader(path, result, fta.Impl, fta.RetryPolicy), nil
4343
}
44-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] OpenRead: %s", path, err) {
44+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("[%s] OpenRead: %s", path, err) {
4545
return nil, err
4646
} else {
4747
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -61,7 +61,7 @@ func (fta *FaultTolerantHdfsAccessor) ReadDir(path string) ([]Attrs, error) {
6161
op := fta.RetryPolicy.StartOperation()
6262
for {
6363
result, err := fta.Impl.ReadDir(path)
64-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] ReadDir: %s", path, err) {
64+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("[%s] ReadDir: %s", path, err) {
6565
return result, err
6666
} else {
6767
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -75,7 +75,7 @@ func (fta *FaultTolerantHdfsAccessor) Stat(path string) (Attrs, error) {
7575
op := fta.RetryPolicy.StartOperation()
7676
for {
7777
result, err := fta.Impl.Stat(path)
78-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Stat: %s", path, err) {
78+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("[%s] Stat: %s", path, err) {
7979
return result, err
8080
} else {
8181
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -89,7 +89,7 @@ func (fta *FaultTolerantHdfsAccessor) StatFs() (FsInfo, error) {
8989
op := fta.RetryPolicy.StartOperation()
9090
for {
9191
result, err := fta.Impl.StatFs()
92-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("StatFs: %s", err) {
92+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("StatFs: %s", err) {
9393
return result, err
9494
} else {
9595
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -103,7 +103,7 @@ func (fta *FaultTolerantHdfsAccessor) Mkdir(path string, mode os.FileMode) error
103103
op := fta.RetryPolicy.StartOperation()
104104
for {
105105
err := fta.Impl.Mkdir(path, mode)
106-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Mkdir %s: %s", path, mode, err) {
106+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("[%s] Mkdir %s: %s", path, mode, err) {
107107
return err
108108
} else {
109109
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -117,7 +117,7 @@ func (fta *FaultTolerantHdfsAccessor) Remove(path string) error {
117117
op := fta.RetryPolicy.StartOperation()
118118
for {
119119
err := fta.Impl.Remove(path)
120-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Remove: %s", path, err) {
120+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("[%s] Remove: %s", path, err) {
121121
return err
122122
} else {
123123
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -131,7 +131,7 @@ func (fta *FaultTolerantHdfsAccessor) Rename(oldPath string, newPath string) err
131131
op := fta.RetryPolicy.StartOperation()
132132
for {
133133
err := fta.Impl.Rename(oldPath, newPath)
134-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Rename to %s: %s", oldPath, newPath, err) {
134+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("[%s] Rename to %s: %s", oldPath, newPath, err) {
135135
return err
136136
} else {
137137
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -145,7 +145,7 @@ func (fta *FaultTolerantHdfsAccessor) Chmod(path string, mode os.FileMode) error
145145
op := fta.RetryPolicy.StartOperation()
146146
for {
147147
err := fta.Impl.Chmod(path, mode)
148-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("Chmod [%s] to [%d]: %s", path, mode, err) {
148+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("Chmod [%s] to [%d]: %s", path, mode, err) {
149149
return err
150150
} else {
151151
// Clean up the bad connection, to let underline connection to get automatic refresh
@@ -159,7 +159,7 @@ func (fta *FaultTolerantHdfsAccessor) Chown(path string, user, group string) err
159159
op := fta.RetryPolicy.StartOperation()
160160
for {
161161
err := fta.Impl.Chown(path, user, group)
162-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("Chown [%s] to [%s:%s]: %s", path, user, group, err) {
162+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("Chown [%s] to [%s:%s]: %s", path, user, group, err) {
163163
return err
164164
} else {
165165
// Clean up the bad connection, to let underline connection to get automatic refresh

FaultTolerantHdfsReader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (ftr *FaultTolerantHdfsReader) Read(buffer []byte) (int, error) {
4242
// Performing the read
4343
var nr int
4444
nr, err = ftr.Impl.Read(buffer)
45-
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Read @%d: %s", ftr.Path, ftr.Offset, err.Error()) {
45+
if IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("[%s] Read @%d: %s", ftr.Path, ftr.Offset, err.Error()) {
4646
if err == nil {
4747
// On successful read, adjusting offset to the actual number of bytes read
4848
ftr.Offset += int64(nr)

FileSystemOperations_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestMountSubDir(t *testing.T) {
7878
}
7979

8080
func withMount(t testing.TB, srcDir string, fn func(mntPath string, hdfsAccessor HdfsAccessor)) {
81-
initLogger("error", os.Stdout, false)
81+
initLogger("error", false, "")
8282

8383
hdfsAccessor, err := NewHdfsAccessor("localhost:8020", WallClock{}, TLSConfig{TLS: false})
8484
if err != nil {

HdfsAccessor.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (dfs *hdfsAccessorImpl) ReadDir(path string) ([]Attrs, error) {
199199
}
200200
files, err := dfs.MetadataClient.ReadDir(path)
201201
if err != nil {
202-
if IsSuccessOrBenignError(err) {
202+
if IsSuccessOrNonRetriableError(err) {
203203
// benign error (e.g. path not found)
204204
return nil, err
205205
}
@@ -228,7 +228,7 @@ func (dfs *hdfsAccessorImpl) Stat(path string) (Attrs, error) {
228228

229229
fileInfo, err := dfs.MetadataClient.Stat(path)
230230
if err != nil {
231-
if IsSuccessOrBenignError(err) {
231+
if IsSuccessOrNonRetriableError(err) {
232232
// benign error (e.g. path not found)
233233
return Attrs{}, err
234234
}
@@ -253,7 +253,7 @@ func (dfs *hdfsAccessorImpl) StatFs() (FsInfo, error) {
253253

254254
fsInfo, err := dfs.MetadataClient.StatFs()
255255
if err != nil {
256-
if IsSuccessOrBenignError(err) {
256+
if IsSuccessOrNonRetriableError(err) {
257257
return FsInfo{}, err
258258
}
259259
dfs.MetadataClient = nil
@@ -359,7 +359,7 @@ func (dfs *hdfsAccessorImpl) LookupGid(groupName string) uint32 {
359359
}
360360

361361
// Returns true if err==nil or err is expected (benign) error which should be propagated directoy to the caller
362-
func IsSuccessOrBenignError(err error) bool {
362+
func IsSuccessOrNonRetriableError(err error) bool {
363363
if err == nil || err == io.EOF || err == fuse.EEXIST {
364364
return true
365365
}

HopsFileHandle.go

+21-19
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ import (
88
"io/ioutil"
99
"os"
1010
"sync"
11-
"time"
11+
"syscall"
1212

1313
"bazil.org/fuse"
1414
"bazil.org/fuse/fs"
1515
"golang.org/x/net/context"
16+
"golang.org/x/sys/unix"
1617
)
1718

1819
// Represends a handle to an open file
@@ -104,7 +105,9 @@ func NewFileHandle(file *File, existsInDFS bool, flags fuse.OpenFlags) (*FileHan
104105
}
105106

106107
fh := &FileHandle{File: file, fileFlags: flags}
107-
checkDiskSpace(fh.File.FileSystem.HdfsAccessor)
108+
if err := checkDiskSpace(); err != nil {
109+
return nil, err
110+
}
108111

109112
if err := fh.createStagingFile(operation, existsInDFS); err != nil {
110113
return nil, err
@@ -131,17 +134,20 @@ func (fh *FileHandle) Truncate(size int64) error {
131134
return nil
132135
}
133136

134-
func checkDiskSpace(hdfsAccessor HdfsAccessor) error {
135-
//TODO FIXME
136-
// fsInfo, err := hdfsAccessor.StatFs()
137-
// if err != nil {
138-
// // Donot abort, continue writing
139-
// Error.Println("Failed to get HDFS usage, ERROR:", err)
140-
// } else if uint64(req.Offset) >= fsInfo.remaining {
141-
// Error.Println("[", fhw.Handle.File.AbsolutePath(), "] writes larger size (", req.Offset, ")than HDFS available size (", fsInfo.remaining, ")")
142-
// return errors.New("Too large file")
143-
// }
144-
return nil
137+
func checkDiskSpace() error {
138+
var stat unix.Statfs_t
139+
wd, err := os.Getwd()
140+
if err != nil {
141+
return err
142+
}
143+
unix.Statfs(wd, &stat)
144+
// Available blocks * size per block = available space in bytes
145+
bytesAvailable := stat.Bavail * uint64(stat.Bsize)
146+
if bytesAvailable < 64*1024*1024 {
147+
return syscall.ENOSPC
148+
} else {
149+
return nil
150+
}
145151
}
146152

147153
// Returns attributes of the file associated with this handle
@@ -203,17 +209,13 @@ func (fh *FileHandle) copyToDFS(operation string) error {
203209
op := fh.File.FileSystem.RetryPolicy.StartOperation()
204210
for {
205211
err := fh.FlushAttempt(operation)
206-
if err != io.EOF || IsSuccessOrBenignError(err) || !op.ShouldRetry("Flush()", err) {
212+
if err != io.EOF || IsSuccessOrNonRetriableError(err) || !op.ShouldRetry("Flush()", err) {
207213
return err
208214
}
209-
// Restart a new connection, https://github.com/colinmarc/hdfs/issues/86
215+
// Reconnect and try again
210216
fh.File.FileSystem.HdfsAccessor.Close()
211217
logwarn("Failed to copy file to DFS", Fields{Operation: operation, Path: fh.File.AbsolutePath()})
212-
// Wait for 30 seconds before another retry to get another set of datanodes.
213-
// https://community.hortonworks.com/questions/2474/how-to-identify-stale-datanode.html
214-
time.Sleep(30 * time.Second)
215218
}
216-
return nil
217219
}
218220

219221
func (fh *FileHandle) FlushAttempt(operation string) error {

Log.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ package main
44

55
import (
66
"fmt"
7-
"io"
7+
"os"
88
"runtime"
99

1010
nested "github.com/antonfisher/nested-logrus-formatter"
1111
logger "github.com/sirupsen/logrus"
12+
"gopkg.in/natefinch/lumberjack.v2"
1213
)
1314

1415
// bunch of constants for logging
@@ -67,7 +68,7 @@ func init() {
6768
logger.SetLevel(logger.ErrorLevel)
6869
}
6970

70-
func initLogger(l string, out io.Writer, reportCaller bool) {
71+
func initLogger(l string, reportCaller bool, lfile string) {
7172
ReportCaller = reportCaller
7273
lvl, err := logger.ParseLevel(l)
7374
if err != nil {
@@ -78,10 +79,9 @@ func initLogger(l string, out io.Writer, reportCaller bool) {
7879
// Output to stdout instead of the default stderr
7980
// Can be any io.Writer, see below for File example
8081
// TODO log to file and log cutting
81-
logger.SetOutput(out)
8282

83-
//Json
84-
// logger.SetFormatter(&logger.JSONFormatter{})
83+
//Json output
84+
//logger.SetFormatter(&logger.JSONFormatter{})
8585

8686
//set custom formatter github.com/antonfisher/nested-logrus-formatter
8787
logger.SetFormatter(&nested.Formatter{
@@ -92,6 +92,18 @@ func initLogger(l string, out io.Writer, reportCaller bool) {
9292

9393
// Only log the warning severity or above.
9494
logger.SetLevel(lvl)
95+
96+
// setup log cutting
97+
if lfile != "" {
98+
logger.SetOutput(&lumberjack.Logger{
99+
Filename: lfile,
100+
MaxSize: 100, // megabytes
101+
MaxBackups: 10,
102+
MaxAge: 30, //days
103+
})
104+
} else {
105+
logger.SetOutput(os.Stdout)
106+
}
95107
}
96108

97109
type Fields logger.Fields

RetryPolicy.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (op *Op) ShouldRetry(message string, args ...interface{}) bool {
6767
diag = "exceeded max configured time interval for retries"
6868
}
6969
if diag != "" {
70-
logerror(fmt.Sprintf("Failed all retries. %v ", args), Fields{Operation: RetryingPolicy, Message: message, Retries: op.Attempt, Diag: diag})
70+
logerror("Failed all retries.", Fields{Operation: RetryingPolicy, Message: fmt.Sprintf(message, args...), Retries: op.Attempt, Diag: diag})
7171
return false
7272
}
7373
// Computing delay (exponential backoff)
@@ -86,7 +86,7 @@ func (op *Op) ShouldRetry(message string, args ...interface{}) bool {
8686
}
8787

8888
// Logging information about failed attempt
89-
logwarn(fmt.Sprintf("Failed try. Retrying %v", args), Fields{Operation: RetryingPolicy, Message: message, Retries: op.Attempt, Delay: effectiveDelay})
89+
logwarn("Failed try. Retrying", Fields{Operation: RetryingPolicy, Message: fmt.Sprintf(message, args...), Retries: op.Attempt, Delay: effectiveDelay})
9090
op.Attempt++
9191

9292
// Sleeping

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ require (
1010
github.com/sirupsen/logrus v1.8.1
1111
github.com/stretchr/testify v1.4.0
1212
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
13+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
14+
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
1315
)
1416

1517
replace github.com/colinmarc/hdfs/v2 v2.2.0 => github.com/logicalclocks/hopsfs-go-client/v2 v2.4.1
18+
1619
//replace github.com/colinmarc/hdfs/v2 v2.2.0 => /home/salman/code/hops/hopsfs-go/hopsfs-go-client
1720
//replace bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 => /home/salman/code/hops/hopsfs-go/fuse

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -92,5 +92,7 @@ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+Rur
9292
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
9393
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
9494
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
95+
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
96+
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
9597
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
9698
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

0 commit comments

Comments
 (0)