@@ -15,32 +15,35 @@ limitations under the License.
15
15
*/
16
16
17
17
import (
18
- "encoding/json "
18
+ "errors "
19
19
"fmt"
20
20
"strconv"
21
21
"sync"
22
22
"time"
23
23
24
24
"github.com/container-storage-interface/spec/lib/go/csi"
25
- "github.com/linode/linodego"
26
25
"golang.org/x/net/context"
26
+ "google.golang.org/grpc/codes"
27
+ "google.golang.org/grpc/status"
27
28
"k8s.io/mount-utils"
28
29
29
30
devicemanager "github.com/linode/linode-blockstorage-csi-driver/pkg/device-manager"
30
31
filesystem "github.com/linode/linode-blockstorage-csi-driver/pkg/filesystem"
31
32
linodeclient "github.com/linode/linode-blockstorage-csi-driver/pkg/linode-client"
32
33
linodevolumes "github.com/linode/linode-blockstorage-csi-driver/pkg/linode-volumes"
33
34
"github.com/linode/linode-blockstorage-csi-driver/pkg/logger"
35
+ mountmanager "github.com/linode/linode-blockstorage-csi-driver/pkg/mount-manager"
34
36
"github.com/linode/linode-blockstorage-csi-driver/pkg/observability"
35
37
)
36
38
37
39
type NodeServer struct {
38
40
driver * LinodeDriver
39
- mounter * mount .SafeFormatAndMount
41
+ mounter * mountmanager .SafeFormatAndMount
40
42
deviceutils devicemanager.DeviceUtils
41
43
client linodeclient.LinodeClient
42
44
metadata Metadata
43
45
encrypt Encryption
46
+ resizeFs mountmanager.ResizeFSer
44
47
// TODO: Only lock mutually exclusive calls and make locking more fine grained
45
48
mux sync.Mutex
46
49
@@ -49,7 +52,7 @@ type NodeServer struct {
49
52
50
53
var _ csi.NodeServer = & NodeServer {}
51
54
52
- func NewNodeServer (ctx context.Context , linodeDriver * LinodeDriver , mounter * mount .SafeFormatAndMount , deviceUtils devicemanager.DeviceUtils , client linodeclient.LinodeClient , metadata Metadata , encrypt Encryption ) (* NodeServer , error ) {
55
+ func NewNodeServer (ctx context.Context , linodeDriver * LinodeDriver , mounter * mountmanager .SafeFormatAndMount , deviceUtils devicemanager.DeviceUtils , client linodeclient.LinodeClient , metadata Metadata , encrypt Encryption , resize mountmanager. ResizeFSer ) (* NodeServer , error ) {
53
56
log := logger .GetLogger (ctx )
54
57
55
58
log .V (4 ).Info ("Creating new NodeServer" )
@@ -78,6 +81,7 @@ func NewNodeServer(ctx context.Context, linodeDriver *LinodeDriver, mounter *mou
78
81
client : client ,
79
82
metadata : metadata ,
80
83
encrypt : encrypt ,
84
+ resizeFs : resize ,
81
85
}
82
86
83
87
log .V (4 ).Info ("NodeServer created successfully" )
@@ -199,25 +203,39 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
199
203
ns .mux .Lock ()
200
204
defer ns .mux .Unlock ()
201
205
206
+ // Part 1: Validate request object
207
+
202
208
// Before to functionStartTime, validate the request object (NodeStageVolumeRequest)
203
209
log .V (4 ).Info ("Validating request" , "volumeID" , volumeID )
204
210
if err := validateNodeStageVolumeRequest (ctx , req ); err != nil {
205
211
observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Failed , functionStartTime )
206
- return nil , err
212
+ return nil , status .Error (codes .InvalidArgument , err .Error ())
213
+ }
214
+
215
+ // Part 2: Get information of attached device
216
+
217
+ readonly , err := getReadOnlyFromCapability (req .GetVolumeCapability ())
218
+ if err != nil {
219
+ observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Failed , functionStartTime )
220
+ return nil , errors .Join (errInternal ("failed to get readonly from volume capability: %v" , err ), err )
207
221
}
208
222
223
+ stagingTargetPath := req .GetStagingTargetPath ()
224
+
209
225
// Get the LinodeVolumeKey which we need to find the device path
210
226
LinodeVolumeKey , err := linodevolumes .ParseLinodeVolumeKey (volumeID )
211
227
if err != nil {
212
228
observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Failed , functionStartTime )
213
- return nil , err
229
+ return nil , errors . Join ( status . Errorf ( codes . InvalidArgument , "volume not found: %v" , err ), err )
214
230
}
215
231
216
232
// Get device path of attached device
217
233
partition := ""
218
234
219
- if part , ok := req .GetVolumeContext ()["partition" ]; ok {
220
- partition = part
235
+ if vc := req .GetVolumeContext (); vc != nil {
236
+ if part , ok := vc ["partition" ]; ok {
237
+ partition = part
238
+ }
221
239
}
222
240
223
241
log .V (4 ).Info ("Finding device path" , "volumeID" , volumeID )
@@ -227,9 +245,10 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
227
245
return nil , err
228
246
}
229
247
230
- // Check if staging target path is a valid mount point.
231
- log .V (4 ).Info ("Ensuring staging target path is a valid mount point" , "volumeID" , volumeID , "stagingTargetPath" , req .GetStagingTargetPath ())
232
- notMnt , err := ns .ensureMountPoint (ctx , req .GetStagingTargetPath (), filesystem .NewFileSystem ())
248
+ // Part 3: check if staging target path is a valid mount point.
249
+
250
+ log .V (4 ).Info ("Ensuring staging target path is a valid mount point" , "volumeID" , volumeID , "stagingTargetPath" , stagingTargetPath )
251
+ notMnt , err := ns .ensureMountPoint (ctx , stagingTargetPath , filesystem .NewFileSystem ())
233
252
if err != nil {
234
253
observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Failed , functionStartTime )
235
254
return nil , err
@@ -244,7 +263,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
244
263
245
264
*/
246
265
observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Failed , functionStartTime )
247
- log .V (4 ).Info ("Staging target path is already a mount point" , "volumeID" , volumeID , "stagingTargetPath" , req . GetStagingTargetPath () )
266
+ log .V (4 ).Info ("Staging target path is already a mount point" , "volumeID" , volumeID , "stagingTargetPath" , stagingTargetPath )
248
267
return & csi.NodeStageVolumeResponse {}, nil
249
268
}
250
269
@@ -256,14 +275,27 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
256
275
return & csi.NodeStageVolumeResponse {}, nil
257
276
}
258
277
259
- // Mount device to stagingTargetPath
260
- // If LUKS is enabled, format the device accordingly
261
- log .V (4 ).Info ("Mounting device" , "volumeID" , volumeID , "devicePath" , devicePath , "stagingTargetPath" , req . GetStagingTargetPath () )
278
+ // Part 4: Mount device and format if needed
279
+
280
+ log .V (4 ).Info ("Mounting device" , "volumeID" , volumeID , "devicePath" , devicePath , "stagingTargetPath" , stagingTargetPath )
262
281
if err := ns .mountVolume (ctx , devicePath , req ); err != nil {
263
282
observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Failed , functionStartTime )
264
283
return nil , err
265
284
}
266
285
286
+ // Part 5: Resize fs
287
+
288
+ if ! readonly {
289
+ resized , err := ns .resize (devicePath , stagingTargetPath )
290
+ if err != nil {
291
+ observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Failed , functionStartTime )
292
+ return nil , errInternal ("failed to resize volume %s: %v" , volumeID , err )
293
+ }
294
+ if resized {
295
+ log .V (4 ).Info ("Successfully resized volume" , "volumeID" , volumeID )
296
+ }
297
+ }
298
+
267
299
// Record functionStatus metric
268
300
observability .RecordMetrics (observability .NodeStageVolumeTotal , observability .NodeStageVolumeDuration , observability .Completed , functionStartTime )
269
301
@@ -317,37 +349,72 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
317
349
defer done ()
318
350
319
351
functionStartTime := time .Now ()
320
- volumeID := req .GetVolumeId ()
321
- log .V (2 ).Info ("Processing request" , "volumeID" , volumeID )
322
352
353
+ volumeID := req .GetVolumeId ()
323
354
// Validate req (NodeExpandVolumeRequest)
355
+
324
356
log .V (4 ).Info ("Validating request" , "volumeID" , volumeID )
325
357
if err := validateNodeExpandVolumeRequest (ctx , req ); err != nil {
326
358
observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Failed , functionStartTime )
327
- return nil , err
359
+ return nil , errors . Join ( status . Error ( codes . InvalidArgument , fmt . Sprintf ( "validation failed: %v" , err )), err )
328
360
}
329
361
330
- // Check linode to see if a give volume exists by volume ID
331
- // Make call to linode api using the linode api client
362
+ log . V ( 2 ). Info ( "Processing request" , "volumeID" , volumeID )
363
+
332
364
LinodeVolumeKey , err := linodevolumes .ParseLinodeVolumeKey (volumeID )
333
365
log .V (4 ).Info ("Processed LinodeVolumeKey" , "LinodeVolumeKey" , LinodeVolumeKey )
334
366
if err != nil {
335
- // Node volume expansion is not supported yet. To meet the spec, we need to implement this.
336
- // For now, we'll return a not found error.
337
-
338
367
observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Failed , functionStartTime )
339
- return nil , errNotFound ("volume not found: %v" , err )
368
+ return nil , errors .Join (status .Errorf (codes .NotFound , "volume not found: %v" , err ), err )
369
+ }
370
+
371
+ // We have no context for the partition, so we'll leave it empty
372
+ partition := ""
373
+
374
+ volumePath := req .GetVolumePath ()
375
+ if volumePath == "" {
376
+ observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Completed , functionStartTime )
377
+ return nil , status .Error (codes .InvalidArgument , "volume path must be provided" )
378
+ }
379
+
380
+ volumeCapability := req .GetVolumeCapability ()
381
+ // VolumeCapability is optional, if specified, use that as source of truth
382
+ if volumeCapability != nil {
383
+ if blk := volumeCapability .GetBlock (); blk != nil {
384
+ // Noop for Block NodeExpandVolume
385
+ log .V (4 ).Info ("NodeExpandVolume: called. Since it is a block device, ignoring..." , "volumeID" , volumeID , "volumePath" , volumePath )
386
+ observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Completed , functionStartTime )
387
+ return & csi.NodeExpandVolumeResponse {}, nil
388
+ }
389
+
390
+ readonly , err := getReadOnlyFromCapability (volumeCapability )
391
+ if err != nil {
392
+ observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Failed , functionStartTime )
393
+ return nil , errInternal ("failed to check if capability for volume %s is readonly: %v" , volumeID , err )
394
+ }
395
+ if readonly {
396
+ log .V (4 ).Info ("NodeExpandVolume succeeded" , "volumeID" , volumeID , "volumePath" , volumePath )
397
+ observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Completed , functionStartTime )
398
+ return & csi.NodeExpandVolumeResponse {}, nil
399
+ }
340
400
}
341
- jsonFilter , err := json .Marshal (map [string ]string {"label" : LinodeVolumeKey .Label })
401
+
402
+ devicePath , err := ns .findDevicePath (ctx , * LinodeVolumeKey , partition )
342
403
if err != nil {
343
404
observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Failed , functionStartTime )
344
- return nil , errInternal ( "marshal json filter: %v" , err )
405
+ return nil , err
345
406
}
346
407
347
- log .V (4 ).Info ("Listing volumes" , "volumeID" , volumeID )
348
- if _ , err = ns .client .ListVolumes (ctx , linodego .NewListOptions (0 , string (jsonFilter ))); err != nil {
408
+ // Resize the volume
409
+
410
+ resized , err := ns .resize (devicePath , volumePath )
411
+ if err != nil {
349
412
observability .RecordMetrics (observability .NodeExpandTotal , observability .NodeExpandDuration , observability .Failed , functionStartTime )
350
- return nil , errVolumeNotFound (LinodeVolumeKey .VolumeID )
413
+ return nil , errInternal ("failed to resize volume %s: %v" , volumeID , err )
414
+ }
415
+
416
+ if resized {
417
+ log .V (4 ).Info ("Successfully resized volume" , "volumeID" , volumeID )
351
418
}
352
419
353
420
// Record functionStatus metric
0 commit comments