Skip to content

Commit bdc3702

Browse files
committed
feat(): fix ConcurrentDownloadChunkDataStream
1 parent ed97469 commit bdc3702

File tree

2 files changed

+39
-21
lines changed

2 files changed

+39
-21
lines changed

client.go

+33-18
Original file line numberDiff line numberDiff line change
@@ -771,18 +771,18 @@ func (c *Client) ConcurrentDownloadChunkData(id string, concurrentNum int) ([]by
771771

772772
// it's caller's responsibility to reserve or delete the tmp file created by this method
773773

774-
func (c *Client) ConcurrentDownloadChunkDataStream(id string, concurrentNum int) (*os.File, []byte, error) {
774+
func (c *Client) ConcurrentDownloadChunkDataStream(id string, concurrentNum int) (dataFile *os.File, err error) {
775775
offsetResponse, err := c.getTransactionOffset(id)
776776
if err != nil {
777-
return nil, nil, err
777+
return nil, err
778778
}
779779
size, err := strconv.ParseInt(offsetResponse.Size, 10, 64)
780780
if err != nil {
781-
return nil, nil, err
781+
return nil, err
782782
}
783783
endOffset, err := strconv.ParseInt(offsetResponse.Offset, 10, 64)
784784
if err != nil {
785-
return nil, nil, err
785+
return nil, err
786786
}
787787
startOffset := endOffset - size + 1
788788

@@ -792,17 +792,29 @@ func (c *Client) ConcurrentDownloadChunkDataStream(id string, concurrentNum int)
792792
i += types.MAX_CHUNK_SIZE
793793
}
794794

795-
if len(offsetArr) <= 3 { // not need concurrent get chunks
796-
data, err := c.DownloadChunkData(id)
797-
return nil, data, err
798-
}
799-
800795
log.Debug("need download chunks length", "length", len(offsetArr))
801796

802-
dataFile, err := os.CreateTemp(".", "concurrent-load-chunks-")
797+
dataFile, err = os.CreateTemp(".", "concurrent-load-data-")
803798
if err != nil {
804-
return nil, nil, err
799+
return nil, err
800+
}
801+
defer func() {
802+
if err != nil {
803+
dataFile.Close()
804+
os.Remove(dataFile.Name())
805+
}
806+
}()
807+
808+
if len(offsetArr) <= 3 { // not need concurrent get chunks
809+
var data []byte
810+
data, err = c.DownloadChunkData(id)
811+
if err != nil {
812+
return nil, err
813+
}
814+
_, err = dataFile.Write(data)
815+
return dataFile, err
805816
}
817+
806818
type Offset struct {
807819
fileOffset int64
808820
chunkOffset int64
@@ -845,20 +857,21 @@ func (c *Client) ConcurrentDownloadChunkDataStream(id string, concurrentNum int)
845857

846858
for i, offset := range offsetArr[:len(offsetArr)-2] {
847859
wg.Add(1)
848-
if err := p.Invoke(Offset{fileOffset: offset, chunkOffset: offset + startOffset}); err != nil {
860+
if err = p.Invoke(Offset{fileOffset: offset, chunkOffset: offset + startOffset}); err != nil {
849861
log.Error("p.Invoke(i)", "err", err, "i", i)
850-
return nil, nil, err
862+
return nil, err
851863
}
852864
}
853865
wg.Wait()
854866
_, err = dataFile.Seek(0, 2)
855867
if err != nil {
856-
return nil, nil, err
868+
return nil, err
857869
}
858870
// add latest 2 chunks
859871
start := offsetArr[len(offsetArr)-3] + startOffset + types.MAX_CHUNK_SIZE
860872
for i := 0; int64(i)+start < endOffset; {
861-
chunkData, err := c.getChunkData(int64(i) + start)
873+
var chunkData []byte
874+
chunkData, err = c.getChunkData(int64(i) + start)
862875
if err != nil {
863876
count := 0
864877
for count < 2 {
@@ -874,17 +887,19 @@ func (c *Client) ConcurrentDownloadChunkDataStream(id string, concurrentNum int)
874887
}
875888
}
876889
if err != nil {
877-
return nil, nil, errors.New("concurrent get latest two chunks failed")
890+
err = errors.New(fmt.Sprintf("concurrent get latest two chunks failed,err:%v", err))
891+
return nil, err
878892
}
879893
n := 0
880894
n, err = dataFile.Write(chunkData)
881895
if err != nil || n < len(chunkData) {
882-
return nil, nil, fmt.Errorf("write dataFile error writeSize:%d, expectSize:%d", n, len(chunkData))
896+
err = fmt.Errorf("write dataFile error writeSize:%d, expectSize:%d", n, len(chunkData))
897+
return nil, err
883898
}
884899
i += len(chunkData)
885900
}
886901

887-
return dataFile, nil, nil
902+
return dataFile, nil
888903
}
889904

890905
func (c *Client) GetUnconfirmedTx(arId string) (*types.Transaction, error) {

example/local_data_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,17 @@ func TestConcurrentDownloadStream(t *testing.T) {
5050
arCli := goar.NewClient("https://arweave.net")
5151

5252
arId := "cqCdSEKu-A272DuwFpKPBdyEsxXHT92gxoorS3Y-sbM"
53-
dataFile, data, err := arCli.ConcurrentDownloadChunkDataStream(arId, 0)
53+
// arId := "3SZ_nXUzHIpOMJ5yWQhesODh13ZI4-ObgdkttYfxyy4"
54+
dataFile, err := arCli.ConcurrentDownloadChunkDataStream(arId, 0)
5455
assert.NoError(t, err)
55-
assert.Equal(t, 0, len(data))
56+
info, err := dataFile.Stat()
57+
assert.NoError(t, err)
58+
t.Log(info.Size(), info.Name())
5659
dataFile.Close()
60+
os.Remove(dataFile.Name())
5761
}
5862

5963
func TestSendDataStream(t *testing.T) {
60-
6164
arNode := "https://arweave.net"
6265
w, err := goar.NewWalletFromPath("./testKey.json", arNode) // your wallet private key
6366
assert.NoError(t, err)

0 commit comments

Comments
 (0)