Skip to content

Commit 5dbf9e3

Browse files
committed
refactor: implement volume mount controller
Fixes #9602 Aggregate incoming volume mount requests, reconcile them with volume status, perform actual mounting, and produce mount status. Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
1 parent aa11e9a commit 5dbf9e3

File tree

25 files changed

+3321
-561
lines changed

25 files changed

+3321
-561
lines changed

api/resource/definitions/block/block.proto

+35
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,29 @@ message LocatorSpec {
111111
google.api.expr.v1alpha1.CheckedExpr match = 1;
112112
}
113113

114+
// MountRequestSpec is the spec for MountRequest.
115+
message MountRequestSpec {
116+
string volume_id = 1;
117+
string parent_mount_id = 2;
118+
repeated string requesters = 3;
119+
repeated string requester_i_ds = 4;
120+
bool read_only = 5;
121+
}
122+
114123
// MountSpec is the spec for volume mount.
115124
message MountSpec {
116125
string target_path = 1;
117126
string selinux_label = 2;
127+
repeated string options = 3;
128+
}
129+
130+
// MountStatusSpec is the spec for MountStatus.
131+
message MountStatusSpec {
132+
MountRequestSpec spec = 1;
133+
string target = 2;
134+
string source = 3;
135+
talos.resource.definitions.enums.BlockFilesystemType filesystem = 4;
136+
bool read_only = 5;
118137
}
119138

120139
// PartitionSpec is the spec for volume partitioning.
@@ -160,6 +179,21 @@ message VolumeConfigSpec {
160179
EncryptionSpec encryption = 6;
161180
}
162181

182+
// VolumeMountRequestSpec is the spec for VolumeMountRequest.
183+
message VolumeMountRequestSpec {
184+
string volume_id = 1;
185+
string requester = 2;
186+
bool read_only = 3;
187+
}
188+
189+
// VolumeMountStatusSpec is the spec for VolumeMountStatus.
190+
message VolumeMountStatusSpec {
191+
string volume_id = 1;
192+
string requester = 2;
193+
string target = 3;
194+
bool read_only = 4;
195+
}
196+
163197
// VolumeStatusSpec is the spec for VolumeStatus resource.
164198
message VolumeStatusSpec {
165199
talos.resource.definitions.enums.BlockVolumePhase phase = 1;
@@ -176,5 +210,6 @@ message VolumeStatusSpec {
176210
talos.resource.definitions.enums.BlockEncryptionProviderType encryption_provider = 12;
177211
string pretty_size = 13;
178212
repeated string encryption_failed_syncs = 14;
213+
MountSpec mount_spec = 15;
179214
}
180215

internal/app/machined/pkg/controllers/block/internal/volumes/locate.go

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
//
2525
//nolint:gocyclo,cyclop
2626
func LocateAndProvision(ctx context.Context, logger *zap.Logger, volumeContext ManagerContext) error {
27+
volumeContext.Status.MountSpec = volumeContext.Cfg.TypedSpec().Mount
2728
volumeType := volumeContext.Cfg.TypedSpec().Type
2829

2930
if volumeType == block.VolumeTypeTmpfs {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
package block
6+
7+
import (
8+
"context"
9+
"fmt"
10+
11+
"github.com/cosi-project/runtime/pkg/controller"
12+
"github.com/cosi-project/runtime/pkg/resource"
13+
"github.com/cosi-project/runtime/pkg/safe"
14+
"github.com/cosi-project/runtime/pkg/state"
15+
"github.com/siderolabs/gen/xslices"
16+
"go.uber.org/zap"
17+
18+
"github.com/siderolabs/talos/internal/pkg/mount/v2"
19+
"github.com/siderolabs/talos/pkg/machinery/resources/block"
20+
)
21+
22+
type mountContext struct {
23+
point *mount.Point
24+
readOnly bool
25+
unmounter func() error
26+
}
27+
28+
// MountController performs actual mount/unmount operations based on the MountRequests.
29+
type MountController struct {
30+
activeMounts map[string]mountContext
31+
}
32+
33+
// Name implements controller.Controller interface.
34+
func (ctrl *MountController) Name() string {
35+
return "block.MountController"
36+
}
37+
38+
// Inputs implements controller.Controller interface.
39+
func (ctrl *MountController) Inputs() []controller.Input {
40+
return []controller.Input{
41+
{
42+
Namespace: block.NamespaceName,
43+
Type: block.MountRequestType,
44+
Kind: controller.InputStrong,
45+
},
46+
{
47+
Namespace: block.NamespaceName,
48+
Type: block.VolumeStatusType,
49+
Kind: controller.InputStrong,
50+
},
51+
{
52+
Namespace: block.NamespaceName,
53+
Type: block.MountStatusType,
54+
Kind: controller.InputDestroyReady,
55+
},
56+
}
57+
}
58+
59+
// Outputs implements controller.Controller interface.
60+
func (ctrl *MountController) Outputs() []controller.Output {
61+
return []controller.Output{
62+
{
63+
Type: block.MountStatusType,
64+
Kind: controller.OutputExclusive,
65+
},
66+
}
67+
}
68+
69+
// Run implements controller.Controller interface.
70+
//
71+
//nolint:gocyclo,cyclop
72+
func (ctrl *MountController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
73+
if ctrl.activeMounts == nil {
74+
ctrl.activeMounts = map[string]mountContext{}
75+
}
76+
77+
for {
78+
select {
79+
case <-r.EventCh():
80+
case <-ctx.Done():
81+
return nil
82+
}
83+
84+
volumeStatuses, err := safe.ReaderListAll[*block.VolumeStatus](ctx, r)
85+
if err != nil {
86+
return fmt.Errorf("failed to read volume statuses: %w", err)
87+
}
88+
89+
volumeStatusMap := xslices.ToMap(
90+
safe.ToSlice(
91+
volumeStatuses,
92+
identity,
93+
),
94+
func(v *block.VolumeStatus) (string, *block.VolumeStatus) {
95+
return v.Metadata().ID(), v
96+
},
97+
)
98+
99+
mountStatuses, err := safe.ReaderListAll[*block.MountStatus](ctx, r)
100+
if err != nil {
101+
return fmt.Errorf("failed to read mount statuses: %w", err)
102+
}
103+
104+
mountStatusMap := xslices.ToMap(
105+
safe.ToSlice(
106+
mountStatuses,
107+
identity,
108+
),
109+
func(v *block.MountStatus) (string, *block.MountStatus) {
110+
return v.Metadata().ID(), v
111+
},
112+
)
113+
114+
mountRequests, err := safe.ReaderListAll[*block.MountRequest](ctx, r)
115+
if err != nil {
116+
return fmt.Errorf("failed to read mount requests: %w", err)
117+
}
118+
119+
for mountRequest := range mountRequests.All() {
120+
volumeStatus := volumeStatusMap[mountRequest.TypedSpec().VolumeID]
121+
volumeNotReady := volumeStatus == nil || volumeStatus.TypedSpec().Phase != block.VolumePhaseReady || volumeStatus.Metadata().Phase() != resource.PhaseRunning
122+
123+
mountRequestTearingDown := mountRequest.Metadata().Phase() == resource.PhaseTearingDown
124+
125+
mountStatus := mountStatusMap[mountRequest.Metadata().ID()]
126+
mountStatusTearingDown := mountStatus != nil && mountStatus.Metadata().Phase() == resource.PhaseTearingDown
127+
128+
if volumeNotReady || mountRequestTearingDown || mountStatusTearingDown {
129+
// we should tear down the mount in the following sequence:
130+
// 1. tear down & destroy MountStatus
131+
// 2. perform actual unmount
132+
// 3. remove finalizer from VolumeStatus
133+
// 4. remove finalizer from MountRequest
134+
mountStatusTornDown, err := ctrl.tearDownMountStatus(ctx, r, logger, mountRequest)
135+
if err != nil {
136+
return fmt.Errorf("error tearing down mount status %q: %w", mountRequest.Metadata().ID(), err)
137+
}
138+
139+
if !mountStatusTornDown {
140+
continue
141+
}
142+
143+
if mountCtx, ok := ctrl.activeMounts[mountRequest.Metadata().ID()]; ok {
144+
if err = mountCtx.unmounter(); err != nil {
145+
return fmt.Errorf("failed to unmount %q: %w", mountRequest.Metadata().ID(), err)
146+
}
147+
148+
delete(ctrl.activeMounts, mountRequest.Metadata().ID())
149+
150+
logger.Info("volume unmount",
151+
zap.String("volume", mountRequest.Metadata().ID()),
152+
zap.String("source", mountCtx.point.Source()),
153+
zap.String("target", mountCtx.point.Target()),
154+
zap.String("filesystem", mountCtx.point.FSType()),
155+
)
156+
}
157+
158+
if volumeStatus != nil && volumeStatus.Metadata().Finalizers().Has(ctrl.Name()) {
159+
if err = r.RemoveFinalizer(ctx, volumeStatus.Metadata(), ctrl.Name()); err != nil {
160+
return fmt.Errorf("failed to remove finalizer from volume status %q: %w", volumeStatus.Metadata().ID(), err)
161+
}
162+
}
163+
164+
if mountRequest.Metadata().Finalizers().Has(ctrl.Name()) {
165+
if err = r.RemoveFinalizer(ctx, mountRequest.Metadata(), ctrl.Name()); err != nil {
166+
return fmt.Errorf("failed to remove finalizer from mount request %q: %w", mountRequest.Metadata().ID(), err)
167+
}
168+
}
169+
}
170+
171+
if !(volumeNotReady || mountRequestTearingDown) {
172+
// we should perform mount operation in the following sequence:
173+
// 1. add finalizer on MountRequest
174+
// 2. add finalizer on VolumeStatus
175+
// 3. perform actual mount
176+
// 4. create MountStatus
177+
if !mountRequest.Metadata().Finalizers().Has(ctrl.Name()) {
178+
if err = r.AddFinalizer(ctx, mountRequest.Metadata(), ctrl.Name()); err != nil {
179+
return fmt.Errorf("failed to add finalizer to mount request %q: %w", mountRequest.Metadata().ID(), err)
180+
}
181+
}
182+
183+
if !volumeStatus.Metadata().Finalizers().Has(ctrl.Name()) {
184+
if err = r.AddFinalizer(ctx, volumeStatus.Metadata(), ctrl.Name()); err != nil {
185+
return fmt.Errorf("failed to add finalizer to volume status %q: %w", volumeStatus.Metadata().ID(), err)
186+
}
187+
}
188+
189+
mountSource := volumeStatus.TypedSpec().MountLocation
190+
mountTarget := volumeStatus.TypedSpec().MountSpec.TargetPath
191+
mountFilesystem := volumeStatus.TypedSpec().Filesystem
192+
193+
mountCtx, ok := ctrl.activeMounts[mountRequest.Metadata().ID()]
194+
195+
// mount hasn't been done yet
196+
if !ok {
197+
var opts []mount.NewPointOption
198+
199+
// [TODO]: need to support more mount options:
200+
// * proj quota (static)
201+
202+
opts = append(opts, mount.WithSelinuxLabel(volumeStatus.TypedSpec().MountSpec.SelinuxLabel))
203+
204+
if mountRequest.TypedSpec().ReadOnly {
205+
opts = append(opts, mount.WithReadonly())
206+
}
207+
208+
mountpoint := mount.NewPoint(
209+
mountSource,
210+
mountTarget,
211+
mountFilesystem.String(),
212+
opts...,
213+
)
214+
215+
unmounter, err := mountpoint.Mount(mount.WithMountPrinter(logger.Sugar().Infof))
216+
if err != nil {
217+
return fmt.Errorf("failed to mount %q: %w", mountRequest.Metadata().ID(), err)
218+
}
219+
220+
logger.Info("volume mount",
221+
zap.String("volume", volumeStatus.Metadata().ID()),
222+
zap.String("source", mountSource),
223+
zap.String("target", mountTarget),
224+
zap.Stringer("filesystem", mountFilesystem),
225+
)
226+
227+
ctrl.activeMounts[mountRequest.Metadata().ID()] = mountContext{
228+
point: mountpoint,
229+
readOnly: mountRequest.TypedSpec().ReadOnly,
230+
unmounter: unmounter,
231+
}
232+
} else if mountCtx.readOnly != mountRequest.TypedSpec().ReadOnly { // remount if needed
233+
switch mountRequest.TypedSpec().ReadOnly {
234+
case true:
235+
err = mountCtx.point.RemountReadOnly()
236+
case false:
237+
err = mountCtx.point.RemountReadWrite()
238+
}
239+
240+
if err != nil {
241+
return fmt.Errorf("failed to remount %q: %w", mountRequest.Metadata().ID(), err)
242+
}
243+
244+
mountCtx.readOnly = mountRequest.TypedSpec().ReadOnly
245+
}
246+
247+
if err = safe.WriterModify(
248+
ctx, r, block.NewMountStatus(block.NamespaceName, mountRequest.Metadata().ID()),
249+
func(mountStatus *block.MountStatus) error {
250+
mountStatus.TypedSpec().Spec = *mountRequest.TypedSpec()
251+
mountStatus.TypedSpec().Source = mountSource
252+
mountStatus.TypedSpec().Target = mountTarget
253+
mountStatus.TypedSpec().Filesystem = mountFilesystem
254+
mountStatus.TypedSpec().ReadOnly = mountRequest.TypedSpec().ReadOnly
255+
256+
return nil
257+
},
258+
); err != nil {
259+
return fmt.Errorf("failed to create mount status %q: %w", mountRequest.Metadata().ID(), err)
260+
}
261+
}
262+
}
263+
264+
r.ResetRestartBackoff()
265+
}
266+
}
267+
268+
func (ctrl *MountController) tearDownMountStatus(ctx context.Context, r controller.Runtime, logger *zap.Logger, mountRequest *block.MountRequest) (bool, error) {
269+
logger = logger.With(zap.String("mount_request", mountRequest.Metadata().ID()))
270+
271+
okToDestroy, err := r.Teardown(ctx, block.NewMountStatus(block.NamespaceName, mountRequest.Metadata().ID()).Metadata())
272+
if err != nil {
273+
if state.IsNotFoundError(err) {
274+
// no mount status, we are done
275+
return true, nil
276+
}
277+
278+
return false, fmt.Errorf("failed to teardown mount status %q: %w", mountRequest.Metadata().ID(), err)
279+
}
280+
281+
if !okToDestroy {
282+
logger.Debug("waiting for mount status to be torn down")
283+
284+
return false, nil
285+
}
286+
287+
err = r.Destroy(ctx, block.NewMountStatus(block.NamespaceName, mountRequest.Metadata().ID()).Metadata())
288+
if err != nil {
289+
return false, fmt.Errorf("failed to destroy mount status %q: %w", mountRequest.Metadata().ID(), err)
290+
}
291+
292+
logger.Info("mount status destroyed")
293+
294+
return true, nil
295+
}

0 commit comments

Comments
 (0)