From 78a564199978cf0f365d1069574ab49f75b2b4a5 Mon Sep 17 00:00:00 2001 From: Markus Waas Date: Wed, 15 Jan 2025 14:02:50 +1300 Subject: [PATCH] feat(chain_stream): relevant events are collected in a channel in order to be consumed in app - optional addition to run in binary - performance impact not too relevant as most wont run this --- baseapp/baseapp.go | 13 +++++++++++++ baseapp/chain_stream.go | 25 +++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 baseapp/chain_stream.go diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 0c6ee0420614..fad4215dcf6c 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -188,6 +188,10 @@ type BaseApp struct { // including the goroutine handling.This is experimental and must be enabled // by developers. optimisticExec *oe.OptimisticExecution + + // StreamEvents + EnableStreamer bool + StreamEvents chan StreamEvents } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a @@ -208,6 +212,7 @@ func NewBaseApp( fauxMerkleMode: false, sigverifyTx: true, queryGasLimit: math.MaxUint64, + StreamEvents: make(chan StreamEvents), } for _, option := range options { @@ -747,6 +752,8 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er ) } + ctx := app.finalizeBlockState.ctx + app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), resp.Events, false) resp.Events = sdk.MarkEventsToIndex(resp.Events, app.indexEvents) } @@ -779,6 +786,9 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { return resp } + ctx := app.checkState.Context() + app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), result.Events, false) + resp = &abci.ExecTxResult{ GasWanted: int64(gInfo.GasWanted), GasUsed: int64(gInfo.GasUsed), @@ -810,6 +820,9 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { ) } + ctx := app.finalizeBlockState.ctx + app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), eb.Events, true) + eb.Events = sdk.MarkEventsToIndex(eb.Events, app.indexEvents) endblock = eb } diff --git a/baseapp/chain_stream.go b/baseapp/chain_stream.go new file mode 100644 index 000000000000..9e1763945210 --- /dev/null +++ b/baseapp/chain_stream.go @@ -0,0 +1,25 @@ +package baseapp + +import ( + "time" + + abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" +) + +type StreamEvents struct { + Events []abci.Event + Height uint64 + BlockTime time.Time + Flush bool +} + +func (app *BaseApp) AddStreamEvents(height int64, blockTime time.Time, events []abci.Event, flush bool) { + if app.EnableStreamer { + app.StreamEvents <- StreamEvents{ + Events: events, + Height: uint64(height), + BlockTime: blockTime, + Flush: flush, + } + } +}