Skip to content

Commit

Permalink
feat(chain_stream): relevant events are collected in a channel in ord…
Browse files Browse the repository at this point in the history
…er to be consumed in app

- optional addition to run in binary
- performance impact not too relevant as most wont run this
  • Loading branch information
gorgos committed Jan 15, 2025
1 parent 4b225ca commit 78a5641
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
13 changes: 13 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ type BaseApp struct {
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution

// StreamEvents
EnableStreamer bool
StreamEvents chan StreamEvents
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand All @@ -208,6 +212,7 @@ func NewBaseApp(
fauxMerkleMode: false,
sigverifyTx: true,
queryGasLimit: math.MaxUint64,
StreamEvents: make(chan StreamEvents),
}

for _, option := range options {
Expand Down Expand Up @@ -747,6 +752,8 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er
)
}

ctx := app.finalizeBlockState.ctx
app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), resp.Events, false)
resp.Events = sdk.MarkEventsToIndex(resp.Events, app.indexEvents)
}

Expand Down Expand Up @@ -779,6 +786,9 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
return resp
}

ctx := app.checkState.Context()
app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), result.Events, false)

resp = &abci.ExecTxResult{
GasWanted: int64(gInfo.GasWanted),
GasUsed: int64(gInfo.GasUsed),
Expand Down Expand Up @@ -810,6 +820,9 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
)
}

ctx := app.finalizeBlockState.ctx
app.AddStreamEvents(ctx.BlockHeight(), ctx.BlockTime(), eb.Events, true)

eb.Events = sdk.MarkEventsToIndex(eb.Events, app.indexEvents)
endblock = eb
}
Expand Down
25 changes: 25 additions & 0 deletions baseapp/chain_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package baseapp

import (
"time"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
)

type StreamEvents struct {
Events []abci.Event
Height uint64
BlockTime time.Time
Flush bool
}

func (app *BaseApp) AddStreamEvents(height int64, blockTime time.Time, events []abci.Event, flush bool) {
if app.EnableStreamer {
app.StreamEvents <- StreamEvents{
Events: events,
Height: uint64(height),
BlockTime: blockTime,
Flush: flush,
}
}
}

0 comments on commit 78a5641

Please sign in to comment.