Skip to content

Commit

Permalink
pg: in repl there is no fail, we get the stream or not (#1126)
Browse files Browse the repository at this point in the history
* pg: in repl there is no fail, we get the stream or not

* fix go_variables for mac
  • Loading branch information
jchappelow authored Dec 6, 2024
1 parent e1c7b3a commit a2ccd31
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 28 deletions.
4 changes: 3 additions & 1 deletion contrib/scripts/build/.go_variables
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ fi
GO_LDFLAGS="${GO_LDFLAGS:-}"

if [ "$GO_LINKMODE" = "static" ]; then
GO_LDFLAGS="$GO_LDFLAGS -extldflags=-static"
if [ "$(go env GOOS)" = "linux" ]; then
GO_LDFLAGS="$GO_LDFLAGS -extldflags=-static"
fi
# compiling statically with CGO enabled requires osusergo to be set.
# netgo is also required to avoid: "warning: Using 'getaddrinfo' in
# statically linked applications requires at runtime the shared libraries
Expand Down
19 changes: 11 additions & 8 deletions node/pg/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func captureRepl(ctx context.Context, conn *pgconn.PgConn, startLSN uint64, comm
return fmt.Errorf("ParseXLogData failed: %w", err)
}

commit, anySeq, err := decodeWALData(hasher, xld.WALData, relations, &inStream, stats, schemaFilter, writer)
final, anySeq, err := decodeWALData(hasher, xld.WALData, relations, &inStream, stats, schemaFilter, writer)
if err != nil {
return fmt.Errorf("decodeWALData failed: %w", err)
}
Expand All @@ -245,7 +245,10 @@ func captureRepl(ctx context.Context, conn *pgconn.PgConn, startLSN uint64, comm
// logger.Debugf("XLogData (in stream? %v) => WALStart %s ServerWALEnd %s\n",
// inStream, xld.WALStart, xld.ServerWALEnd)

if commit {
if final {
// This is either a commit of a regular transaction or a
// precommit (prepare transaction). In either case we have
// hashed the changeset for the entire transaction.
cHash := hasher.Sum(nil)
hasher.Reset() // hasher = sha256.New()

Expand Down Expand Up @@ -325,12 +328,14 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog
changesetWriter.WriteNewRelation(logicalMsg)

case *pglogrepl.BeginMessage:
// This is a regular transaction commit, not a prepared transaction.
logger.Debugf(" [msg] Begin: LSN %v (%d)", logicalMsg.FinalLSN, uint64(logicalMsg.FinalLSN))
// Indicates the beginning of a group of changes in a transaction. This
// is only sent for committed transactions. You won't get any events
// from rolled back transactions.

case *pglogrepl.CommitMessage:
// This is a regular transaction commit, not a prepared transaction.
logger.Debugf(" [msg] Commit: Commit LSN %v (%d), End LSN %v (%d)",
logicalMsg.CommitLSN, uint64(logicalMsg.CommitLSN),
logicalMsg.TransactionEndLSN, uint64(logicalMsg.TransactionEndLSN))
Expand Down Expand Up @@ -480,10 +485,7 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog
// * msgs: Commit Prepared (NO regular "Commit" message)
done = true // there will be a commit or a rollback, but this is the end of the update stream

err = changesetWriter.commit()
if err != nil {
return false, 0, fmt.Errorf("changeset commit error: %w", err)
}
changesetWriter.finalize()

case *CommitPreparedMessageV3:
logger.Debugf(" [msg] COMMIT PREPARED TRANSACTION (id %v): Commit LSN %v (%d), End LSN %v (%d) \n",
Expand All @@ -498,8 +500,9 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog
logicalMsg.UserGID, logicalMsg.RollbackLSN, uint64(logicalMsg.RollbackLSN),
logicalMsg.EndLSN, uint64(logicalMsg.EndLSN))

hasher.Reset()
changesetWriter.fail() // discard changeset
// ROLLBACK PREPARED would happen after PREPARE transaction, which is
// where we finalized the changeset. The caller that aborted will simply
// discard the changeset hash that they already received.

// v2 Stream control messages. Only expected with large transactions.
case *pglogrepl.StreamStartMessageV2:
Expand Down
21 changes: 2 additions & 19 deletions node/pg/repl_changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,25 +557,7 @@ func (c *changesetIoWriter) decodeDelete(delete *pglogrepl.DeleteMessageV2, rela
// It exports the metadata to the writer.
// It zeroes the metadata, so that the changeset can be reused,
// and send a finish signal to the writer.
func (c *changesetIoWriter) commit() error {
if c.csChan == nil {
return nil
}
// clear the relation index list for the next block
c.metadata = &changesetMetadata{
relationIdx: map[[2]string]int{},
}

// close the changes chan to signal the end of the changeset
close(c.csChan)
c.csChan = nil

return nil
}

// fail is called when the changeset is incomplete.
// It zeroes the metadata and writer, so that another changeset may be collected.
func (c *changesetIoWriter) fail() {
func (c *changesetIoWriter) finalize() {
if c.csChan == nil {
return
}
Expand All @@ -584,6 +566,7 @@ func (c *changesetIoWriter) fail() {
relationIdx: map[[2]string]int{},
}

// close the changes chan to signal the end of the changeset
close(c.csChan)
c.csChan = nil
}
Expand Down

0 comments on commit a2ccd31

Please sign in to comment.