diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 0c6ee0420614..fad4215dcf6c 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -188,6 +188,10 @@ type BaseApp struct { // including the goroutine handling.This is experimental and must be enabled // by developers. optimisticExec *oe.OptimisticExecution + + // StreamEvents + EnableStreamer bool + StreamEvents chan StreamEvents } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a @@ -208,6 +212,7 @@ func NewBaseApp( fauxMerkleMode: false, sigverifyTx: true, queryGasLimit: math.MaxUint64, + StreamEvents: make(chan StreamEvents), } for _, option := range options { @@ -747,6 +752,8 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er ) } + ctx := app.finalizeBlockState.ctx + app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), resp.Events, false) resp.Events = sdk.MarkEventsToIndex(resp.Events, app.indexEvents) } @@ -779,6 +786,9 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { return resp } + ctx := app.checkState.Context() + app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), result.Events, false) + resp = &abci.ExecTxResult{ GasWanted: int64(gInfo.GasWanted), GasUsed: int64(gInfo.GasUsed), @@ -810,6 +820,9 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { ) } + ctx := app.finalizeBlockState.ctx + app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), eb.Events, true) + eb.Events = sdk.MarkEventsToIndex(eb.Events, app.indexEvents) endblock = eb } diff --git a/baseapp/chain_stream.go b/baseapp/chain_stream.go new file mode 100644 index 000000000000..9e1763945210 --- /dev/null +++ b/baseapp/chain_stream.go @@ -0,0 +1,25 @@ +package baseapp + +import ( + "time" + + abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" +) + +type StreamEvents struct { + Events []abci.Event + Height uint64 + BlockTime time.Time + Flush bool +} + +func (app *BaseApp) AddStreamEvents(height int64, blockTime time.Time, events []abci.Event, flush bool) { + if app.EnableStreamer { + app.StreamEvents <- StreamEvents{ + Events: events, + Height: uint64(height), + BlockTime: blockTime, + Flush: flush, + } + } +}