Skip to content

Commit 22d6326

Browse files
authored
[POSIX] Ensure data is durably written to disk (#619)
1 parent 0fb24e9 commit 22d6326

File tree

2 files changed

+189
-74
lines changed

2 files changed

+189
-74
lines changed

storage/posix/file_ops.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright 2025 The Tessera authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package posix
16+
17+
import (
18+
"errors"
19+
"fmt"
20+
"math/rand/v2"
21+
"os"
22+
"path/filepath"
23+
"strconv"
24+
"strings"
25+
"syscall"
26+
27+
"k8s.io/klog/v2"
28+
)
29+
30+
const (
	// filePerm is the permission mode requested for files created by this package.
	filePerm = 0o644
	// dirPerm is the permission mode requested for directories created by this package.
	dirPerm = 0o755
)
34+
35+
// syncDir opens the path d and calls fsync on it.
//
// This is intended to be used to sync directories in which we've just created new entries.
//
// The returned error wraps the underlying open, sync, or close failure, so callers can
// inspect it with errors.Is/errors.As. The handle is closed on every path, including
// when the sync itself fails.
func syncDir(d string) error {
	fd, err := os.Open(d)
	if err != nil {
		return fmt.Errorf("failed to open %q: %w", d, err)
	}

	if err := fd.Sync(); err != nil {
		// Don't leak the descriptor on failure. The sync error is the one worth
		// reporting, so a secondary close error is intentionally dropped.
		_ = fd.Close()
		return fmt.Errorf("failed to sync %q: %w", d, err)
	}
	if err := fd.Close(); err != nil {
		return fmt.Errorf("failed to close %q: %w", d, err)
	}
	return nil
}
49+
50+
// mkdirAll is a reimplementation of os.mkdirAll but where we fsync the parent directory/ies
51+
// we modify.
52+
func mkdirAll(name string, perm os.FileMode) (err error) {
53+
name = strings.TrimSuffix(name, string(filepath.Separator))
54+
if name == "" {
55+
return nil
56+
}
57+
58+
// Finally, check and create the dir if necessary.
59+
dir, _ := filepath.Split(name)
60+
di, err := os.Lstat(name)
61+
switch {
62+
case errors.Is(err, syscall.ENOENT):
63+
// We'll see an ENOENT if there's a problem with a non-existant path element, so
64+
// we'll recurse and create the parent directory if necessary.
65+
if dir != "" {
66+
if err := mkdirAll(dir, perm); err != nil {
67+
return err
68+
}
69+
}
70+
// Once we've successfully created the parent element(s), we can drop through and
71+
// create the final entry in the requested path.
72+
fallthrough
73+
case errors.Is(err, os.ErrNotExist):
74+
// We'll see ErrNotExist if the final entry in the requested path doesn't exist,
75+
// so we simply attempt to create it in here.
76+
if err := os.Mkdir(name, perm); err != nil {
77+
return fmt.Errorf("%q: %v", name, err)
78+
}
79+
// And be sure to sync the parent directory.
80+
return syncDir(dir)
81+
case err != nil:
82+
return fmt.Errorf("lstat %q: %v", name, err)
83+
case !di.IsDir():
84+
return fmt.Errorf("%s is not a directory", name)
85+
default:
86+
return nil
87+
}
88+
}
89+
90+
// createEx atomically creates a file at the given path containing the provided data, and syncs the
91+
// directory containing the newly created file.
92+
//
93+
// Returns an error if a file already exists at the specified location, or it's unable to fully write the
94+
// data & close the file.
95+
func createEx(name string, d []byte) error {
96+
dir, _ := filepath.Split(name)
97+
if err := mkdirAll(dir, dirPerm); err != nil {
98+
return fmt.Errorf("failed to make entries directory structure: %w", err)
99+
}
100+
101+
tmpName, err := createTemp(name, d)
102+
if err != nil {
103+
return fmt.Errorf("failed to create temp file: %v", err)
104+
}
105+
defer func() {
106+
if err := os.Remove(tmpName); err != nil {
107+
klog.Warningf("Failed to remove temporary file %q: %v", tmpName, err)
108+
}
109+
}()
110+
111+
if err := os.Link(tmpName, name); err != nil {
112+
// Wrap the error here because we need to know if it's os.ErrExists at higher levels.
113+
return fmt.Errorf("failed to link temporary file to target %q: %w", name, err)
114+
}
115+
116+
return syncDir(dir)
117+
}
118+
119+
// overwrite atomically creates/overwrites a file at the given path containing the provided data, and syncs
120+
// the directory containing the overwritten/created file.
121+
func overwrite(name string, d []byte) error {
122+
dir, _ := filepath.Split(name)
123+
if err := mkdirAll(dir, dirPerm); err != nil {
124+
return fmt.Errorf("failed to make entries directory structure: %w", err)
125+
}
126+
127+
tmpName, err := createTemp(name, d)
128+
if err != nil {
129+
return fmt.Errorf("failed to create temp file: %v", err)
130+
}
131+
132+
if err := os.Rename(tmpName, name); err != nil {
133+
return fmt.Errorf("failed to rename temporary file to target %q: %w", name, err)
134+
}
135+
136+
return syncDir(dir)
137+
}
138+
139+
// createTemp creates a new temporary file in the directory dir, with a name based on the provided prefix,
140+
// and writes the provided data to it.
141+
//
142+
// Multiple programs or goroutines calling CreateTemp simultaneously will not choose the same file.
143+
// It is the caller's responsibility to remove the file when it is no longer needed.
144+
//
145+
// Ths file data is written with O_SYNC, however the containing directory is NOT sync'd on the assumption
146+
// that this temporary file will be linked/renamed by the caller who will also sync the directory.
147+
func createTemp(prefix string, d []byte) (name string, err error) {
148+
try := 0
149+
var f *os.File
150+
151+
for {
152+
name = prefix + strconv.Itoa(int(rand.Int32()))
153+
f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_SYNC, filePerm)
154+
if err == nil {
155+
break
156+
} else if os.IsExist(err) {
157+
if try++; try < 10000 {
158+
continue
159+
}
160+
return "", &os.PathError{Op: "createtemp", Path: prefix + "*", Err: os.ErrExist}
161+
}
162+
}
163+
164+
defer func() {
165+
if errC := f.Close(); errC != nil && err == nil {
166+
err = errC
167+
}
168+
}()
169+
170+
if n, err := f.Write(d); err != nil {
171+
return "", fmt.Errorf("failed to write to temporary file %q: %v", name, err)
172+
} else if l := len(d); n < l {
173+
return "", fmt.Errorf("short write on %q, %d < %d", name, n, l)
174+
}
175+
176+
return name, nil
177+
}

storage/posix/files.go

Lines changed: 12 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ const (
4545
// that were created before we introduced this.
4646
compatibilityVersion = 1
4747

48-
dirPerm = 0o755
49-
filePerm = 0o644
5048
stateDir = ".state"
5149

5250
minCheckpointInterval = time.Second
@@ -442,7 +440,7 @@ func (lrs *logResourceStorage) writeBundle(_ context.Context, index uint64, part
442440
// creating a zero-sized one if it doesn't already exist.
443441
func (a *appender) initialise(ctx context.Context) error {
444442
// Idempotent: If folder exists, nothing happens.
445-
if err := os.MkdirAll(filepath.Join(a.s.path, stateDir), dirPerm); err != nil {
443+
if err := mkdirAll(filepath.Join(a.s.path, stateDir), dirPerm); err != nil {
446444
return fmt.Errorf("failed to create log directory: %q", err)
447445
}
448446
// Double locking:
@@ -527,7 +525,7 @@ func (s *Storage) writeTreeState(size uint64, root []byte) error {
527525
return fmt.Errorf("error in Marshal: %v", err)
528526
}
529527

530-
if err := s.overwrite(filepath.Join(stateDir, "treeState"), raw); err != nil {
528+
if err := overwrite(filepath.Join(s.path, stateDir, "treeState"), raw); err != nil {
531529
return fmt.Errorf("failed to create private tree state file: %w", err)
532530
}
533531
return nil
@@ -583,9 +581,7 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStaleness time.Dura
583581
return fmt.Errorf("newCP: %v", err)
584582
}
585583

586-
// TODO(mhutchinson): grab witness signatures
587-
588-
if err := a.s.overwrite(layout.CheckpointPath, cpRaw); err != nil {
584+
if err := overwrite(filepath.Join(a.s.path, layout.CheckpointPath), cpRaw); err != nil {
589585
return fmt.Errorf("overwrite(%s): %v", layout.CheckpointPath, err)
590586
}
591587

@@ -599,8 +595,7 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStaleness time.Dura
599595
// It will error if a file already exists at the specified location, or it's unable to fully write the
600596
// data & close the file.
601597
func (s *Storage) createExclusive(p string, d []byte) error {
602-
p = filepath.Join(s.path, p)
603-
return createEx(p, d)
598+
return createEx(filepath.Join(s.path, p), d)
604599
}
605600

606601
func (s *Storage) readAll(p string) ([]byte, error) {
@@ -615,84 +610,27 @@ func (s *Storage) readAll(p string) ([]byte, error) {
615610
func (s *Storage) createIdempotent(p string, d []byte) error {
616611
if err := s.createExclusive(p, d); err != nil {
617612
if errors.Is(err, os.ErrExist) {
618-
if r, err := s.readAll(p); err != nil {
613+
r, err := s.readAll(p)
614+
if err != nil {
619615
return fmt.Errorf("file %q already exists, but unable to read it: %v", p, err)
620-
} else if bytes.Equal(d, r) {
621-
// Idempotent write
622-
return nil
623616
}
617+
if !bytes.Equal(d, r) {
618+
return fmt.Errorf("file %q already exists but has different contents", p)
619+
}
620+
// Idempotent write.
621+
return nil
624622
}
625623
return err
626624
}
627625
return nil
628626
}
629627

630-
// overwrite atomically overwrites the specified file relative to the root of the log (or creates it if it doesn't exist)
631-
// with the provided data.
632-
func (s *Storage) overwrite(p string, d []byte) error {
633-
p = filepath.Join(s.path, p)
634-
tmpN := p + ".tmp"
635-
if err := os.WriteFile(tmpN, d, filePerm); err != nil {
636-
return fmt.Errorf("failed to write temp file %q: %v", tmpN, err)
637-
}
638-
if err := os.Rename(tmpN, p); err != nil {
639-
_ = os.Remove(tmpN)
640-
return fmt.Errorf("failed to move temp file into target location %q: %v", p, err)
641-
}
642-
return nil
643-
}
644-
645628
// stat returns os.Stat info for the specified file relative to the log root.
646629
func (s *Storage) stat(p string) (os.FileInfo, error) {
647630
p = filepath.Join(s.path, p)
648631
return os.Stat(p)
649632
}
650633

651-
// createEx atomically creates a file at the given path containing the provided data.
652-
//
653-
// It will error if a file already exists at the specified location, or it's unable to fully write the
654-
// data & close the file.
655-
func createEx(p string, d []byte) error {
656-
dir, f := filepath.Split(p)
657-
if err := os.MkdirAll(dir, dirPerm); err != nil {
658-
return fmt.Errorf("failed to make entries directory structure: %w", err)
659-
}
660-
tmpF, err := os.CreateTemp(dir, f+"-*")
661-
if err != nil {
662-
return fmt.Errorf("failed to create temp file: %v", err)
663-
}
664-
if err := tmpF.Chmod(filePerm); err != nil {
665-
return fmt.Errorf("failed to chmod temp file: %v", err)
666-
}
667-
tmpName := tmpF.Name()
668-
defer func() {
669-
if tmpF != nil {
670-
if err := tmpF.Close(); err != nil {
671-
klog.Warningf("Failed to close temporary file: %v", err)
672-
}
673-
}
674-
if err := os.Remove(tmpName); err != nil {
675-
klog.Warningf("Failed to remove temporary file %q: %v", tmpName, err)
676-
}
677-
}()
678-
679-
if n, err := tmpF.Write(d); err != nil {
680-
return fmt.Errorf("unable to write data to temporary file: %v", err)
681-
} else if l := len(d); n != l {
682-
return fmt.Errorf("short write (%d < %d byte) on temporary file", n, l)
683-
}
684-
if err := tmpF.Close(); err != nil {
685-
return fmt.Errorf("failed to close temporary file: %v", err)
686-
}
687-
tmpF = nil
688-
if err := os.Link(tmpName, p); err != nil {
689-
// Wrap the error here because we need to know if it's os.ErrExists at higher levels.
690-
return fmt.Errorf("failed to link temporary file to target %q: %w", p, err)
691-
}
692-
693-
return nil
694-
}
695-
696634
// MigrationWriter creates a new POSIX storage for the MigrationTarget lifecycle mode.
697635
func (s *Storage) MigrationWriter(ctx context.Context, opts *tessera.MigrationOptions) (tessera.MigrationWriter, tessera.LogReader, error) {
698636
r := &MigrationStorage{
@@ -743,7 +681,7 @@ func (m *MigrationStorage) AwaitIntegration(ctx context.Context, sourceSize uint
743681

744682
func (m *MigrationStorage) initialise() error {
745683
// Idempotent: If folder exists, nothing happens.
746-
if err := os.MkdirAll(filepath.Join(m.s.path, stateDir), dirPerm); err != nil {
684+
if err := mkdirAll(filepath.Join(m.s.path, stateDir), dirPerm); err != nil {
747685
return fmt.Errorf("failed to create log directory: %q", err)
748686
}
749687
// Double locking:

0 commit comments

Comments
 (0)