Skip to content

Commit 8ac3113

Browse files
authored
feat: added support for optimistic locking with version number (#19)
**Description** Added support for optimistic locking with version number for improved application concurrency behavior when using `PutItem` operation to create/update items. > Optimistic locking is a strategy to ensure that the client-side item that you are updating (or deleting) is the same as the item in Amazon DynamoDB. If you use this strategy, your database writes are protected from being overwritten by the writes of others, and vice versa. https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.OptimisticLocking.html **How it works** With optimistic locking, each item has an attribute that acts as a version number. If you retrieve an item from a table, the application records the version number of that item. You can update the item, but only if the version number on the server side has not changed. If there is a version mismatch, it means that someone else has modified the item before you did. The update attempt fails, because you have a stale version of the item. If this happens, try again by retrieving the item and then trying to update it. Optimistic locking prevents you from accidentally overwriting changes that were made by others. It also prevents others from accidentally overwriting your changes. _Clients needs to implement the retry behavior on their end for this to work. Simply retrying on this library to increment version number defeats the purpose of ensuring data integrity_ **Usage** Provide `WithOptimisticLock` option when calling PutItem method. ```go type LedgerAccount struct { ID string Balance int Version uint } func AddBalance(ctx context.Context, acc LedgerAccount, amount int) (err error) { var try int for try <= maxTries { var acc LedgerAccount err, _ := table.GetItem(ctx, pk, pk, &acc) if err != nil { return err } // Add amount to current account balance acc.Balance += amount // If another go routine updates account using AddBalance we want to avoid overwriting using an old balance err = table.PutItem(ctx, pk, pk, acc, dynago.WithOptimisticLock("Version", acc.Version)) if err == nil { return nil } // Retry if there is an error with latest item value from DynamoDB try += 1 } return err } ```
2 parents 3edd35e + 8701cc9 commit 8ac3113

File tree

4 files changed

+245
-6
lines changed

4 files changed

+245
-6
lines changed

README.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,50 @@ err := table.PutItem(
8989
)
9090
```
9191

92+
#### Optimistic locking with version number
93+
94+
> Optimistic locking is a strategy to ensure that the client-side item that you are updating (or deleting) is the same as the item in Amazon DynamoDB.
95+
If you use this strategy, your database writes are protected from being overwritten by the writes of others, and vice versa.
96+
97+
Use the `WithOptimisticLock` option when calling `PutItem` method.
98+
99+
This works well and is recommended when using the event sourcing pattern where you need to update aggregate snapshots.
100+
You can make use of event broker retry mechanism or retry libraries to simplify retry
101+
102+
**Example**
103+
104+
```go
105+
type LedgerAccount struct {
106+
ID string
107+
Balance int
108+
Version uint
109+
}
110+
111+
func AddBalance(ctx context.Context, acc LedgerAccount, amount int) (err error) {
112+
var try int
113+
for try <= maxTries {
114+
var acc LedgerAccount
115+
err, _ := table.GetItem(ctx, pk, pk, &acc)
116+
if err != nil {
117+
return err
118+
}
119+
120+
// Add amount to current account balance
121+
acc.Balance += amount
122+
123+
// If another go routine updates account using AddBalance we want to avoid overwriting using an old balance
124+
err = table.PutItem(ctx, pk, pk, acc, dynago.WithOptimisticLock("Version", acc.Version))
125+
if err == nil {
126+
return nil
127+
}
128+
// Retry if there is an error with latest item value from DynamoDB
129+
try += 1
130+
}
131+
132+
return err
133+
}
134+
```
135+
92136
### Query
93137

94138
```go

interface.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ func BoolValue(v bool) *types.AttributeValueMemberBOOL {
2424
type WriteAPI interface {
2525
// Create or update given item in DynamoDB. Must implemenmt DynamoRecord interface.
2626
// DynamoRecord.GetKeys will be called to get values for parition and sort keys.
27-
PutItem(ctx context.Context, pk Attribute, sk Attribute, item interface{}) error
28-
DeleteItem(ctx context.Context, pk string, sk string) error
27+
PutItem(ctx context.Context, pk, sk Attribute, item interface{}, opt ...PutOption) error
28+
DeleteItem(ctx context.Context, pk, sk string) error
2929
BatchDeleteItems(ctx context.Context, input []AttributeRecord) []AttributeRecord
3030
}
3131

@@ -35,7 +35,7 @@ type TransactionAPI interface {
3535
}
3636

3737
type ReadAPI interface {
38-
GetItem(ctx context.Context, pk Attribute, sk Attribute, out interface{}) (error, bool)
38+
GetItem(ctx context.Context, pk, sk Attribute, out interface{}) (error, bool)
3939
BatchGetItems(ctx context.Context, input []AttributeRecord, out interface{}) error
4040
}
4141

put_item.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,37 @@ import (
1010
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
1111
)
1212

13+
type PutOption func(*dynamodb.PutItemInput) error
14+
15+
// Enables concurrency control by using an optimistic lock
16+
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.OptimisticLocking.html
17+
//
18+
// Provide key field acts as a version number (Usually called Version)
19+
// GetItem retrieves current version number and you can update the item if the version number in DynamoDB hasn't changed
20+
// Each update increments the version number and if the update fails fetch the record again to get latest version number and try again
21+
func WithOptimisticLock(key string, currentVersion uint) PutOption {
22+
return func(input *dynamodb.PutItemInput) error {
23+
condition := "#version = :oldVersion"
24+
input.ConditionExpression = &condition
25+
if input.ExpressionAttributeNames == nil {
26+
input.ExpressionAttributeNames = map[string]string{}
27+
}
28+
if input.ExpressionAttributeValues == nil {
29+
input.ExpressionAttributeValues = map[string]Attribute{}
30+
}
31+
input.ExpressionAttributeNames["#version"] = key
32+
input.ExpressionAttributeValues[":oldVersion"] = NumberValue(int64(currentVersion))
33+
input.Item[key] = NumberValue(int64(currentVersion + 1))
34+
return nil
35+
}
36+
}
37+
1338
/**
1439
* Used to put and update a db record from dynamodb given a partition key and sort key
1540
* @param item the item put into the database
1641
* @return true if the record was put, false otherwise
1742
*/
18-
func (t *Client) PutItem(ctx context.Context, pk Attribute, sk Attribute, item interface{}) error {
43+
func (t *Client) PutItem(ctx context.Context, pk, sk Attribute, item interface{}, opts ...PutOption) error {
1944
av, err := attributevalue.MarshalMap(item)
2045
if err != nil {
2146
log.Println("Failed to Marshal item" + err.Error())
@@ -26,10 +51,18 @@ func (t *Client) PutItem(ctx context.Context, pk Attribute, sk Attribute, item i
2651
av[k] = v
2752
}
2853

29-
_, err = t.client.PutItem(ctx, &dynamodb.PutItemInput{
54+
input := &dynamodb.PutItemInput{
3055
TableName: &t.TableName,
3156
Item: av,
32-
})
57+
}
58+
// Apply option functions
59+
if len(opts) > 0 {
60+
for _, opt := range opts {
61+
opt(input)
62+
}
63+
}
64+
65+
_, err = t.client.PutItem(ctx, input)
3366
if err != nil {
3467
log.Println("Failed to Put item" + err.Error())
3568
return err

tests/putitem_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"reflect"
7+
"strings"
8+
"sync"
9+
"testing"
10+
11+
"github.com/oolio-group/dynago"
12+
)
13+
14+
type Record struct {
15+
ID string
16+
Pk string
17+
Sk string
18+
}
19+
20+
func TestPutItem(t *testing.T) {
21+
table := prepareTable(t, dynamoEndpoint, "put_test")
22+
testCases := []struct {
23+
title string
24+
item Record
25+
expected Record
26+
expectedErr error
27+
}{
28+
{
29+
title: "success 1",
30+
item: Record{
31+
ID: "1",
32+
Pk: "account_1",
33+
Sk: "account_1",
34+
},
35+
expected: Record{
36+
ID: "1",
37+
Pk: "account_1",
38+
Sk: "account_1",
39+
},
40+
},
41+
{
42+
title: "success 2",
43+
item: Record{
44+
ID: "2",
45+
Pk: "account_2",
46+
Sk: "account_2",
47+
},
48+
expected: Record{
49+
ID: "2",
50+
Pk: "account_2",
51+
Sk: "account_2",
52+
},
53+
},
54+
{
55+
title: "pk is required",
56+
item: Record{
57+
ID: "3",
58+
Sk: "account_3",
59+
},
60+
expectedErr: fmt.Errorf("The AttributeValue for a key attribute cannot contain an empty string value. Key: pk"),
61+
},
62+
{
63+
title: "sk is required",
64+
item: Record{
65+
ID: "4",
66+
Pk: "account_4",
67+
},
68+
expectedErr: fmt.Errorf("The AttributeValue for a key attribute cannot contain an empty string value. Key: sk"),
69+
},
70+
}
71+
for _, tc := range testCases {
72+
t.Run(tc.title, func(t *testing.T) {
73+
t.Helper()
74+
ctx := context.TODO()
75+
76+
pk := dynago.StringValue(tc.item.Pk)
77+
sk := dynago.StringValue(tc.item.Sk)
78+
err := table.PutItem(ctx, pk, sk, &tc.item)
79+
if err != nil {
80+
if tc.expectedErr == nil {
81+
t.Fatalf("unexpected error %s", err)
82+
}
83+
if !strings.Contains(err.Error(), tc.expectedErr.Error()) {
84+
t.Fatalf("expected op to fail with %s; got %s", tc.expectedErr, err)
85+
}
86+
return
87+
}
88+
89+
var out Record
90+
err, found := table.GetItem(ctx, pk, sk, &out)
91+
if err != nil {
92+
t.Fatalf("unexpected error %s", err)
93+
}
94+
if !found {
95+
t.Errorf("expected to find item with pk %s and sk %s", tc.item.Pk, tc.item.Sk)
96+
}
97+
if !reflect.DeepEqual(tc.expected, out) {
98+
t.Errorf("expected query to return %v; got %v", tc.expected, out)
99+
}
100+
})
101+
}
102+
}
103+
104+
type LedgerAccount struct {
105+
ID string
106+
Balance int
107+
Version uint
108+
}
109+
110+
func TestPutItemWithOptimisticLock(t *testing.T) {
111+
table := prepareTable(t, dynamoEndpoint, "put_optimistic_test")
112+
// Create new account item in DynamoDB with default values
113+
account := LedgerAccount{ID: "123"}
114+
ctx := context.Background()
115+
pk := dynago.StringValue("123")
116+
err := table.PutItem(ctx, pk, pk, account)
117+
if err != nil {
118+
t.Fatalf("unexpected error %s", err)
119+
return
120+
}
121+
122+
// Update method will add 100 to the current account balance
123+
update := func() error {
124+
var (
125+
acc LedgerAccount
126+
)
127+
err, _ := table.GetItem(ctx, pk, pk, &acc)
128+
if err != nil {
129+
return err
130+
}
131+
t.Log(acc)
132+
acc.Balance += 100
133+
return table.PutItem(ctx, pk, pk, acc, dynago.WithOptimisticLock("Version", acc.Version))
134+
}
135+
// Invoke Update in parallel 10 times to increment account balance
136+
// WithOptimisticLock should prevent concurrency overwriting issues
137+
var wg sync.WaitGroup
138+
for range 10 {
139+
wg.Add(1)
140+
go func() {
141+
defer wg.Done()
142+
for {
143+
err := update()
144+
if err == nil {
145+
return
146+
}
147+
}
148+
}()
149+
}
150+
wg.Wait()
151+
// We expect account balance to be 1000 after 10 update
152+
// If any update method overwrote with an outdated value then total balance will be less than 1000
153+
var acc LedgerAccount
154+
err, _ = table.GetItem(ctx, pk, pk, &acc)
155+
if err != nil {
156+
t.Fatalf("unexpected error %s", err)
157+
return
158+
}
159+
if acc.Balance != 1000 {
160+
t.Errorf("expected account balance to be 1000 after 10 increments of 100; got %d", acc.Balance)
161+
}
162+
}

0 commit comments

Comments
 (0)