Skip to content

Commit bbd199d

Browse files
authored
feat: add a support for transact items in dynago to do put or delete operations synchronously (#5)
- [x] add a support for transact items in dynago to do put or delete operations synchronously Tested : locally using integrated testing with local db
2 parents da47710 + fc1a8c6 commit bbd199d

File tree

3 files changed

+189
-0
lines changed

3 files changed

+189
-0
lines changed

interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type WriteAPI interface {
2929

3030
type TransactionAPI interface {
3131
TransactPutItems(ctx context.Context, items []*TransactPutItemsInput) error
32+
TransactItems(ctx context.Context, input []types.TransactWriteItem) error
3233
}
3334

3435
type ReadAPI interface {

tests/transact_items_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"reflect"
6+
"strings"
7+
"testing"
8+
9+
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
10+
"github.com/oolio-group/dynago"
11+
)
12+
13+
type Terminal struct {
14+
Id string
15+
Pk string
16+
Sk string
17+
}
18+
19+
func TestTransactItems(t *testing.T) {
20+
endoint, purge := startLocalDatabase(t)
21+
defer purge()
22+
23+
table := prepareTable(t, endoint, "transcact_item_test")
24+
25+
testCases := []struct {
26+
title string
27+
condition string
28+
keys map[string]types.AttributeValue
29+
opts []dynago.QueryOptions
30+
//items to be added
31+
newItems []Terminal
32+
operations []types.TransactWriteItem
33+
//items expected to exist in table after transaction operation
34+
expected []Terminal
35+
expectedErr error
36+
}{{
37+
title: "assign terminal - only add a terminal",
38+
condition: "pk = :pk",
39+
keys: map[string]types.AttributeValue{
40+
":pk": &types.AttributeValueMemberS{Value: "terminal1"},
41+
},
42+
newItems: []Terminal{},
43+
operations: []types.TransactWriteItem{
44+
table.WithPutItem("terminal1", "merchant1", Terminal{
45+
Id: "1",
46+
Pk: "terminal1",
47+
Sk: "merchant1",
48+
}),
49+
},
50+
expected: []Terminal{
51+
{
52+
Id: "1",
53+
Pk: "terminal1",
54+
Sk: "merchant1",
55+
},
56+
},
57+
},
58+
{
59+
title: "assign terminal - delete existing and update with new",
60+
condition: "pk = :pk",
61+
keys: map[string]types.AttributeValue{
62+
":pk": &types.AttributeValueMemberS{Value: "terminal1"},
63+
},
64+
newItems: []Terminal{{
65+
Id: "1",
66+
Pk: "terminal1",
67+
Sk: "merchant2",
68+
}},
69+
operations: []types.TransactWriteItem{
70+
table.WithDeleteItem("terminal1", "merchant1"),
71+
table.WithPutItem("terminal1", "merchant2", Terminal{
72+
Id: "1",
73+
Pk: "terminal1",
74+
Sk: "merchant2",
75+
}),
76+
},
77+
expected: []Terminal{
78+
{
79+
Id: "1",
80+
Pk: "terminal1",
81+
Sk: "merchant2",
82+
},
83+
},
84+
},
85+
}
86+
for _, tc := range testCases {
87+
t.Run(tc.title, func(t *testing.T) {
88+
t.Helper()
89+
ctx := context.TODO()
90+
// Create Item
91+
if len(tc.newItems) > 0 {
92+
items := make([]*dynago.TransactPutItemsInput, 0, len(tc.newItems))
93+
for _, item := range tc.newItems {
94+
items = append(items, &dynago.TransactPutItemsInput{
95+
dynago.StringValue(item.Pk), dynago.StringValue(item.Sk), item,
96+
})
97+
}
98+
err := table.TransactPutItems(ctx, items)
99+
if err != nil {
100+
t.Fatalf("transaction put items failed; got %s", err)
101+
}
102+
}
103+
//perform operations
104+
if len(tc.operations) > 0 {
105+
err := table.TransactItems(ctx, tc.operations)
106+
if err != nil {
107+
t.Fatalf("error occurred %s", err)
108+
}
109+
110+
}
111+
112+
var out []Terminal
113+
_, err := table.Query(ctx, tc.condition, tc.keys, &out)
114+
if tc.expectedErr != nil {
115+
if err == nil {
116+
t.Fatalf("expected query to fail with %s", tc.expectedErr)
117+
}
118+
if !strings.Contains(err.Error(), tc.expectedErr.Error()) {
119+
t.Fatalf("expected query to fail with %s; got %s", tc.expectedErr, err)
120+
}
121+
return
122+
}
123+
if err != nil {
124+
t.Fatalf("expected query to succeed; got %s", err)
125+
}
126+
if !reflect.DeepEqual(tc.expected, out) {
127+
t.Errorf("expected query to return %v; got %v", tc.expected, out)
128+
}
129+
130+
})
131+
132+
}
133+
134+
}

transaction_items.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package dynago
2+
3+
import (
4+
"context"
5+
"log"
6+
7+
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
8+
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
9+
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
10+
)
11+
12+
func (t *Client) WithDeleteItem(pk string, sk string) types.TransactWriteItem {
13+
return types.TransactWriteItem{
14+
Delete: &types.Delete{
15+
TableName: &t.TableName,
16+
Key: map[string]types.AttributeValue{
17+
"pk": &types.AttributeValueMemberS{Value: pk},
18+
"sk": &types.AttributeValueMemberS{Value: sk},
19+
},
20+
},
21+
}
22+
23+
}
24+
25+
func (t *Client) WithPutItem(pk string, sk string, item interface{}) types.TransactWriteItem {
26+
av, err := attributevalue.MarshalMap(item)
27+
if err != nil {
28+
log.Println("Failed to Marshal item" + err.Error())
29+
return types.TransactWriteItem{}
30+
}
31+
keys := map[string]types.AttributeValue{
32+
"pk": &types.AttributeValueMemberS{Value: pk},
33+
"sk": &types.AttributeValueMemberS{Value: sk},
34+
}
35+
for k, v := range keys {
36+
av[k] = v
37+
}
38+
return types.TransactWriteItem{
39+
Put: &types.Put{
40+
TableName: &t.TableName,
41+
Item: av,
42+
},
43+
}
44+
45+
}
46+
47+
// TransactItems is a synchronous for writing or deletion operation performed in dynamodb grouped together
48+
49+
func (t *Client) TransactItems(ctx context.Context, input []types.TransactWriteItem) error {
50+
_, err := t.client.TransactWriteItems(ctx, &dynamodb.TransactWriteItemsInput{
51+
TransactItems: input,
52+
})
53+
return err
54+
}

0 commit comments

Comments
 (0)