Skip to content

Commit

Permalink
clean up prepared transactions in (*DB).Close
Browse files Browse the repository at this point in the history
* clean up prepared transactions in (*DB).Close

* enable pg logger

* update cleanup in node_live_test.go too

* plain log for pg pkg in tests
  • Loading branch information
jchappelow authored Dec 6, 2024
1 parent 022e5cc commit e1c7b3a
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ tasks:
cmds:
- go test ./core/... -tags=ext_test -count=1 -race
- CGO_ENABLED=1 go test ./parse/... -tags=ext_test -count=1 -race
- CGO_ENABLED=1 go test ./... -tags=ext_test -count=1 -race
- CGO_ENABLED=1 go test ./... -tags=ext_test,pglive -count=1 -race

# test:it:
# desc: Run integration tests ('short' mode)
Expand Down
2 changes: 2 additions & 0 deletions app/node/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ func (mt *mysteryThing) Price(ctx context.Context, db sql.DB, tx *types.Transact
}

func buildDB(ctx context.Context, d *coreDependencies, closers *closeFuncs) *pg.DB {
pg.UseLogger(d.logger.New("PG"))

// TODO: restore from snapshots

db, err := d.dbOpener(ctx, d.cfg.DB.DBName, d.cfg.DB.MaxConns)
Expand Down
5 changes: 5 additions & 0 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ func verifyStatus(_ *testing.T, val *ConsensusEngine, status Status, height int6
return nil
}

func TestMain(m *testing.M) {
pg.UseLogger(log.New(log.WithName("DBS"), log.WithFormat(log.FormatUnstructured)))
m.Run()
}

func TestValidatorStateMachine(t *testing.T) {
// t.Parallel()
type action struct {
Expand Down
14 changes: 7 additions & 7 deletions node/node_live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ var defaultGenesisParams = &consensus.GenesisParams{
},
}

func TestMain(m *testing.M) {
pg.UseLogger(log.New(log.WithName("DBS"), log.WithFormat(log.FormatUnstructured)))
m.Run()
}

func TestSingleNodeMocknet(t *testing.T) {
if testing.Short() {
t.Skip()
Expand All @@ -54,8 +59,6 @@ func TestSingleNodeMocknet(t *testing.T) {
bs1 := memstore.NewMemBS()
mp1 := mempool.New()

pg.UseLogger(log.New(log.WithName("DBS")))

db1 := initDB(t, "5432", "kwil_test_db")

root1 := t.TempDir()
Expand Down Expand Up @@ -155,8 +158,6 @@ func TestDualNodeMocknet(t *testing.T) {
bs1 := memstore.NewMemBS()
mp1 := mempool.New()

pg.UseLogger(log.New(log.WithName("DBS")))

db1 := initDB(t, "5432", "kwil_test_db")
func() {
ctx := context.Background()
Expand Down Expand Up @@ -332,10 +333,9 @@ func initDB(t *testing.T, port, dbName string) *pg.DB {
}

func cleanupDB(db *pg.DB) {
ctx := context.Background()
db.AutoCommit(true)
defer db.AutoCommit(false)
defer db.Close()
db.AutoCommit(true)
ctx := context.Background()
db.Execute(ctx, `DROP SCHEMA IF EXISTS kwild_chain CASCADE;`)
db.Execute(ctx, `DROP SCHEMA IF EXISTS kwild_internal CASCADE;`)
}
Expand Down
35 changes: 26 additions & 9 deletions node/pg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,17 +266,34 @@ func (db *DB) EnsureFullReplicaIdentityDatasets(ctx context.Context) error {
func (db *DB) Close() error {
db.cancel(nil)
db.repl.stop()
if db.tx != nil {
// This is a bug, so we are going to panic so the issue is not easily
// ignored, but we will rollback the tx so we don't hang or leak
// postgresql resources.
db.tx.Rollback(context.Background())
db.tx = nil
if db.tx == nil {
return db.pool.Close()
}

// This is a bug, so we are going to panic so the issue is not easily
// ignored, but we will rollback the tx so we don't hang or leak
// postgresql resources.

if db.txid != "" {
logger.Warnln("Rolling back PREPARED transaction", db.txid)
// With PREPARE TRANSACTION already done, rollback the prepared transaction.
sqlRollback := fmt.Sprintf(`ROLLBACK PREPARED '%s'`, db.txid)
if _, err := db.tx.Exec(context.Background(), sqlRollback); err != nil {
return fmt.Errorf("ROLLBACK PREPARED failed: %v", err)
}
// We don't use Rollback, which normally releases automatically.
if rel, ok := db.tx.(releaser); ok {
rel.Release()
}
db.txid = ""
db.pool.Close()
panic("Closed the DB with an active transaction!")
} else { // otherwise regular rollback
logger.Warnln("Rolling back regular transaction")
db.tx.Rollback(context.Background())
}
return db.pool.Close()

db.tx = nil
db.pool.Close()
panic("Closed the DB with an active transaction!")
}

// Done allows higher level systems to monitor the state of the DB backend
Expand Down

0 comments on commit e1c7b3a

Please sign in to comment.