package main

import (
- 	"errors"
	"fmt"
	"io"
	"os"
- 	"os/user"
- 	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"bazil.org/fuse"
	"github.com/colinmarc/hdfs/v2"
- )
-
- const (
- 	UGCacheTime = 3 * time.Second
+ 	"logicalclocks.com/hopsfs-mount/ugcache"
)

// Interface for accessing HDFS
// Concurrency: thread safe: handles unlimited number of concurrent requests
+ var hadoopUserName string = os.Getenv("HADOOP_USER_NAME")
+ var hadoopUserID uint32 = 0
+
type HdfsAccessor interface {
	OpenRead(path string) (ReadSeekCloser, error) // Opens HDFS file for reading
	CreateFile(path string,
@@ -49,18 +46,11 @@ type TLSConfig struct {
}

type hdfsAccessorImpl struct {
- 	Clock               Clock                   // interface to get wall clock time
- 	NameNodeAddresses   []string                // array of Address:port string for the name nodes
- 	MetadataClient      *hdfs.Client            // HDFS client used for metadata operations
- 	MetadataClientMutex sync.Mutex              // Serializing all metadata operations for simplicity (for now), TODO: allow N concurrent operations
- 	UserNameToUidCache  map[string]UGCacheEntry // cache for converting usernames to UIDs
- 	GroupNameToUidCache map[string]UGCacheEntry // cache for converting usernames to UIDs
- 	TLSConfig           TLSConfig               // enable/disable using tls
- }
-
- type UGCacheEntry struct {
- 	ID      uint32    // User/Group Id
- 	Expires time.Time // Absolute time when this cache entry expires
+ 	Clock               Clock        // interface to get wall clock time
+ 	NameNodeAddresses   []string     // array of Address:port string for the name nodes
+ 	MetadataClient      *hdfs.Client // HDFS client used for metadata operations
+ 	MetadataClientMutex sync.Mutex   // Serializing all metadata operations for simplicity (for now), TODO: allow N concurrent operations
+ 	TLSConfig           TLSConfig    // enable/disable using tls
}

var _ HdfsAccessor = (*hdfsAccessorImpl)(nil) // ensure hdfsAccessorImpl implements HdfsAccessor
@@ -70,11 +60,9 @@ func NewHdfsAccessor(nameNodeAddresses string, clock Clock, tlsConfig TLSConfig)
	nns := strings.Split(nameNodeAddresses, ",")

	this := &hdfsAccessorImpl{
- 		NameNodeAddresses:   nns,
- 		Clock:               clock,
- 		UserNameToUidCache:  make(map[string]UGCacheEntry),
- 		GroupNameToUidCache: make(map[string]UGCacheEntry),
- 		TLSConfig:           tlsConfig,
+ 		NameNodeAddresses: nns,
+ 		Clock:             clock,
+ 		TLSConfig:         tlsConfig,
	}
	return this, nil
}
@@ -103,29 +91,29 @@ func (dfs *hdfsAccessorImpl) ConnectToNameNode() (*hdfs.Client, error) {
	client, err := dfs.connectToNameNodeImpl()
	if err != nil {
		// Connection failed
- 		return nil, errors.New(fmt.Sprintf("Fail to connect to name node with error: %s", err.Error()))
+ 		return nil, fmt.Errorf("fail to connect to name node with error: %s", err.Error())
	}
	return client, nil
}

// Performs an attempt to connect to the HDFS name
func (dfs *hdfsAccessorImpl) connectToNameNodeImpl() (*hdfs.Client, error) {
- 	hadoopUser := os.Getenv("HADOOP_USER_NAME")
- 	if hadoopUser == "" {
- 		u, err := user.Current()
+ 	if hadoopUserName == "" {
+ 		u, err := ugcache.CurrentUserName()
		if err != nil {
- 			return nil, fmt.Errorf("Couldn't determine user: %s", err)
+ 			return nil, fmt.Errorf("couldn't determine user: %s", err)
		}
- 		hadoopUser = u.Username
+ 		hadoopUserName = u
	}
- 	loginfo(fmt.Sprintf("Connecting as user: %s", hadoopUser), nil)
+ 	hadoopUserID = ugcache.LookupUId(hadoopUserName)
+ 	loginfo(fmt.Sprintf("Connecting as user: %s, UID: %d", hadoopUserName, hadoopUserID), nil)

	// Performing an attempt to connect to the name node
	// Colinmar's hdfs implementation has supported the multiple name node connection
	hdfsOptions := hdfs.ClientOptions{
		Addresses: dfs.NameNodeAddresses,
		TLS:       dfs.TLSConfig.TLS,
- 		User:      hadoopUser,
+ 		User:      hadoopUserName,
	}

	if dfs.TLSConfig.TLS {
@@ -278,11 +266,11 @@ func (dfs *hdfsAccessorImpl) AttrsFromFileInfo(fileInfo os.FileInfo) Attrs {
		Name:   fileInfo.Name(),
		Mode:   mode,
		Size:   fi.Length(),
- 		Uid:    dfs.LookupUid(fi.Owner()),
+ 		Uid:    ugcache.LookupUId(fi.Owner()),
		Mtime:  modificationTime,
		Ctime:  modificationTime,
		Crtime: modificationTime,
- 		Gid:    dfs.LookupGid(fi.OwnerGroup())}
+ 		Gid:    ugcache.LookupGid(fi.OwnerGroup())}
}

func (dfs *hdfsAccessorImpl) AttrsFromFsInfo(fsInfo hdfs.FsInfo) FsInfo {
@@ -296,69 +284,6 @@ func HadoopTimestampToTime(timestamp uint64) time.Time {
	return time.Unix(int64(timestamp)/1000, 0)
}

- // Performs a cache-assisted lookup of UID by username
- func (dfs *hdfsAccessorImpl) LookupUid(userName string) uint32 {
- 	if userName == "" {
- 		return 0
- 	}
- 	// Note: this method is called under MetadataClientMutex, so accessing the cache dictionary is safe
- 	cacheEntry, ok := dfs.UserNameToUidCache[userName]
- 	if ok && dfs.Clock.Now().Before(cacheEntry.Expires) {
- 		return cacheEntry.ID
- 	}
-
- 	u, err := user.Lookup(userName)
- 	if u != nil {
- 		var uid64 uint64
- 		if err == nil {
- 			// UID is returned as string, need to parse it
- 			uid64, err = strconv.ParseUint(u.Uid, 10, 32)
- 		}
- 		if err != nil {
- 			uid64 = (1 << 31) - 1
- 		}
- 		dfs.UserNameToUidCache[userName] = UGCacheEntry{
- 			ID:      uint32(uid64),
- 			Expires: dfs.Clock.Now().Add(UGCacheTime)}
- 		return uint32(uid64)
-
- 	} else {
- 		return 0
- 	}
- }
-
- // Performs a cache-assisted lookup of GID by group name
- func (dfs *hdfsAccessorImpl) LookupGid(groupName string) uint32 {
- 	if groupName == "" {
- 		return 0
- 	}
- 	// Note: this method is called under MetadataClientMutex, so accessing the cache dictionary is safe
- 	cacheEntry, ok := dfs.GroupNameToUidCache[groupName]
- 	if ok && dfs.Clock.Now().Before(cacheEntry.Expires) {
- 		return cacheEntry.ID
- 	}
-
- 	g, err := user.LookupGroup(groupName)
- 	if g != nil {
- 		var gid64 uint64
- 		if err == nil {
- 			// GID is returned as string, need to parse it
- 			gid64, err = strconv.ParseUint(g.Gid, 10, 32)
- 		}
- 		if err != nil {
- 			gid64 = (1 << 31) - 1
- 		}
- 		dfs.GroupNameToUidCache[groupName] = UGCacheEntry{
- 			ID:      uint32(gid64),
- 			Expires: dfs.Clock.Now().Add(UGCacheTime)}
- 		return uint32(gid64)
-
- 	} else {
- 		logwarn(fmt.Sprintf("Group not found %s", groupName), nil)
- 		return 0
- 	}
- }
-
// Returns true if err==nil or err is expected (benign) error which should be propagated directly to the caller
func IsSuccessOrNonRetriableError(err error) bool {
	if err == nil {
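The commit replaces the accessor's built-in UID/GID caches with calls into a new ugcache package (ugcache.CurrentUserName, ugcache.LookupUId, ugcache.LookupGid). That package is not part of this diff, so the following is only a minimal sketch of what it could look like, reconstructed from the LookupUid/LookupGid logic removed above; the cache duration, package-level map, mutex, and use of time.Now instead of the injected Clock are assumptions.

// Sketch (assumption): package contents reconstructed from the removed cache logic.
// Only CurrentUserName, LookupUId and LookupGid are names the commit actually calls.
package ugcache

import (
	"os/user"
	"strconv"
	"sync"
	"time"
)

const cacheTime = 3 * time.Second // mirrors the removed UGCacheTime constant

type cacheEntry struct {
	id      uint32
	expires time.Time
}

var (
	mu       sync.Mutex
	uidCache = make(map[string]cacheEntry) // username -> UID
)

// CurrentUserName returns the name of the user running the process.
func CurrentUserName() (string, error) {
	u, err := user.Current()
	if err != nil {
		return "", err
	}
	return u.Username, nil
}

// LookupUId resolves a username to a UID with a short-lived cache.
// LookupGid would follow the same pattern using user.LookupGroup.
func LookupUId(userName string) uint32 {
	if userName == "" {
		return 0
	}
	mu.Lock()
	defer mu.Unlock()
	if e, ok := uidCache[userName]; ok && time.Now().Before(e.expires) {
		return e.id
	}
	u, err := user.Lookup(userName)
	if err != nil {
		return 0 // unknown user: fall back to 0, as the removed code did
	}
	uid64, err := strconv.ParseUint(u.Uid, 10, 32)
	if err != nil {
		uid64 = (1 << 31) - 1 // non-numeric UID: same sentinel as the removed code
	}
	uidCache[userName] = cacheEntry{id: uint32(uid64), expires: time.Now().Add(cacheTime)}
	return uint32(uid64)
}

Moving the lookups into their own package also explains why the UserNameToUidCache, GroupNameToUidCache and UGCacheEntry fields disappear from hdfsAccessorImpl: the cache no longer needs the accessor's Clock or MetadataClientMutex, and the resolved hadoopUserID can be computed once at connection time.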