diff --git a/node/pg/repl.go b/node/pg/repl.go index 25ff4fd0b..9c0c34d85 100644 --- a/node/pg/repl.go +++ b/node/pg/repl.go @@ -225,7 +225,7 @@ func captureRepl(ctx context.Context, conn *pgconn.PgConn, startLSN uint64, comm return fmt.Errorf("ParseXLogData failed: %w", err) } - commit, anySeq, err := decodeWALData(hasher, xld.WALData, relations, &inStream, stats, schemaFilter, writer) + final, anySeq, err := decodeWALData(hasher, xld.WALData, relations, &inStream, stats, schemaFilter, writer) if err != nil { return fmt.Errorf("decodeWALData failed: %w", err) } @@ -245,7 +245,10 @@ func captureRepl(ctx context.Context, conn *pgconn.PgConn, startLSN uint64, comm // logger.Debugf("XLogData (in stream? %v) => WALStart %s ServerWALEnd %s\n", // inStream, xld.WALStart, xld.ServerWALEnd) - if commit { + if final { + // This is either a commit of a regular transaction or a + // precommit (prepare transaction). In either case we have + // hashed the changeset for the entire transaction. cHash := hasher.Sum(nil) hasher.Reset() // hasher = sha256.New() @@ -325,12 +328,14 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog changesetWriter.WriteNewRelation(logicalMsg) case *pglogrepl.BeginMessage: + // This is a regular transaction commit, not a prepared transaction. logger.Debugf(" [msg] Begin: LSN %v (%d)", logicalMsg.FinalLSN, uint64(logicalMsg.FinalLSN)) // Indicates the beginning of a group of changes in a transaction. This // is only sent for committed transactions. You won't get any events // from rolled back transactions. case *pglogrepl.CommitMessage: + // This is a regular transaction commit, not a prepared transaction. logger.Debugf(" [msg] Commit: Commit LSN %v (%d), End LSN %v (%d)", logicalMsg.CommitLSN, uint64(logicalMsg.CommitLSN), logicalMsg.TransactionEndLSN, uint64(logicalMsg.TransactionEndLSN)) @@ -480,10 +485,7 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog // * msgs: Commit Prepared (NO regular "Commit" message) done = true // there will be a commit or a rollback, but this is the end of the update stream - err = changesetWriter.commit() - if err != nil { - return false, 0, fmt.Errorf("changeset commit error: %w", err) - } + changesetWriter.finalize() case *CommitPreparedMessageV3: logger.Debugf(" [msg] COMMIT PREPARED TRANSACTION (id %v): Commit LSN %v (%d), End LSN %v (%d) \n", @@ -498,8 +500,9 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog logicalMsg.UserGID, logicalMsg.RollbackLSN, uint64(logicalMsg.RollbackLSN), logicalMsg.EndLSN, uint64(logicalMsg.EndLSN)) - hasher.Reset() - changesetWriter.fail() // discard changeset + // ROLLBACK PREPARED would happen after PREPARE transaction, which is + // where we finalized the changeset. The that aborted will simply + // discard the changeset hash that they already received. // v2 Stream control messages. Only expected with large transactions. case *pglogrepl.StreamStartMessageV2: diff --git a/node/pg/repl_changeset.go b/node/pg/repl_changeset.go index 439179e12..b3d59d841 100644 --- a/node/pg/repl_changeset.go +++ b/node/pg/repl_changeset.go @@ -557,25 +557,7 @@ func (c *changesetIoWriter) decodeDelete(delete *pglogrepl.DeleteMessageV2, rela // It exports the metadata to the writer. // It zeroes the metadata, so that the changeset can be reused, // and send a finish signal to the writer. -func (c *changesetIoWriter) commit() error { - if c.csChan == nil { - return nil - } - // clear the relation index list for the next block - c.metadata = &changesetMetadata{ - relationIdx: map[[2]string]int{}, - } - - // close the changes chan to signal the end of the changeset - close(c.csChan) - c.csChan = nil - - return nil -} - -// fail is called when the changeset is incomplete. -// It zeroes the metadata and writer, so that another changeset may be collected. -func (c *changesetIoWriter) fail() { +func (c *changesetIoWriter) finalize() { if c.csChan == nil { return } @@ -584,6 +566,7 @@ func (c *changesetIoWriter) fail() { relationIdx: map[[2]string]int{}, } + // close the changes chan to signal the end of the changeset close(c.csChan) c.csChan = nil }