diff --git a/cmd/gero/main.go b/cmd/gero/main.go index 4b1851d6..db266c7b 100644 --- a/cmd/gero/main.go +++ b/cmd/gero/main.go @@ -119,6 +119,7 @@ var ( utils.ConfirmedBlockFlag, utils.RecordBlockShareNumber, utils.LightNodeFlag, + utils.CloseAcceptTx, utils.ResetBlockNumber, utils.DeveloperFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 917c50b0..a13309d9 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -340,6 +340,11 @@ var ( Usage: "start light node", } + CloseAcceptTx = cli.BoolFlag{ + Name: "closeAcceptTx", + Usage: "Close Accept from remote Tx", + } + ConfirmedBlockFlag = cli.Uint64Flag{ Name: "confirmedBlock", Usage: "The balance will be confirmed after the current block of number,default is 12", @@ -1289,6 +1294,10 @@ func SetSeroConfig(ctx *cli.Context, stack *node.Node, cfg *sero.Config) { cfg.StartLight = true } + if ctx.GlobalIsSet(CloseAcceptTx.Name) { + cfg.CloseAcceptTx = true + } + // Override any default configs for hard coded networks. switch { case ctx.GlobalBool(AlphanetFlag.Name): diff --git a/miner/worker.go b/miner/worker.go index 8ca02952..4165fef6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -636,7 +636,7 @@ LOOP: err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) switch err { case core.ErrGasLimitReached: - log.Info("Gas limit exceeded for current block", "block", bc.CurrentBlock().Header().Number.Uint64(), "txHash", tx.Hash().Hex()) + log.Info("Gas limit exceeded for current block", "block", bc.CurrentBlock().Header().Number.Uint64()) // Pop the current out-of-gas transaction without shifting in the next from the account //log.Trace("Gas limit exceeded for current block", "sender", tx.From()) txs.Pop() diff --git a/sero/backend.go b/sero/backend.go index e9df72d9..952d6361 100644 --- a/sero/backend.go +++ b/sero/backend.go @@ -192,7 +192,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Sero, error) { sero.miner = miner.New(sero, sero.chainConfig, sero.EventMux(), sero.voter, sero.engine) - if sero.protocolManager, err = NewProtocolManager(sero.chainConfig, config.SyncMode, config.NetworkId, sero.eventMux, sero.voter, sero.txPool, sero.miner, sero.engine, sero.blockchain, chainDb); err != nil { + if sero.protocolManager, err = NewProtocolManager(sero.chainConfig, config.CloseAcceptTx, config.SyncMode, config.NetworkId, sero.eventMux, sero.voter, sero.txPool, sero.miner, sero.engine, sero.blockchain, chainDb); err != nil { return nil, err } sero.miner.SetExtra(makeExtraData(config.ExtraData)) diff --git a/sero/config.go b/sero/config.go index 471e8571..57920a16 100644 --- a/sero/config.go +++ b/sero/config.go @@ -84,6 +84,7 @@ type Config struct { SyncMode downloader.SyncMode NoPruning bool + CloseAcceptTx bool MineMode bool StartExchange bool AutoMerge bool diff --git a/sero/handler.go b/sero/handler.go index dc06c09d..22d3ecb8 100644 --- a/sero/handler.go +++ b/sero/handler.go @@ -65,15 +65,15 @@ func errResp(code errCode, format string, v ...interface{}) error { type ProtocolManager struct { networkID uint64 - fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) - acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) - - txpool txPool - miner sero_miner - voter shareVoter - blockchain *core.BlockChain - chainconfig *params.ChainConfig - maxPeers int + fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) + acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) + closeAcceptTx bool + txpool txPool + miner sero_miner + voter shareVoter + blockchain *core.BlockChain + chainconfig *params.ChainConfig + maxPeers int downloader *downloader.Downloader fetcher *fetcher.Fetcher @@ -103,21 +103,22 @@ type ProtocolManager struct { // NewProtocolManager returns a new Sero sub protocol manager. The Sero sub protocol manages peers capable // with the Sero network. -func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, voter shareVoter, txpool txPool, miner sero_miner, engine consensus.Engine, blockchain *core.BlockChain, chaindb serodb.Database) (*ProtocolManager, error) { +func NewProtocolManager(config *params.ChainConfig, closeAcceptTx bool, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, voter shareVoter, txpool txPool, miner sero_miner, engine consensus.Engine, blockchain *core.BlockChain, chaindb serodb.Database) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - miner: miner, - voter: voter, - blockchain: blockchain, - chainconfig: config, - peers: newPeerSet(), - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: networkID, + eventMux: mux, + closeAcceptTx: closeAcceptTx, + txpool: txpool, + miner: miner, + voter: voter, + blockchain: blockchain, + chainconfig: config, + peers: newPeerSet(), + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { @@ -630,10 +631,20 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == TxMsg: + if pm.closeAcceptTx { + break + } // Transactions arrived, make sure we have a valid and fresh chain to handle them if atomic.LoadUint32(&pm.acceptTxs) == 0 { break } + currentBlock := pm.blockchain.CurrentBlock() + difference := time.Now().Unix() - currentBlock.Time().Int64() + if difference > 2*60 { + log.Info("to behind,dont receive remote txs") + break + } + // Transactions can be processed, parse all of them and deliver to the pool var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { @@ -647,16 +658,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.peers.AddKnowTx(p.id, tx.Hash()) //p.MarkTransaction(tx.Hash()) } - currentBlock := pm.blockchain.CurrentBlock() - difference := time.Now().Unix() - currentBlock.Time().Int64() - if difference < 10*60 { - errs := pm.txpool.AddRemotes(txs) - addedTxs := len(txs) - len(errs) - if addedTxs > 0 { - log.Debug("received from", "remote peer", p.RemoteAddr().String(), "txs", len(txs), "added", addedTxs) - } - } else { - log.Trace("to behind,dont receive remote txs") + + errs := pm.txpool.AddRemotes(txs) + addedTxs := len(txs) - len(errs) + if addedTxs > 0 { + log.Debug("received from", "remote peer", p.RemoteAddr().String(), "txs", len(txs), "added", addedTxs) } case msg.Code == NewVoteMsg: @@ -731,6 +737,13 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) } + + currentBlock := pm.blockchain.CurrentBlock() + difference := time.Now().Unix() - currentBlock.Time().Int64() + if difference > 2*60 { + return + } + // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for peer, txs := range txset { diff --git a/sero/peer.go b/sero/peer.go index d2dcf28b..6ad47fa6 100644 --- a/sero/peer.go +++ b/sero/peer.go @@ -243,7 +243,7 @@ func (p *peer) SendTransactions(txs types.Transactions) error { // p.knownTxs.Add(tx.Hash()) //} if len(txs) > 0 { - subLen := 400 + subLen := 200 start := 0 for { if start >= len(txs) {