-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtx.go
119 lines (90 loc) · 1.97 KB
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package sql
import (
"context"
"database/sql"
"github.com/upfluence/errors"
)
var (
ErrRollback = errors.New("rollback sentinel")
InfiniteRetry = -1
defaultExecuteTxOptions = executeTxOptions{
retryCount: InfiniteRetry,
retryCheck: isRetryableError,
}
)
type IsolationLevel = sql.IsolationLevel
const (
LevelDefault IsolationLevel = iota
LevelReadUncommitted
LevelReadCommitted
LevelWriteCommitted
LevelRepeatableRead
LevelSnapshot
LevelSerializable
LevelLinearizable
)
type Tx interface {
Queryer
Commit() error
Rollback() error
}
type QueryerFunc func(Queryer) error
type executeTxOptions struct {
retryCount int
retryCheck func(error) bool
}
type ExecuteTxOption func(*executeTxOptions)
func (opts executeTxOptions) shouldRetry(i int) bool {
if opts.retryCount == InfiniteRetry {
return true
}
return i < opts.retryCount
}
func isRetryableError(err error) bool {
var re RollbackError
if !errors.As(err, &re) {
return false
}
return re.Type == SerializationFailure || re.Type == Locked
}
func WithCustomRetryCheck(fn func(error) bool) ExecuteTxOption {
return func(opts *executeTxOptions) { opts.retryCheck = fn }
}
func WithRetryCount(i int) ExecuteTxOption {
return func(opts *executeTxOptions) { opts.retryCount = i }
}
func ExecuteTx(ctx context.Context, db DB, txOpts TxOptions, fn QueryerFunc, exOpts ...ExecuteTxOption) error {
var (
i int
opts = defaultExecuteTxOptions
)
for _, fn := range exOpts {
fn(&opts)
}
for {
tx, err := db.BeginTx(ctx, txOpts)
if err != nil {
return errors.Wrap(err, "cant begin the tx")
}
switch err := fn(tx); {
case err == nil:
err := tx.Commit()
if !opts.retryCheck(err) || !opts.shouldRetry(i) {
return errors.Wrap(err, "cant commit the tx")
}
i++
case errors.Is(err, ErrRollback):
tx.Rollback()
return nil
case opts.retryCheck(err):
tx.Rollback()
if !opts.shouldRetry(i) {
return err
}
i++
default:
tx.Rollback()
return err
}
}
}