Skip to content

Commit 7096e86

Browse files
committed
pgmq: Provide message metadata (Tries, DeliverAt, MaxTries)
1 parent d64ed15 commit 7096e86

File tree

2 files changed

+18
-15
lines changed

2 files changed

+18
-15
lines changed

models/message.go

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type Message struct {
3535
TenantID int64 `db:"tenant_id"`
3636
QueueID int64 `db:"queue_id"`
3737

38+
// FIXME: DeliverAt and DeliveredAt should use int64
39+
// for Unix timestamps to prevent overflow in 2038
3840
DeliverAt int `db:"deliver_at"`
3941
DeliveredAt int `db:"delivered_at"`
4042
Tries int `db:"tries"`

queue/pgmq/pgmq.go

+16-15
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type Envelope struct {
2424
type MessageRow struct {
2525
MsgID int64 `gorm:"not null,primaryKey,column:msg_id"`
2626
Message pgtype.JSONB `gorm:"type:jsonb"`
27+
ReadCount int `gorm:"not null,column:read_ct"`
28+
// Visibility timeout
29+
VT pgtype.Timestamptz `gorm:"not null,column:vt"`
2730
}
2831

2932
type MetaRow struct {
@@ -98,13 +101,12 @@ func toMessage(tenantId int64, in *pgmq.Message) (*models.Message, error) {
98101
}
99102

100103
return &models.Message {
101-
ID: in.MsgID,
102-
TenantID: tenantId,
103-
//QueueID: message.QueueID,
104-
//DeliverAt: int(message.DeliverAt),
105-
//DeliveredAt: int(message.DeliveredAt),
106-
//Tries: message.Tries,
107-
//MaxTries: message.MaxTries,
104+
ID: in.MsgID,
105+
TenantID: tenantId,
106+
DeliverAt: int(in.VT.Unix()),
107+
Tries: int(in.ReadCount),
108+
// pgmq has no max retry count; it's always n + 1 here.
109+
MaxTries: int(in.ReadCount) + 1,
108110
Message: []byte(envelope.Body),
109111
KeyValues: envelope.Headers,
110112
}, nil
@@ -118,14 +120,13 @@ func rowToMessage(tenantId int64, in *MessageRow) (*models.Message, error) {
118120
}
119121

120122
return &models.Message {
121-
ID: in.MsgID,
122-
TenantID: tenantId,
123-
//QueueID: message.QueueID,
124-
//DeliverAt: int(message.DeliverAt),
125-
//DeliveredAt: int(message.DeliveredAt),
126-
//Tries: message.Tries,
127-
//MaxTries: message.MaxTries,
128-
Message: []byte(envelope.Body),
123+
ID: in.MsgID,
124+
TenantID: tenantId,
125+
DeliverAt: int(in.VT.Time.Unix()),
126+
Tries: int(in.ReadCount),
127+
// pgmq has no max retry count; it's always n + 1 here.
128+
MaxTries: int(in.ReadCount) + 1,
129+
Message: []byte(envelope.Body),
129130
KeyValues: envelope.Headers,
130131
}, nil
131132
}

0 commit comments

Comments
 (0)