Skip to content

Commit fa798b7

Browse files
committed
Merge remote-tracking branch 'origin/main'
2 parents 006e632 + 1e14276 commit fa798b7

File tree

6 files changed

+129
-13
lines changed

6 files changed

+129
-13
lines changed

example/bundle_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package example
22

33
import (
4+
"context"
45
"github.com/everFinance/everpay-go/sdk"
56
"github.com/everFinance/goar"
67
"github.com/everFinance/goar/types"
@@ -70,7 +71,8 @@ func TestBundleToArweave(t *testing.T) {
7071

7172
// send to arweave
7273
wal, err := goar.NewWalletFromPath("jwkKey.json", "https://arweave.net")
73-
tx, err := wal.SendBundleTx(bundle.BundleBinary, []types.Tag{
74+
assert.NoError(t, err)
75+
tx, err := wal.SendBundleTx(context.TODO(), 0, bundle.BundleBinary, []types.Tag{
7476
{Name: "App", Value: "goar"},
7577
})
7678
assert.NoError(t, err)

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ require (
2929
github.com/mattn/go-isatty v0.0.14 // indirect
3030
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3131
github.com/modern-go/reflect2 v1.0.1 // indirect
32+
github.com/panjf2000/ants/v2 v2.6.0 // indirect
3233
github.com/pmezard/go-difflib v1.0.0 // indirect
3334
golang.org/x/crypto v0.3.0 // indirect
3435
golang.org/x/sys v0.2.0 // indirect

types/const.go

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ const (
1111
NOTE_SIZE = 32
1212
HASH_SIZE = 32
1313

14+
// concurrent submit chunks min size
15+
DEFAULT_CHUNK_CONCURRENT_NUM = 50 // default concurrent number
16+
1417
// number of bits in a big.Word
1518
WordBits = 32 << (uint64(^big.Word(0)) >> 63)
1619
// number of bytes in a big.Word

uploader.go

+77
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package goar
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
7+
"github.com/panjf2000/ants/v2"
68
"math"
79
"math/rand"
810
"strconv"
11+
"sync"
912
"time"
1013

1114
"github.com/everFinance/goar/types"
@@ -148,6 +151,80 @@ func (tt *TransactionUploader) PctComplete() float64 {
148151
return math.Trunc(fval * 100)
149152
}
150153

154+
func (tt *TransactionUploader) ConcurrentOnce(ctx context.Context, concurrentNum int) error {
155+
// post tx info
156+
if err := tt.postTransaction(); err != nil {
157+
return err
158+
}
159+
160+
if tt.IsComplete() {
161+
return nil
162+
}
163+
164+
var wg sync.WaitGroup
165+
if concurrentNum <= 0 {
166+
concurrentNum = types.DEFAULT_CHUNK_CONCURRENT_NUM
167+
}
168+
p, _ := ants.NewPoolWithFunc(concurrentNum, func(i interface{}) {
169+
defer wg.Done()
170+
// process submit chunk
171+
idx := i.(int)
172+
173+
select {
174+
case <-ctx.Done():
175+
log.Warn("ctx.done", "chunkIdx", idx)
176+
return
177+
default:
178+
}
179+
chunk, err := utils.GetChunk(*tt.Transaction, idx, tt.Data)
180+
if err != nil {
181+
log.Error("GetChunk error", "err", err, "idx", idx)
182+
return
183+
}
184+
body, statusCode, err := tt.Client.SubmitChunks(chunk) // always body is errMsg
185+
if statusCode == 200 {
186+
return
187+
}
188+
189+
log.Error("concurrent submitChunk failed", "chunkIdx", idx, "statusCode", statusCode, "gatewayErr", body, "httpErr", err)
190+
// try again
191+
retryCount := 0
192+
for {
193+
select {
194+
case <-ctx.Done():
195+
log.Warn("ctx.done", "chunkIdx", idx)
196+
return
197+
default:
198+
}
199+
200+
retryCount++
201+
if statusCode == 429 {
202+
time.Sleep(1 * time.Second)
203+
} else {
204+
time.Sleep(200 * time.Millisecond)
205+
}
206+
207+
body, statusCode, err = tt.Client.SubmitChunks(chunk)
208+
if statusCode == 200 {
209+
return
210+
}
211+
log.Warn("retry submitChunk failed", "retryCount", retryCount, "chunkIdx", idx, "statusCode", statusCode, "gatewayErr", body, "httpErr", err)
212+
}
213+
})
214+
215+
defer p.Release()
216+
for i := 0; i < len(tt.Transaction.Chunks.Chunks); i++ {
217+
wg.Add(1)
218+
if err := p.Invoke(i); err != nil {
219+
log.Error("p.Invoke(i)", "err", err, "i", i)
220+
return err
221+
}
222+
}
223+
224+
wg.Wait()
225+
return nil
226+
}
227+
151228
/**
152229
* Uploads the next part of the Transaction.
153230
* On the first call this posts the Transaction

wallet.go

+40-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package goar
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"io/ioutil"
@@ -101,26 +102,57 @@ func (w *Wallet) SendDataSpeedUp(data []byte, tags []types.Tag, speedFactor int6
101102
return w.SendTransaction(tx)
102103
}
103104

104-
// SendTransaction: if send success, should return pending
105-
func (w *Wallet) SendTransaction(tx *types.Transaction) (types.Transaction, error) {
106-
anchor, err := w.Client.GetTransactionAnchor()
105+
func (w *Wallet) SendDataConcurrentSpeedUp(ctx context.Context, concurrentNum int, data []byte, tags []types.Tag, speedFactor int64) (types.Transaction, error) {
106+
reward, err := w.Client.GetTransactionPrice(data, nil)
107107
if err != nil {
108108
return types.Transaction{}, err
109109
}
110-
tx.LastTx = anchor
111-
tx.Owner = w.Owner()
112-
if err = w.Signer.SignTx(tx); err != nil {
113-
return types.Transaction{}, err
110+
111+
tx := &types.Transaction{
112+
Format: 2,
113+
Target: "",
114+
Quantity: "0",
115+
Tags: utils.TagsEncode(tags),
116+
Data: utils.Base64Encode(data),
117+
DataSize: fmt.Sprintf("%d", len(data)),
118+
Reward: fmt.Sprintf("%d", reward*(100+speedFactor)/100),
114119
}
115120

116-
uploader, err := CreateUploader(w.Client, tx, nil)
121+
return w.SendTransactionConcurrent(ctx, concurrentNum, tx)
122+
}
123+
124+
// SendTransaction: if send success, should return pending
125+
func (w *Wallet) SendTransaction(tx *types.Transaction) (types.Transaction, error) {
126+
uploader, err := w.getUploader(tx)
117127
if err != nil {
118128
return types.Transaction{}, err
119129
}
120130
err = uploader.Once()
121131
return *tx, err
122132
}
123133

134+
func (w *Wallet) SendTransactionConcurrent(ctx context.Context, concurrentNum int, tx *types.Transaction) (types.Transaction, error) {
135+
uploader, err := w.getUploader(tx)
136+
if err != nil {
137+
return types.Transaction{}, err
138+
}
139+
err = uploader.ConcurrentOnce(ctx, concurrentNum)
140+
return *tx, err
141+
}
142+
143+
func (w *Wallet) getUploader(tx *types.Transaction) (*TransactionUploader, error) {
144+
anchor, err := w.Client.GetTransactionAnchor()
145+
if err != nil {
146+
return nil, err
147+
}
148+
tx.LastTx = anchor
149+
tx.Owner = w.Owner()
150+
if err = w.Signer.SignTx(tx); err != nil {
151+
return nil, err
152+
}
153+
return CreateUploader(w.Client, tx, nil)
154+
}
155+
124156
func (w *Wallet) SendPst(contractId string, target string, qty *big.Int, customTags []types.Tag, speedFactor int64) (types.Transaction, error) {
125157
maxQty := big.NewInt(9007199254740991) // swc support max js integer
126158
if qty.Cmp(maxQty) > 0 {

wallet_bundle.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package goar
22

33
import (
4+
"context"
45
"errors"
56
"github.com/everFinance/goar/types"
67
)
78

8-
func (w *Wallet) SendBundleTxSpeedUp(bundleBinary []byte, tags []types.Tag, txSpeed int64) (types.Transaction, error) {
9+
func (w *Wallet) SendBundleTxSpeedUp(ctx context.Context, concurrentNum int, bundleBinary []byte, tags []types.Tag, txSpeed int64) (types.Transaction, error) {
910
bundleTags := []types.Tag{
1011
{Name: "Bundle-Format", Value: "binary"},
1112
{Name: "Bundle-Version", Value: "2.0.0"},
@@ -23,9 +24,9 @@ func (w *Wallet) SendBundleTxSpeedUp(bundleBinary []byte, tags []types.Tag, txSp
2324

2425
txTags := make([]types.Tag, 0)
2526
txTags = append(bundleTags, tags...)
26-
return w.SendDataSpeedUp(bundleBinary, txTags, txSpeed)
27+
return w.SendDataConcurrentSpeedUp(ctx, concurrentNum, bundleBinary, txTags, txSpeed)
2728
}
2829

29-
func (w *Wallet) SendBundleTx(bundleBinary []byte, tags []types.Tag) (types.Transaction, error) {
30-
return w.SendBundleTxSpeedUp(bundleBinary, tags, 0)
30+
func (w *Wallet) SendBundleTx(ctx context.Context, concurrentNum int, bundleBinary []byte, tags []types.Tag) (types.Transaction, error) {
31+
return w.SendBundleTxSpeedUp(ctx, concurrentNum, bundleBinary, tags, 0)
3132
}

0 commit comments

Comments
 (0)