Skip to content

Commit c1bd5b3

Browse files
committedSep 23, 2021
Merge remote-tracking branch 'origin/feature/uploader' into broadcast-txs
# Conflicts: # client.go # client_test.go
2 parents 1639dd1 + e717116 commit c1bd5b3

9 files changed

+228
-198
lines changed
 

‎README.md

+5-9
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,10 @@ uploader, err := goar.CreateUploader(w.Client, tx, nil)
240240
if err != nil {
241241
return
242242
}
243-
for !uploader.IsComplete() {
244-
err = uploader.UploadChunk()
245-
if err != nil {
246-
return
247-
}
243+
244+
err = uploader.Once()
245+
if err != nil {
246+
return
248247
}
249248
```
250249

@@ -280,10 +279,7 @@ txId := "myTxId"
280279
// get uploader by txId and post big data by chunks
281280
uploader, err := goar.CreateUploader(wallet.Client, txId, bigData)
282281
assert.NoError(t, err)
283-
for !uploader.IsComplete() {
284-
err := uploader.UploadChunk()
285-
assert.NoError(t, err)
286-
}
282+
assert.NoError(t, uploader.Once())
287283
```
288284

289285
##### NOTE: About all chunk transfer full example can be viewed in path `./example/chunks_tx_test.go`

‎client.go

+15-112
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,28 @@ func (c *Client) GetInfo() (info *types.NetworkInfo, err error) {
5454
return
5555
}
5656

57-
func (c *Client) Peers() ([]string, error) {
57+
func (c *Client) GetPeers() ([]string, error) {
5858
body, _, err := c.httpGet("peers")
5959
if err != nil {
6060
return nil, ErrBadGateway
6161
}
6262

6363
peers := make([]string, 0)
6464
err = json.Unmarshal(body, &peers)
65-
return peers, err
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
// filter local
70+
fpeers := make([]string, 0)
71+
for _, p := range peers {
72+
if strings.Contains(p, "127.0.0.") {
73+
continue
74+
}
75+
fpeers = append(fpeers, p)
76+
}
77+
78+
return peers[:], nil
6679
}
6780

6881
// GetTransactionByID status: Pending/Invalid hash/overspend
@@ -433,113 +446,3 @@ func (c *Client) DownloadChunkData(id string) ([]byte, error) {
433446
}
434447
return data, nil
435448
}
436-
437-
func (c *Client) GetTxDataFromPeers(txId string) ([]byte, error) {
438-
peers, err := c.Peers()
439-
if err != nil {
440-
return nil, err
441-
}
442-
for _, peer := range peers {
443-
if strings.Contains(peer, "127.0") {
444-
continue
445-
}
446-
arNode := NewClient("http://" + peer)
447-
data, err := arNode.DownloadChunkData(txId)
448-
if err != nil {
449-
fmt.Printf("get tx data error:%v, peer: %s\n", err, peer)
450-
continue
451-
}
452-
fmt.Printf("success get tx data; peer: %s\n", peer)
453-
return data, nil
454-
}
455-
return nil, errors.New("get tx data from peers failed")
456-
}
457-
458-
func (c *Client) UploadTxDataToPeers(txId string, data []byte) error {
459-
peers, err := c.Peers()
460-
if err != nil {
461-
return err
462-
}
463-
464-
count := 0
465-
for _, peer := range peers {
466-
if strings.Contains(peer, "127.0") {
467-
continue
468-
}
469-
fmt.Printf("upload peer: %s, count: %d\n", peer, count)
470-
arNode := NewClient("http://" + peer)
471-
uploader, err := CreateUploader(arNode, txId, data)
472-
if err != nil {
473-
continue
474-
}
475-
Loop:
476-
for !uploader.IsComplete() {
477-
if err := uploader.UploadChunk(); err != nil {
478-
break Loop
479-
}
480-
if uploader.LastResponseStatus != 200 {
481-
break Loop
482-
}
483-
}
484-
if uploader.IsComplete() { // upload success
485-
count++
486-
}
487-
if count > 20 {
488-
return nil
489-
}
490-
}
491-
return fmt.Errorf("upload tx data to peers failed, txId: %s", txId)
492-
}
493-
494-
// push to bundler gateway
495-
496-
// SendItemToBundler send bundle bundleItem to bundler gateway
497-
func (c *Client) SendItemToBundler(itemBinary []byte) (*types.BundlerResp, error) {
498-
// post to bundler
499-
resp, err := http.DefaultClient.Post(types.BUNDLER_HOST+"/tx", "application/octet-stream", bytes.NewReader(itemBinary))
500-
if err != nil {
501-
return nil, err
502-
}
503-
if resp.StatusCode != http.StatusOK {
504-
return nil, fmt.Errorf("send to bundler request failed; http code: %d", resp.StatusCode)
505-
}
506-
507-
defer resp.Body.Close()
508-
// json unmarshal
509-
body, err := ioutil.ReadAll(resp.Body)
510-
if err != nil {
511-
return nil, fmt.Errorf("ioutil.ReadAll(resp.Body) error: %v", err)
512-
}
513-
br := &types.BundlerResp{}
514-
if err := json.Unmarshal(body, br); err != nil {
515-
return nil, fmt.Errorf("json.Unmarshal(body,br) failed; err: %v", err)
516-
}
517-
return br, nil
518-
}
519-
520-
func (c *Client) BatchSendItemToBundler(bundleItems []types.BundleItem) ([]*types.BundlerResp, error) {
521-
respList := make([]*types.BundlerResp, 0, len(bundleItems))
522-
for _, item := range bundleItems {
523-
itemBinary := item.ItemBinary
524-
if len(itemBinary) == 0 {
525-
if err := utils.GenerateItemBinary(&item); err != nil {
526-
return nil, err
527-
}
528-
itemBinary = item.ItemBinary
529-
}
530-
resp, err := c.SendItemToBundler(itemBinary)
531-
if err != nil {
532-
return nil, err
533-
}
534-
respList = append(respList, resp)
535-
}
536-
return respList, nil
537-
}
538-
539-
func (c *Client) GetBundle(arId string) (*types.Bundle, error) {
540-
data, err := c.DownloadChunkData(arId)
541-
if err != nil {
542-
return nil, err
543-
}
544-
return utils.DecodeBundle(data)
545-
}

‎client_broadcast.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package goar
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/everFinance/sandy_log/log"
8+
)
9+
10+
func (c *Client) GetTxDataFromPeers(txId string) ([]byte, error) {
11+
peers, err := c.GetPeers()
12+
if err != nil {
13+
return nil, err
14+
}
15+
16+
for _, peer := range peers {
17+
pNode := NewClient("http://" + peer)
18+
data, err := pNode.GetTransactionData(txId)
19+
if err != nil {
20+
log.Error("get tx data failed", "error", err, "peer", peer)
21+
continue
22+
}
23+
return data, nil
24+
}
25+
26+
return nil, errors.New("get tx data from peers failed")
27+
}
28+
29+
func (c *Client) BroadcastData(txId string, data []byte, numOfNodes int64) error {
30+
peers, err := c.GetPeers()
31+
if err != nil {
32+
return err
33+
}
34+
35+
count := int64(0)
36+
for _, peer := range peers {
37+
38+
fmt.Printf("upload peer: %s, count: %d\n", peer, count)
39+
arNode := NewClient("http://" + peer)
40+
uploader, err := CreateUploader(arNode, txId, data)
41+
if err != nil {
42+
continue
43+
}
44+
45+
if err = uploader.Once(); err != nil {
46+
continue
47+
}
48+
49+
count++
50+
if count >= numOfNodes {
51+
return nil
52+
}
53+
}
54+
55+
return fmt.Errorf("upload tx data to peers failed, txId: %s", txId)
56+
}

‎client_bundle.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package goar
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io/ioutil"
8+
"net/http"
9+
10+
"github.com/everFinance/goar/types"
11+
"github.com/everFinance/goar/utils"
12+
)
13+
14+
func (c *Client) GetBundle(arId string) (*types.Bundle, error) {
15+
data, err := c.DownloadChunkData(arId)
16+
if err != nil {
17+
return nil, err
18+
}
19+
return utils.DecodeBundle(data)
20+
}
21+
22+
// SendItemToBundler send bundle bundleItem to bundler gateway
23+
func (c *Client) SendItemToBundler(itemBinary []byte) (*types.BundlerResp, error) {
24+
// post to bundler
25+
resp, err := http.DefaultClient.Post(types.BUNDLER_HOST+"/tx", "application/octet-stream", bytes.NewReader(itemBinary))
26+
if err != nil {
27+
return nil, err
28+
}
29+
if resp.StatusCode != http.StatusOK {
30+
return nil, fmt.Errorf("send to bundler request failed; http code: %d", resp.StatusCode)
31+
}
32+
33+
defer resp.Body.Close()
34+
// json unmarshal
35+
body, err := ioutil.ReadAll(resp.Body)
36+
if err != nil {
37+
return nil, fmt.Errorf("ioutil.ReadAll(resp.Body) error: %v", err)
38+
}
39+
br := &types.BundlerResp{}
40+
if err := json.Unmarshal(body, br); err != nil {
41+
return nil, fmt.Errorf("json.Unmarshal(body,br) failed; err: %v", err)
42+
}
43+
return br, nil
44+
}
45+
46+
func (c *Client) BatchSendItemToBundler(bundleItems []types.BundleItem) ([]*types.BundlerResp, error) {
47+
respList := make([]*types.BundlerResp, 0, len(bundleItems))
48+
for _, item := range bundleItems {
49+
itemBinary := item.ItemBinary
50+
if len(itemBinary) == 0 {
51+
if err := utils.GenerateItemBinary(&item); err != nil {
52+
return nil, err
53+
}
54+
itemBinary = item.ItemBinary
55+
}
56+
resp, err := c.SendItemToBundler(itemBinary)
57+
if err != nil {
58+
return nil, err
59+
}
60+
respList = append(respList, resp)
61+
}
62+
return respList, nil
63+
}

‎client_test.go

+22-15
Original file line numberDiff line numberDiff line change
@@ -196,26 +196,33 @@ func TestClient_GetTransactionDataByGateway(t *testing.T) {
196196
func TestClient_GetPeers(t *testing.T) {
197197
arNode := "https://arweave.net"
198198
cli := NewClient(arNode)
199-
peers, err := cli.Peers()
199+
peers, err := cli.GetPeers()
200200
assert.NoError(t, err)
201201
t.Log(len(peers))
202202
}
203203

204-
func Test_UploadTxDataToPeers(t *testing.T) {
205-
data := make([]byte, 1103732)
206-
data[0] = byte('z')
207-
data[1] = byte('y')
208-
data[2] = byte('j')
209-
for i := 3; i < len(data); i++ {
210-
data[i] = byte('a')
211-
}
212-
213-
cli := NewClient("https://arweave.net")
214-
txId := "D3GOny9cItUEc8qAl1oLUtnoLOB3OfSB-wKbw8TUIRc"
215-
err = cli.UploadTxDataToPeers(txId, data)
216-
assert.NoError(t, err)
217-
}
204+
// func Test_BroadcastData(t *testing.T) {
205+
// data := make([]byte, 1103732)
206+
// data[0] = byte('z')
207+
// data[1] = byte('y')
208+
// data[2] = byte('j')
209+
// for i := 3; i < len(data); i++ {
210+
// data[i] = byte('a')
211+
// }
212+
213+
// cli := NewClient("https://arweave.net")
214+
// txId := "D3GOny9cItUEc8qAl1oLUtnoLOB3OfSB-wKbw8TUIRc"
215+
// err = cli.BroadcastData(txId, data, 1)
216+
// assert.NoError(t, err)
217+
// }
218218

219+
// func Test_GetTxDataFromPeers(t *testing.T) {
220+
// cli := NewClient("https://arweave.net")
221+
// txId := "D3GOny9cItUEc8qAl1oLUtnoLOB3OfSB-wKbw8TUIRc"
222+
// data, err := cli.GetTxDataFromPeers(txId)
223+
// assert.NoError(t, err)
224+
// assert.Equal(t, 1471643, len(data))
225+
// }
219226
func Test_GetTxDataFromPeers(t *testing.T) {
220227
cli := NewClient("https://arweave.net")
221228
txId := "J5FY1Ovd6JJ49WFHfCf-1wDM1TbaPSdKnGIB_8ePErE"

‎example/chunks_tx_test.go

+3-12
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,7 @@ func Test_PostBigDataByChunks(t *testing.T) {
8383
// uploader Transaction
8484
uploader, err := goar.CreateUploader(wallet.Client, tx, nil)
8585
assert.NoError(t, err)
86-
for !uploader.IsComplete() {
87-
err := uploader.UploadChunk()
88-
assert.NoError(t, err)
89-
}
86+
assert.NoError(t, uploader.Once())
9087
}
9188

9289
// test retry upload(断点重传) post big size data by tx id
@@ -127,10 +124,7 @@ func Test_RetryUploadDataByTxId(t *testing.T) {
127124
// get uploader by txId and post big data by chunks
128125
uploader, err := goar.CreateUploader(wallet.Client, tx.ID, bigData)
129126
assert.NoError(t, err)
130-
for !uploader.IsComplete() {
131-
err := uploader.UploadChunk()
132-
assert.NoError(t, err)
133-
}
127+
assert.NoError(t, uploader.Once())
134128
}
135129

136130
// test continue upload(断点续传) big size data by last time uploader
@@ -171,10 +165,7 @@ func Test_ContinueUploadDataByLastUploader(t *testing.T) {
171165
// new uploader object by last time uploader
172166
newUploader, err := goar.CreateUploader(wallet.Client, lastUploader.FormatSerializedUploader(), bigData)
173167
assert.NoError(t, err)
174-
for !newUploader.IsComplete() {
175-
err := newUploader.UploadChunk()
176-
assert.NoError(t, err)
177-
}
168+
assert.NoError(t, newUploader.Once())
178169

179170
// end remove jsonUploaderFile.json file
180171
_ = os.Remove("./jsonUploaderFile.json")

‎uploader.go

+14
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ func CreateUploader(api *Client, upload interface{}, data []byte) (*TransactionU
108108
return uploader, err
109109
}
110110

111+
func (tt *TransactionUploader) Once() (err error) {
112+
for !tt.IsComplete() {
113+
if err = tt.UploadChunk(); err != nil {
114+
return
115+
}
116+
117+
if tt.LastResponseStatus != 200 {
118+
return
119+
}
120+
}
121+
122+
return
123+
}
124+
111125
func (tt *TransactionUploader) IsComplete() bool {
112126
tChunks := tt.Transaction.Chunks
113127
if tChunks == nil {

0 commit comments

Comments
 (0)