Skip to content

Commit 7288748

Browse files
Merge pull request #7 from everFinance/feature/subscibe-blocks
add block subscribe
2 parents 0b14ca5 + a2d8303 commit 7288748

6 files changed

+32
-10
lines changed

example/all-tx-syncer.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"fmt"
5+
56
"github.com/everFinance/arsyncer"
67
)
78

@@ -11,7 +12,7 @@ func main() {
1112
startHeight := int64(879220)
1213
arNode := "https://arweave.net"
1314
concurrencyNumber := 10 // runtime concurrency number, default 10
14-
s := arsyncer.New(startHeight, nullFilterParams, arNode, concurrencyNumber, 15)
15+
s := arsyncer.New(startHeight, nullFilterParams, arNode, concurrencyNumber, 15, true)
1516

1617
// run
1718
s.Run()
@@ -21,7 +22,10 @@ func main() {
2122
select {
2223
case sTx := <-s.SubscribeTxCh():
2324
// process synced txs
24-
fmt.Println(sTx[0].BlockHeight)
25+
fmt.Println("Tx", sTx[0].ID)
26+
case sBlock := <-s.SubscribeBlockCh():
27+
fmt.Println("Block", sBlock.Height, sBlock.IndepHash)
2528
}
29+
2630
}
2731
}

example/from-tx-syncer.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"fmt"
5+
56
"github.com/everFinance/arsyncer"
67
)
78

@@ -14,7 +15,7 @@ func main() {
1415
startHeight := int64(804524)
1516
arNode := "https://arweave.net"
1617
concurrencyNumber := 100 // runtime concurrency number, default 10
17-
s := arsyncer.New(startHeight, ownerFilterParams, arNode, concurrencyNumber, 15)
18+
s := arsyncer.New(startHeight, ownerFilterParams, arNode, concurrencyNumber, 15, false)
1819

1920
// run
2021
s.Run()

example/swc-syncer.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"fmt"
5+
56
"github.com/everFinance/arsyncer"
67
"github.com/everFinance/goar/types"
78
)
@@ -16,7 +17,7 @@ func main() {
1617
startHeight := int64(472810)
1718
arNode := "https://arweave.net"
1819
concurrencyNumber := 50 // runtime concurrency number, default 10
19-
s := arsyncer.New(startHeight, swcFilterParams, arNode, concurrencyNumber, 15)
20+
s := arsyncer.New(startHeight, swcFilterParams, arNode, concurrencyNumber, 15, false)
2021

2122
// run
2223
s.Run()

example/target-tx-syncer.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"fmt"
5+
56
"github.com/everFinance/arsyncer"
67
)
78

@@ -14,7 +15,7 @@ func main() {
1415
startHeight := int64(811484)
1516
arNode := "https://arweave.net"
1617
concurrencyNumber := 100 // runtime concurrency number, default 10
17-
s := arsyncer.New(startHeight, ownerFilterParams, arNode, concurrencyNumber, 15)
18+
s := arsyncer.New(startHeight, ownerFilterParams, arNode, concurrencyNumber, 15, false)
1819

1920
// run
2021
s.Run()

go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ require (
77
github.com/getsentry/sentry-go v0.11.0
88
github.com/go-co-op/gocron v1.11.0
99
github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac
10+
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 // indirect
1011
github.com/panjf2000/ants/v2 v2.4.6
1112
github.com/pkg/errors v0.9.1 // indirect
13+
github.com/stretchr/testify v1.7.0
1214
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
13-
gopkg.in/h2non/gentleman.v2 v2.0.5 // indirect
15+
gopkg.in/h2non/gentleman.v2 v2.0.5
1416
)

syncer.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package arsyncer
22

33
import (
44
"fmt"
5+
"sync"
6+
"sync/atomic"
7+
"time"
8+
59
"github.com/everFinance/goar"
610
"github.com/everFinance/goar/types"
711
"github.com/everFinance/goar/utils"
812
"github.com/go-co-op/gocron"
913
"github.com/panjf2000/ants/v2"
10-
"sync"
11-
"sync/atomic"
12-
"time"
1314
)
1415

1516
var log = NewLog("syncer")
@@ -27,9 +28,12 @@ type Syncer struct {
2728
blockIdxs *BlockIdxs
2829
scheduler *gocron.Scheduler
2930
peers []string
31+
32+
subscribeBlock bool
33+
SubscribeBlockChan chan *types.Block
3034
}
3135

32-
func New(startHeight int64, filterParams FilterParams, arNode string, conNum int, stableDistance int64) *Syncer {
36+
func New(startHeight int64, filterParams FilterParams, arNode string, conNum int, stableDistance int64, subscribeBlock bool) *Syncer {
3337
if conNum <= 0 {
3438
conNum = 10 // default concurrency of number is 10
3539
}
@@ -54,6 +58,7 @@ func New(startHeight int64, filterParams FilterParams, arNode string, conNum int
5458
curHeight: startHeight,
5559
FilterParams: filterParams,
5660
blockChan: make(chan *types.Block, 5*conNum),
61+
SubscribeBlockChan: make(chan *types.Block, 5*conNum),
5762
blockTxsChan: make(chan []SubscribeTx, conNum),
5863
SubscribeChan: make(chan []SubscribeTx, conNum),
5964
arClient: arCli,
@@ -63,6 +68,7 @@ func New(startHeight int64, filterParams FilterParams, arNode string, conNum int
6368
blockIdxs: idxs,
6469
scheduler: gocron.NewScheduler(time.UTC),
6570
peers: peers,
71+
subscribeBlock: subscribeBlock,
6672
}
6773
}
6874

@@ -84,6 +90,10 @@ func (s *Syncer) SubscribeTxCh() <-chan []SubscribeTx {
8490
return s.SubscribeChan
8591
}
8692

93+
func (s *Syncer) SubscribeBlockCh() <-chan *types.Block {
94+
return s.SubscribeBlockChan
95+
}
96+
8797
func (s *Syncer) GetSyncedHeight() int64 {
8898
return atomic.LoadInt64(&s.nextSubscribeTxBlock)
8999
}
@@ -118,6 +128,9 @@ func (s *Syncer) pollingBlock() {
118128
// add chan
119129
for _, b := range blocks {
120130
s.blockChan <- b
131+
if s.subscribeBlock {
132+
s.SubscribeBlockChan <- b
133+
}
121134
}
122135
}
123136
}

0 commit comments

Comments
 (0)