Skip to content

Commit 0a73152

Browse files
Merge pull request #42 from everFinance/feature/concurency-get-chunks
Feature/concurency get chunks
2 parents 1b62b0f + fdac2b6 commit 0a73152

File tree

3 files changed

+118
-0
lines changed

3 files changed

+118
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ fmt.Println(tx.ID, err)
9696
- [x] GetUnconfirmedTx
9797
- [x] GetPendingTxIds
9898
- [x] GetBlockHashList
99+
- [x] ConcurrentDownloadChunkData
99100

100101
Initialize the instance:
101102

client.go

+105
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
"errors"
77
"fmt"
88
"github.com/inconshreveable/log15"
9+
"github.com/panjf2000/ants/v2"
910
"io/ioutil"
1011
"math/big"
1112
"net/http"
1213
"net/url"
1314
"path"
1415
"strconv"
1516
"strings"
17+
"sync"
1618
"time"
1719

1820
"github.com/everFinance/goar/types"
@@ -503,6 +505,109 @@ func (c *Client) DownloadChunkData(id string) ([]byte, error) {
503505
return data, nil
504506
}
505507

508+
func (c *Client) ConcurrentDownloadChunkData(id string, concurrentNum int) ([]byte, error) {
509+
offsetResponse, err := c.getTransactionOffset(id)
510+
if err != nil {
511+
return nil, err
512+
}
513+
size, err := strconv.ParseInt(offsetResponse.Size, 10, 64)
514+
if err != nil {
515+
return nil, err
516+
}
517+
endOffset, err := strconv.ParseInt(offsetResponse.Offset, 10, 64)
518+
if err != nil {
519+
return nil, err
520+
}
521+
startOffset := endOffset - size + 1
522+
523+
offsetArr := make([]int64, 0, 5)
524+
for i := 0; int64(i)+startOffset < endOffset; {
525+
offsetArr = append(offsetArr, int64(i)+startOffset)
526+
i += types.MAX_CHUNK_SIZE
527+
}
528+
529+
if len(offsetArr) <= 3 { // not need concurrent get chunks
530+
return c.DownloadChunkData(id)
531+
}
532+
533+
log.Debug("need download chunks length", "length", len(offsetArr))
534+
// concurrent get chunks
535+
type OffsetSort struct {
536+
Idx int
537+
Offset int64
538+
}
539+
540+
chunkArr := make([][]byte, len(offsetArr)-2)
541+
var (
542+
lock sync.Mutex
543+
wg sync.WaitGroup
544+
)
545+
if concurrentNum <= 0 {
546+
concurrentNum = types.DEFAULT_CHUNK_CONCURRENT_NUM
547+
}
548+
p, _ := ants.NewPoolWithFunc(concurrentNum, func(i interface{}) {
549+
defer wg.Done()
550+
oss := i.(OffsetSort)
551+
chunkData, err := c.getChunkData(oss.Offset)
552+
if err != nil {
553+
count := 0
554+
for count < 50 {
555+
time.Sleep(2 * time.Second)
556+
chunkData, err = c.getChunkData(oss.Offset)
557+
if err == nil {
558+
break
559+
}
560+
log.Error("retry getChunkData failed and try again...", "err", err, "offset", oss.Offset, "retryCount", count)
561+
count++
562+
}
563+
}
564+
lock.Lock()
565+
chunkArr[oss.Idx] = chunkData
566+
lock.Unlock()
567+
})
568+
569+
defer p.Release()
570+
571+
for i, offset := range offsetArr[:len(offsetArr)-2] {
572+
wg.Add(1)
573+
if err := p.Invoke(OffsetSort{Idx: i, Offset: offset}); err != nil {
574+
log.Error("p.Invoke(i)", "err", err, "i", i)
575+
return nil, err
576+
}
577+
}
578+
wg.Wait()
579+
580+
// add latest 2 chunks
581+
start := offsetArr[len(offsetArr)-3] + types.MAX_CHUNK_SIZE
582+
for i := 0; int64(i)+start < endOffset; {
583+
chunkData, err := c.getChunkData(int64(i) + start)
584+
if err != nil {
585+
count := 0
586+
for count < 50 {
587+
time.Sleep(2 * time.Second)
588+
chunkData, err = c.getChunkData(int64(i) + start)
589+
if err == nil {
590+
break
591+
}
592+
log.Error("retry getChunkData failed and try again...", "err", err, "offset", int64(i)+start, "retryCount", count)
593+
count++
594+
}
595+
}
596+
chunkArr = append(chunkArr, chunkData)
597+
i += len(chunkData)
598+
}
599+
600+
// assemble data
601+
data := make([]byte, 0, size)
602+
for _, chunk := range chunkArr {
603+
if chunk == nil {
604+
return nil, errors.New("concurrent get chunk failed")
605+
}
606+
data = append(data, chunk...)
607+
}
608+
return data, nil
609+
}
610+
506611
func (c *Client) GetUnconfirmedTx(arId string) (*types.Transaction, error) {
507612
_path := fmt.Sprintf("unconfirmed_tx/%s", arId)
508613
body, statusCode, err := c.httpGet(_path)

client_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -305,3 +305,15 @@ func TestClient_GetBlockHashList2(t *testing.T) {
305305
// t.Log(list)
306306
// }
307307
}
308+
309+
func TestClient_ConcurrentDownloadChunkData(t *testing.T) {
310+
c := NewClient("https://arweave.net")
311+
arId := "hG478oYzU0ReX8fCsRMcsm4ywuO4eiElCqDUAsm0nDk"
312+
data, err := c.ConcurrentDownloadChunkData(arId, 0)
313+
assert.NoError(t, err)
314+
// ioutil.WriteFile("nannan.jpeg",data,0777)
315+
chunks := utils.GenerateChunks(data)
316+
dataRoot := utils.Base64Encode(chunks.DataRoot)
317+
t.Log(dataRoot)
318+
t.Log(len(data))
319+
}

0 commit comments

Comments
 (0)