Skip to content

Commit 2730c37

Browse files
committed
pgmq: Implement Stats()
1 parent 49c8284 commit 2730c37

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

models/queue.go

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Queue interface {
4343

4444
// TODO: Peek should include an error on return
4545
Peek(tenantId int64, queue string, messageId int64) *Message
46+
// TODO: Status should include an error on return
4647
Stats(tenantId int64, queue string) QueueStats
4748
Filter(tenantId int64, queue string, filterCriteria FilterCriteria) []int64
4849

queue/pgmq/pgmq.go

+34-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ func parseTenantQueueName(queueName string) (int64, string, error) {
8181

8282
// Returns the name of the underlying pqmq table
8383
func buildTenantQueueTableName(tenantId int64, queueName string) string {
84-
safeQueueName := base64.URLEncoding.EncodeToString([]byte(queueName))
84+
// pgmq does not quote table names in its implementation,
85+
// so table names are not case sensitive.
86+
// It is possible for two table names to collide with the same
87+
// case-insensitive base64 encoding.
88+
safeQueueName := strings.ToLower(base64Encoder.EncodeToString([]byte(queueName)))
8589
return fmt.Sprintf("pgmq.\"q_tnt_%x_%s\"", uint64(tenantId), safeQueueName)
8690
}
8791

@@ -226,7 +230,35 @@ func (q *PGMQQueue) Peek(tenantId int64, queue string, messageId int64) *models.
226230
}
227231

228232
func (q *PGMQQueue) Stats(tenantId int64, queue string) models.QueueStats {
229-
stats := models.QueueStats{}
233+
stats := models.QueueStats{
234+
Counts: make(map[models.MessageStatus]int),
235+
TotalMessages: 0,
236+
}
237+
238+
table := buildTenantQueueTableName(tenantId, queue)
239+
sql := fmt.Sprintf(`
240+
SELECT
241+
CASE
242+
WHEN read_ct > 0 AND vt < CURRENT_TIMESTAMP THEN 2
243+
ELSE 1
244+
END AS s, count(*) FROM %s GROUP BY s
245+
`, table)
246+
res := q.DB.Raw(sql)
247+
rows, err := res.Rows()
248+
249+
if err != nil {
250+
return stats
251+
}
252+
253+
for rows.Next() {
254+
var statusType models.MessageStatus
255+
var count int
256+
rows.Scan(&statusType, &count)
257+
stats.TotalMessages += count
258+
stats.Counts[statusType] = count
259+
}
260+
rows.Close()
261+
230262
return stats
231263
}
232264

0 commit comments

Comments
 (0)