Skip to content

Commit 7296a04

Browse files
Merge pull request #2 from everFinance/develop
fix(): modfy get block/tx by peers
2 parents e531cf4 + 9effeb8 commit 7296a04

File tree

4 files changed

+50
-20
lines changed

4 files changed

+50
-20
lines changed

blocks.go

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func (l *BlockIdxs) existBlock(b types.Block) bool {
4848

4949
func (l *BlockIdxs) VerifyBlock(b types.Block) error {
5050
if !l.existBlock(b) {
51+
log.Warn("block indepHash not exist blockIdxs", "blockHeight", b.Height, "blockIndepHash", b.IndepHash)
5152
return errors.New("block indepHash not exist blockIdxs")
5253
}
5354
indepHash := utils.GenerateIndepHash(b)

example/all-tx-syncer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
// sync all arweave tx
99
func main() {
1010
nullFilterParams := arsyncer.FilterParams{} // non-file params
11-
startHeight := int64(861460) // from genesis block start
11+
startHeight := int64(877360)
1212
arNode := "https://arweave.net"
1313
concurrencyNumber := 10 // runtime concurrency number, default 10
1414
s := arsyncer.New(startHeight, nullFilterParams, arNode, concurrencyNumber, 15)

jobs.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package arsyncer
22

33
func (s *Syncer) runJobs() {
44
s.scheduler.Every(5).Seconds().SingletonMode().Do(s.updateBlockHashList)
5-
5+
s.scheduler.Every(1).Minute().SingletonMode().Do(s.updatePeers)
66
s.scheduler.StartAsync()
77
}
88

@@ -19,3 +19,15 @@ func (s *Syncer) updateBlockHashList() {
1919
s.blockIdxs = idxs
2020
log.Debug("update block hash_list sucess", "startHeight", idxs.StartHeight, "endHeight", idxs.EndHeight)
2121
}
22+
23+
func (s *Syncer) updatePeers() {
24+
peers, err := s.arClient.GetPeers()
25+
if err != nil {
26+
return
27+
}
28+
if len(peers) == 0 {
29+
return
30+
}
31+
// update
32+
s.peers = peers
33+
}

syncer.go

+35-18
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Syncer struct {
2626
stableDistance int64 // stable block distance
2727
blockIdxs *BlockIdxs
2828
scheduler *gocron.Scheduler
29+
peers []string
2930
}
3031

3132
func New(startHeight int64, filterParams FilterParams, arNode string, conNum int, stableDistance int64) *Syncer {
@@ -44,6 +45,11 @@ func New(startHeight int64, filterParams FilterParams, arNode string, conNum int
4445
}
4546
fmt.Println("Init arweave block indep hash_list finished...")
4647

48+
peers, err := arCli.GetPeers()
49+
if err != nil {
50+
panic(err)
51+
}
52+
4753
return &Syncer{
4854
curHeight: startHeight,
4955
FilterParams: filterParams,
@@ -56,6 +62,7 @@ func New(startHeight int64, filterParams FilterParams, arNode string, conNum int
5662
stableDistance: stableDistance,
5763
blockIdxs: idxs,
5864
scheduler: gocron.NewScheduler(time.UTC),
65+
peers: peers,
5966
}
6067
}
6168

@@ -104,7 +111,7 @@ func (s *Syncer) pollingBlock() {
104111
if end > stableHeight {
105112
end = stableHeight
106113
}
107-
blocks := mustGetBlocks(start, end, s.arClient, s.blockIdxs, int(s.conNum))
114+
blocks := mustGetBlocks(start, end, s.arClient, s.blockIdxs, int(s.conNum), s.peers)
108115
log.Info("get blocks success", "start", start, "end", end)
109116

110117
s.curHeight = end + 1
@@ -135,7 +142,7 @@ func (s *Syncer) pollingTx() {
135142
}
136143

137144
func (s *Syncer) getTxs(b types.Block) {
138-
txs := mustGetTxs(b.Height, b.Txs, s.arClient, int(s.conNum))
145+
txs := mustGetTxs(b.Height, b.Txs, s.arClient, int(s.conNum), s.peers)
139146

140147
// subscribe txs
141148
for {
@@ -180,7 +187,7 @@ func (s *Syncer) filterTx() {
180187
}
181188
}
182189

183-
func mustGetTxs(blockHeight int64, blockTxs []string, arClient *goar.Client, conNum int) (txs []types.Transaction) {
190+
func mustGetTxs(blockHeight int64, blockTxs []string, arClient *goar.Client, conNum int, peers []string) (txs []types.Transaction) {
184191
if len(blockTxs) == 0 {
185192
return
186193
}
@@ -199,7 +206,7 @@ func mustGetTxs(blockHeight int64, blockTxs []string, arClient *goar.Client, con
199206

200207
p, _ := ants.NewPoolWithFunc(conNum, func(i interface{}) {
201208
txId := i.(string)
202-
tx, err := getTxByIdRetry(blockHeight, arClient, txId)
209+
tx, err := getTxByIdRetry(blockHeight, arClient, txId, peers)
203210
if err != nil {
204211
log.Error("get tx by id error", "txId", txId, "err", err)
205212
// notice: must return fetch failed tx
@@ -225,7 +232,7 @@ func mustGetTxs(blockHeight int64, blockTxs []string, arClient *goar.Client, con
225232
return
226233
}
227234

228-
func mustGetBlocks(start, end int64, arClient *goar.Client, blockIdxs *BlockIdxs, conNum int) (blocks []*types.Block) {
235+
func mustGetBlocks(start, end int64, arClient *goar.Client, blockIdxs *BlockIdxs, conNum int, peers []string) (blocks []*types.Block) {
229236
if start > end {
230237
return
231238
}
@@ -238,7 +245,7 @@ func mustGetBlocks(start, end int64, arClient *goar.Client, blockIdxs *BlockIdxs
238245

239246
p, _ := ants.NewPoolWithFunc(conNum, func(i interface{}) {
240247
height := i.(int64)
241-
b, err := getBlockByHeightRetry(arClient, height, blockIdxs)
248+
b, err := getBlockByHeightRetry(arClient, height, blockIdxs, peers)
242249
if err != nil {
243250
log.Error("get block by height error", "height", height, "err", err)
244251
panic(err)
@@ -264,48 +271,58 @@ func mustGetBlocks(start, end int64, arClient *goar.Client, blockIdxs *BlockIdxs
264271
return
265272
}
266273

267-
func getTxByIdRetry(blockHeight int64, arCli *goar.Client, txId string) (types.Transaction, error) {
274+
func getTxByIdRetry(blockHeight int64, arCli *goar.Client, txId string, peers []string) (types.Transaction, error) {
268275
count := 0
269276
for {
270277
// get from trust node
271278
tx, err := arCli.GetTransactionByID(txId)
272-
if err != nil {
273-
// get from non-trust nodes
274-
tx, err = arCli.GetTxFromPeers(txId)
275-
}
276-
277279
if err == nil {
278280
// verify tx, ignore genesis block txs
279281
if blockHeight != 0 {
280282
err = utils.VerifyTransaction(*tx)
281283
}
282284
}
283285

286+
if err != nil {
287+
// get from non-trust nodes
288+
tx, err = arCli.GetTxFromPeers(txId, peers...)
289+
if err == nil {
290+
// verify tx, ignore genesis block txs
291+
if blockHeight != 0 {
292+
err = utils.VerifyTransaction(*tx)
293+
}
294+
}
295+
}
296+
284297
if err == nil {
285298
return *tx, nil
286299
}
287300

288-
if count == 2 {
301+
if count == 5 {
289302
return types.Transaction{}, err
290303
}
291304
count++
292305
time.Sleep(2 * time.Second)
293306
}
294307
}
295308

296-
func getBlockByHeightRetry(arCli *goar.Client, height int64, blockIdxs *BlockIdxs) (*types.Block, error) {
309+
func getBlockByHeightRetry(arCli *goar.Client, height int64, blockIdxs *BlockIdxs, peers []string) (*types.Block, error) {
297310
count := 0
298311
for {
299312
b, err := arCli.GetBlockByHeight(height)
300-
if err != nil {
301-
b, err = arCli.GetBlockFromPeers(height)
302-
}
303-
304313
if err == nil {
305314
// verify block
306315
err = blockIdxs.VerifyBlock(*b)
307316
}
308317

318+
if err != nil {
319+
b, err = arCli.GetBlockFromPeers(height, peers...)
320+
if err == nil {
321+
// verify block
322+
err = blockIdxs.VerifyBlock(*b)
323+
}
324+
}
325+
309326
if err == nil {
310327
return b, nil
311328
}

0 commit comments

Comments
 (0)