Skip to content

Commit aacdf9b

Browse files
committed
fix: improve restic pkg's output handling and buffering
1 parent 66d63c1 commit aacdf9b

File tree

7 files changed

+120
-89
lines changed

7 files changed

+120
-89
lines changed

go.mod

+9-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ require (
66
connectrpc.com/connect v1.16.0
77
github.com/alessio/shellescape v1.4.2
88
github.com/containrrr/shoutrrr v0.8.0
9+
github.com/djherbis/buffer v1.2.0
10+
github.com/djherbis/nio/v3 v3.0.1
911
github.com/gitploy-io/cronexpr v0.2.2
1012
github.com/golang-jwt/jwt/v5 v5.2.1
1113
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
@@ -15,11 +17,11 @@ require (
1517
github.com/natefinch/atomic v1.0.1
1618
go.etcd.io/bbolt v1.3.9
1719
go.uber.org/zap v1.27.0
18-
golang.org/x/crypto v0.21.0
19-
golang.org/x/net v0.22.0
20-
golang.org/x/sync v0.6.0
21-
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda
22-
google.golang.org/grpc v1.62.1
20+
golang.org/x/crypto v0.22.0
21+
golang.org/x/net v0.24.0
22+
golang.org/x/sync v0.7.0
23+
google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56
24+
google.golang.org/grpc v1.63.2
2325
google.golang.org/protobuf v1.33.0
2426
)
2527

@@ -30,8 +32,8 @@ require (
3032
github.com/mattn/go-isatty v0.0.20 // indirect
3133
github.com/stretchr/testify v1.8.4 // indirect
3234
go.uber.org/multierr v1.11.0 // indirect
33-
golang.org/x/sys v0.18.0 // indirect
35+
golang.org/x/sys v0.19.0 // indirect
3436
golang.org/x/text v0.14.0 // indirect
3537
golang.org/x/tools v0.19.0 // indirect
36-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
38+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 // indirect
3739
)

go.sum

+18
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ github.com/containrrr/shoutrrr v0.8.0 h1:mfG2ATzIS7NR2Ec6XL+xyoHzN97H8WPjir8aYzJ
66
github.com/containrrr/shoutrrr v0.8.0/go.mod h1:ioyQAyu1LJY6sILuNyKaQaw+9Ttik5QePU8atnAdO2o=
77
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
88
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9+
github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ=
10+
github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE=
11+
github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4=
12+
github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg=
913
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
1014
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
1115
github.com/gitploy-io/cronexpr v0.2.2 h1:Au+wK6FqmOLAF7AkW6q4gnrNXTe3rEW97XFZ4chy0xs=
@@ -58,25 +62,39 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
5862
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
5963
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
6064
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
65+
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
66+
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
6167
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
6268
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
69+
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
70+
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
6371
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
6472
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
73+
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
74+
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
6575
golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
6676
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
6777
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
6878
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
6979
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
80+
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
81+
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
7082
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
7183
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
7284
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
7385
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
7486
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda h1:b6F6WIV4xHHD0FA4oIyzU6mHWg2WI2X1RBehwa5QN38=
7587
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda/go.mod h1:AHcE/gZH76Bk/ROZhQphlRoWo5xKDEtz3eVEO1LfA8c=
88+
google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56 h1:KuFzeG+qPmpT8KpJXcrKAyeHhn64dgEICWlccP9qp0U=
89+
google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56/go.mod h1:wTHjrkbcS8AoQbb/0v9bFIPItZQPAsyVfgG9YPUhjAM=
7690
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4=
7791
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
92+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 h1:zviK8GX4VlMstrK3JkexM5UHjH1VOkRebH9y3jhSBGk=
93+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
7894
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
7995
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
96+
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
97+
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
8098
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
8199
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
82100
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

pkg/restic/outputs.go

+4-22
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@ package restic
22

33
import (
44
"bufio"
5-
"context"
65
"encoding/json"
76
"errors"
87
"fmt"
98
"io"
10-
"os/exec"
11-
"slices"
129
"time"
1310

1411
v1 "github.com/garethgeorge/backrest/gen/go/v1"
@@ -95,7 +92,7 @@ func (b *BackupProgressEntry) Validate() error {
9592
}
9693

9794
// readBackupProgressEntries returns the summary event or an error if the command failed.
98-
func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Reader, callback func(event *BackupProgressEntry)) (*BackupProgressEntry, error) {
95+
func readBackupProgressEntries(output io.Reader, callback func(event *BackupProgressEntry)) (*BackupProgressEntry, error) {
9996
scanner := bufio.NewScanner(output)
10097
scanner.Split(bufio.ScanLines)
10198

@@ -104,14 +101,8 @@ func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Rea
104101
// first event is handled specially to detect non-JSON output and fast-path out.
105102
if scanner.Scan() {
106103
var event BackupProgressEntry
107-
108104
if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
109-
var bytes = slices.Clone(scanner.Bytes())
110-
for scanner.Scan() {
111-
bytes = append(bytes, scanner.Bytes()...)
112-
}
113-
114-
return nil, newCmdError(ctx, cmd, string(bytes), fmt.Errorf("command output was not JSON: %w", err))
105+
return nil, fmt.Errorf("command output was not JSON: %w", err)
115106
}
116107
if err := event.Validate(); err != nil {
117108
return nil, err
@@ -135,23 +126,19 @@ func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Rea
135126
// skip it. This is a best-effort attempt to parse the output.
136127
continue
137128
}
138-
139129
if callback != nil {
140130
callback(&event)
141131
}
142132
if event.MessageType == "summary" {
143133
summary = &event
144134
}
145135
}
146-
147136
if err := scanner.Err(); err != nil {
148137
return summary, fmt.Errorf("scanner encountered error: %w", err)
149138
}
150-
151139
if summary == nil {
152140
return nil, fmt.Errorf("no summary event found")
153141
}
154-
155142
return summary, nil
156143
}
157144

@@ -244,7 +231,7 @@ func (e *RestoreProgressEntry) Validate() error {
244231
}
245232

246233
// readRestoreProgressEntries returns the summary event or an error if the command failed.
247-
func readRestoreProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Reader, callback func(event *RestoreProgressEntry)) (*RestoreProgressEntry, error) {
234+
func readRestoreProgressEntries(output io.Reader, callback func(event *RestoreProgressEntry)) (*RestoreProgressEntry, error) {
248235
scanner := bufio.NewScanner(output)
249236
scanner.Split(bufio.ScanLines)
250237

@@ -255,12 +242,7 @@ func readRestoreProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Re
255242
var event RestoreProgressEntry
256243

257244
if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
258-
var bytes = slices.Clone(scanner.Bytes())
259-
for scanner.Scan() {
260-
bytes = append(bytes, scanner.Bytes()...)
261-
}
262-
263-
return nil, newCmdError(ctx, cmd, string(bytes), fmt.Errorf("command output was not JSON: %w", err))
245+
return nil, fmt.Errorf("command output was not JSON: %w", err)
264246
}
265247
if err := event.Validate(); err != nil {
266248
return nil, err

pkg/restic/outputs_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package restic
22

33
import (
44
"bytes"
5-
"context"
6-
"os/exec"
75
"testing"
86
)
97

@@ -14,7 +12,7 @@ func TestReadBackupProgressEntries(t *testing.T) {
1412

1513
b := bytes.NewBuffer([]byte(testInput))
1614

17-
summary, err := readBackupProgressEntries(context.Background(), &exec.Cmd{}, b, func(event *BackupProgressEntry) {
15+
summary, err := readBackupProgressEntries(b, func(event *BackupProgressEntry) {
1816
t.Logf("event: %v", event)
1917
})
2018
if err != nil {

pkg/restic/restic.go

+39-56
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"strings"
1414
"sync"
1515

16+
"github.com/djherbis/buffer"
17+
nio "github.com/djherbis/nio/v3"
1618
"github.com/garethgeorge/backrest/internal/ioutil"
1719
)
1820

@@ -120,58 +122,42 @@ func (r *Repo) Backup(ctx context.Context, paths []string, progressCallback func
120122
args = append(args, paths...)
121123

122124
cmd := r.commandWithContext(ctx, args, opts...)
123-
capture := ioutil.NewOutputCapturer(outputBufferLimit)
124-
reader, writer := io.Pipe()
125-
r.pipeCmdOutputToWriter(cmd, writer, capture)
126-
127-
if err := cmd.Start(); err != nil {
128-
return nil, newCmdError(ctx, cmd, "", err)
129-
}
125+
fullOutput := ioutil.NewOutputCapturer(outputBufferLimit)
126+
buf := buffer.New(32 * 1024) // 32KB IO buffer for the realtime event parsing
127+
reader, writer := nio.Pipe(buf)
128+
r.pipeCmdOutputToWriter(cmd, fullOutput, writer)
130129

131-
var wg sync.WaitGroup
132-
var summary *BackupProgressEntry
133-
var cmdErr error
134130
var readErr error
135-
131+
var summary *BackupProgressEntry
132+
var wg sync.WaitGroup
136133
wg.Add(1)
137134
go func() {
138135
defer wg.Done()
139136
var err error
140-
summary, err = readBackupProgressEntries(ctx, cmd, reader, progressCallback)
137+
summary, err = readBackupProgressEntries(reader, progressCallback)
141138
if err != nil {
142139
readErr = fmt.Errorf("processing command output: %w", err)
140+
_ = cmd.Cancel() // cancel the command to prevent it from hanging now that we're not reading from it.
143141
}
144142
}()
145143

146-
wg.Add(1)
147-
go func() {
148-
defer writer.Close()
149-
defer wg.Done()
150-
if err := cmd.Wait(); err != nil {
144+
cmdErr := cmd.Run()
145+
writer.Close()
146+
wg.Wait()
147+
148+
if cmdErr != nil || readErr != nil {
149+
if cmdErr != nil {
151150
var exitErr *exec.ExitError
152-
if errors.As(err, &exitErr) {
151+
if errors.As(cmdErr, &exitErr) {
153152
if exitErr.ExitCode() == 3 {
154153
cmdErr = ErrPartialBackup
155154
} else {
156-
cmdErr = fmt.Errorf("exit code %v: %w", exitErr.ExitCode(), ErrBackupFailed)
155+
cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed)
157156
}
158-
return
159157
}
160-
cmdErr = err
161158
}
162-
}()
163-
164-
wg.Wait()
165-
166-
if logger := LoggerFromContext(ctx); logger != nil && summary != nil {
167-
bytes, _ := json.MarshalIndent(summary, "", " ")
168-
logger.Write(bytes)
159+
return summary, newCmdErrorPreformatted(ctx, cmd, string(fullOutput.Bytes()), errors.Join(cmdErr, readErr))
169160
}
170-
171-
if cmdErr != nil || readErr != nil {
172-
return summary, newCmdErrorPreformatted(ctx, cmd, string(capture.Bytes()), errors.Join(cmdErr, readErr))
173-
}
174-
175161
return summary, nil
176162
}
177163

@@ -251,44 +237,41 @@ func (r *Repo) Prune(ctx context.Context, pruneOutput io.Writer, opts ...Generic
251237

252238
func (r *Repo) Restore(ctx context.Context, snapshot string, callback func(*RestoreProgressEntry), opts ...GenericOption) (*RestoreProgressEntry, error) {
253239
cmd := r.commandWithContext(ctx, []string{"restore", "--json", snapshot}, opts...)
254-
output := ioutil.NewOutputCapturer(outputBufferLimit)
240+
capture := ioutil.NewOutputCapturer(outputBufferLimit) // for error reporting.
255241
reader, writer := io.Pipe()
256-
r.pipeCmdOutputToWriter(cmd, output, writer)
257-
258-
if err := cmd.Start(); err != nil {
259-
return nil, newCmdError(ctx, cmd, "", err)
260-
}
242+
r.pipeCmdOutputToWriter(cmd, writer, capture)
261243

262-
var wg sync.WaitGroup
263-
var summary *RestoreProgressEntry
264-
var cmdErr error
265244
var readErr error
266-
245+
var summary *RestoreProgressEntry
246+
var wg sync.WaitGroup
267247
wg.Add(1)
268248
go func() {
269249
defer wg.Done()
270250
var err error
271-
summary, err = readRestoreProgressEntries(ctx, cmd, reader, callback)
251+
summary, err = readRestoreProgressEntries(reader, callback)
272252
if err != nil {
273253
readErr = fmt.Errorf("processing command output: %w", err)
254+
_ = cmd.Cancel() // cancel the command to prevent it from hanging now that we're not reading from it.
274255
}
275256
}()
276257

277-
wg.Add(1)
278-
go func() {
279-
defer writer.Close()
280-
defer wg.Done()
281-
if err := cmd.Wait(); err != nil {
282-
cmdErr = err
283-
}
284-
}()
285-
258+
cmdErr := cmd.Run()
259+
writer.Close()
286260
wg.Wait()
287-
288261
if cmdErr != nil || readErr != nil {
289-
return nil, newCmdErrorPreformatted(ctx, cmd, string(output.Bytes()), errors.Join(cmdErr, readErr))
290-
}
262+
if cmdErr != nil {
263+
var exitErr *exec.ExitError
264+
if errors.As(cmdErr, &exitErr) {
265+
if exitErr.ExitCode() == 3 {
266+
cmdErr = ErrPartialBackup
267+
} else {
268+
cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed)
269+
}
270+
}
271+
}
291272

273+
return summary, newCmdErrorPreformatted(ctx, cmd, string(capture.Bytes()), errors.Join(cmdErr, readErr))
274+
}
292275
return summary, nil
293276
}
294277

0 commit comments

Comments
 (0)