@@ -24,6 +24,9 @@ type Envelope struct {
24
24
type MessageRow struct {
25
25
MsgID int64 `gorm:"not null,primaryKey,column:msg_id"`
26
26
Message pgtype.JSONB `gorm:"type:jsonb"`
27
+ ReadCount int `gorm:"not null,column:read_ct"`
28
+ // Visibility timeout
29
+ VT pgtype.Timestamptz `gorm:"not null,column:vt"`
27
30
}
28
31
29
32
type MetaRow struct {
@@ -98,13 +101,12 @@ func toMessage(tenantId int64, in *pgmq.Message) (*models.Message, error) {
98
101
}
99
102
100
103
return & models.Message {
101
- ID : in .MsgID ,
102
- TenantID : tenantId ,
103
- //QueueID: message.QueueID,
104
- //DeliverAt: int(message.DeliverAt),
105
- //DeliveredAt: int(message.DeliveredAt),
106
- //Tries: message.Tries,
107
- //MaxTries: message.MaxTries,
104
+ ID : in .MsgID ,
105
+ TenantID : tenantId ,
106
+ DeliverAt : int (in .VT .Unix ()),
107
+ Tries : int (in .ReadCount ),
108
+ // pgmq has no max retry count; it's always n + 1 here.
109
+ MaxTries : int (in .ReadCount ) + 1 ,
108
110
Message : []byte (envelope .Body ),
109
111
KeyValues : envelope .Headers ,
110
112
}, nil
@@ -118,14 +120,13 @@ func rowToMessage(tenantId int64, in *MessageRow) (*models.Message, error) {
118
120
}
119
121
120
122
return & models.Message {
121
- ID : in .MsgID ,
122
- TenantID : tenantId ,
123
- //QueueID: message.QueueID,
124
- //DeliverAt: int(message.DeliverAt),
125
- //DeliveredAt: int(message.DeliveredAt),
126
- //Tries: message.Tries,
127
- //MaxTries: message.MaxTries,
128
- Message : []byte (envelope .Body ),
123
+ ID : in .MsgID ,
124
+ TenantID : tenantId ,
125
+ DeliverAt : int (in .VT .Time .Unix ()),
126
+ Tries : int (in .ReadCount ),
127
+ // pgmq has no max retry count; it's always n + 1 here.
128
+ MaxTries : int (in .ReadCount ) + 1 ,
129
+ Message : []byte (envelope .Body ),
129
130
KeyValues : envelope .Headers ,
130
131
}, nil
131
132
}
0 commit comments