Skip to content

Commit 03975cf

Browse files
committed
feat(): add concurrent submitData function
1 parent 96fd4e8 commit 03975cf

File tree

6 files changed

+73
-29
lines changed

6 files changed

+73
-29
lines changed

example/bundle_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package example
22

33
import (
4+
"context"
45
"github.com/everFinance/everpay-go/sdk"
56
"github.com/everFinance/goar"
67
"github.com/everFinance/goar/types"
@@ -70,7 +71,8 @@ func TestBundleToArweave(t *testing.T) {
7071

7172
// send to arweave
7273
wal, err := goar.NewWalletFromPath("jwkKey.json", "https://arweave.net")
73-
tx, err := wal.SendBundleTx(bundle.BundleBinary, []types.Tag{
74+
assert.NoError(t, err)
75+
tx, err := wal.SendBundleTx(context.TODO(), 0, bundle.BundleBinary, []types.Tag{
7476
{Name: "App", Value: "goar"},
7577
})
7678
assert.NoError(t, err)

types/const.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ const (
1212
HASH_SIZE = 32
1313

1414
// concurrent submit chunks min size
15-
CONCURRENT_MIN_DATA_SIZE = 30 * 1024 * 1024 // 30 MB
16-
DEFAULT_CHUNK_CONCURRENT_NUM = 100 // default concurrent number
15+
DEFAULT_CHUNK_CONCURRENT_NUM = 50 // default concurrent number
1716

1817
// number of bits in a big.Word
1918
WordBits = 32 << (uint64(^big.Word(0)) >> 63)

uploader.go

+20-11
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package goar
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"github.com/panjf2000/ants/v2"
78
"math"
8-
"math/big"
99
"math/rand"
1010
"strconv"
1111
"sync"
@@ -111,13 +111,6 @@ func CreateUploader(api *Client, upload interface{}, data []byte) (*TransactionU
111111
}
112112

113113
func (tt *TransactionUploader) Once() (err error) {
114-
dataSize, ok := new(big.Int).SetString(tt.Transaction.DataSize, 10)
115-
if ok && dataSize.Cmp(big.NewInt(types.CONCURRENT_MIN_DATA_SIZE)) >= 0 {
116-
log.Debug("Using concurrent upload chunks", "dataSize", dataSize.String())
117-
return tt.ConcurrentUploadChunks()
118-
}
119-
120-
// not use concurrent submit chunks
121114
for !tt.IsComplete() {
122115
if err = tt.UploadChunk(); err != nil {
123116
return
@@ -158,7 +151,7 @@ func (tt *TransactionUploader) PctComplete() float64 {
158151
return math.Trunc(fval * 100)
159152
}
160153

161-
func (tt *TransactionUploader) ConcurrentUploadChunks() error {
154+
func (tt *TransactionUploader) ConcurrentOnce(ctx context.Context, concurrentNum int) error {
162155
// post tx info
163156
if err := tt.postTransaction(); err != nil {
164157
return err
@@ -169,10 +162,20 @@ func (tt *TransactionUploader) ConcurrentUploadChunks() error {
169162
}
170163

171164
var wg sync.WaitGroup
172-
p, _ := ants.NewPoolWithFunc(types.DEFAULT_CHUNK_CONCURRENT_NUM, func(i interface{}) {
165+
if concurrentNum <= 0 {
166+
concurrentNum = types.DEFAULT_CHUNK_CONCURRENT_NUM
167+
}
168+
p, _ := ants.NewPoolWithFunc(concurrentNum, func(i interface{}) {
173169
defer wg.Done()
174170
// process submit chunk
175171
idx := i.(int)
172+
173+
select {
174+
case <-ctx.Done():
175+
log.Warn("ctx.done", "chunkIdx", idx)
176+
return
177+
default:
178+
}
176179
chunk, err := utils.GetChunk(*tt.Transaction, idx, tt.Data)
177180
if err != nil {
178181
log.Error("GetChunk error", "err", err, "idx", idx)
@@ -187,8 +190,14 @@ func (tt *TransactionUploader) ConcurrentUploadChunks() error {
187190
// try again
188191
retryCount := 0
189192
for {
190-
retryCount++
193+
select {
194+
case <-ctx.Done():
195+
log.Warn("ctx.done", "chunkIdx", idx)
196+
return
197+
default:
198+
}
191199

200+
retryCount++
192201
if statusCode == 429 {
193202
time.Sleep(1 * time.Second)
194203
} else {

wallet.go

+40-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package goar
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"io/ioutil"
@@ -101,26 +102,57 @@ func (w *Wallet) SendDataSpeedUp(data []byte, tags []types.Tag, speedFactor int6
101102
return w.SendTransaction(tx)
102103
}
103104

104-
// SendTransaction: if send success, should return pending
105-
func (w *Wallet) SendTransaction(tx *types.Transaction) (types.Transaction, error) {
106-
anchor, err := w.Client.GetTransactionAnchor()
105+
func (w *Wallet) SendDataConcurrentSpeedUp(ctx context.Context, concurrentNum int, data []byte, tags []types.Tag, speedFactor int64) (types.Transaction, error) {
106+
reward, err := w.Client.GetTransactionPrice(data, nil)
107107
if err != nil {
108108
return types.Transaction{}, err
109109
}
110-
tx.LastTx = anchor
111-
tx.Owner = w.Owner()
112-
if err = w.Signer.SignTx(tx); err != nil {
113-
return types.Transaction{}, err
110+
111+
tx := &types.Transaction{
112+
Format: 2,
113+
Target: "",
114+
Quantity: "0",
115+
Tags: utils.TagsEncode(tags),
116+
Data: utils.Base64Encode(data),
117+
DataSize: fmt.Sprintf("%d", len(data)),
118+
Reward: fmt.Sprintf("%d", reward*(100+speedFactor)/100),
114119
}
115120

116-
uploader, err := CreateUploader(w.Client, tx, nil)
121+
return w.SendTransactionConcurrent(ctx, concurrentNum, tx)
122+
}
123+
124+
// SendTransaction: if send success, should return pending
125+
func (w *Wallet) SendTransaction(tx *types.Transaction) (types.Transaction, error) {
126+
uploader, err := w.getUploader(tx)
117127
if err != nil {
118128
return types.Transaction{}, err
119129
}
120130
err = uploader.Once()
121131
return *tx, err
122132
}
123133

134+
func (w *Wallet) SendTransactionConcurrent(ctx context.Context, concurrentNum int, tx *types.Transaction) (types.Transaction, error) {
135+
uploader, err := w.getUploader(tx)
136+
if err != nil {
137+
return types.Transaction{}, err
138+
}
139+
err = uploader.ConcurrentOnce(ctx, concurrentNum)
140+
return *tx, err
141+
}
142+
143+
func (w *Wallet) getUploader(tx *types.Transaction) (*TransactionUploader, error) {
144+
anchor, err := w.Client.GetTransactionAnchor()
145+
if err != nil {
146+
return nil, err
147+
}
148+
tx.LastTx = anchor
149+
tx.Owner = w.Owner()
150+
if err = w.Signer.SignTx(tx); err != nil {
151+
return nil, err
152+
}
153+
return CreateUploader(w.Client, tx, nil)
154+
}
155+
124156
func (w *Wallet) SendPst(contractId string, target string, qty *big.Int, customTags []types.Tag, speedFactor int64) (types.Transaction, error) {
125157
maxQty := big.NewInt(9007199254740991) // swc support max js integer
126158
if qty.Cmp(maxQty) > 0 {

wallet_bundle.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package goar
22

33
import (
4+
"context"
45
"errors"
56
"github.com/everFinance/goar/types"
67
)
78

8-
func (w *Wallet) SendBundleTxSpeedUp(bundleBinary []byte, tags []types.Tag, txSpeed int64) (types.Transaction, error) {
9+
func (w *Wallet) SendBundleTxSpeedUp(ctx context.Context, concurrentNum int, bundleBinary []byte, tags []types.Tag, txSpeed int64) (types.Transaction, error) {
910
bundleTags := []types.Tag{
1011
{Name: "Bundle-Format", Value: "binary"},
1112
{Name: "Bundle-Version", Value: "2.0.0"},
@@ -23,9 +24,9 @@ func (w *Wallet) SendBundleTxSpeedUp(bundleBinary []byte, tags []types.Tag, txSp
2324

2425
txTags := make([]types.Tag, 0)
2526
txTags = append(bundleTags, tags...)
26-
return w.SendDataSpeedUp(bundleBinary, txTags, txSpeed)
27+
return w.SendDataConcurrentSpeedUp(ctx, concurrentNum, bundleBinary, txTags, txSpeed)
2728
}
2829

29-
func (w *Wallet) SendBundleTx(bundleBinary []byte, tags []types.Tag) (types.Transaction, error) {
30-
return w.SendBundleTxSpeedUp(bundleBinary, tags, 0)
30+
func (w *Wallet) SendBundleTx(ctx context.Context, concurrentNum int, bundleBinary []byte, tags []types.Tag) (types.Transaction, error) {
31+
return w.SendBundleTxSpeedUp(ctx, concurrentNum, bundleBinary, tags, 0)
3132
}

wallet_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ import (
44
"encoding/base64"
55
"github.com/everFinance/goar/types"
66
"github.com/everFinance/goar/utils"
7+
"github.com/stretchr/testify/assert"
78
"io/ioutil"
89
"testing"
9-
10-
"github.com/stretchr/testify/assert"
1110
)
1211

1312
var testWallet *Wallet
@@ -285,7 +284,9 @@ func TestTransactionUploader_ConcurrentUploadChunks(t *testing.T) {
285284

286285
t.Log(len(bundle.BundleBinary))
287286
// send to arweave
288-
// tx, err := w.SendBundleTx(bundle.BundleBinary, []types.Tag{
287+
// ctx ,cancel := context.WithTimeout(context.Background(),100*time.Millisecond)
288+
// defer cancel()
289+
// tx, err := w.SendBundleTx(ctx, 0,bundle.BundleBinary, []types.Tag{
289290
// {Name: "APP", Value: "Goar"},
290291
// {Name: "Protocol-Name", Value: "BAR"},
291292
// {Name: "Action", Value: "Burn"},

0 commit comments

Comments
 (0)