Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PGMQ driver implementation #38

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
pgmq: Implement Stats()
  • Loading branch information
sundbry committed Sep 22, 2024
commit 43b2a1cadaa126cdd1a157be583fff46a3af8d14
1 change: 1 addition & 0 deletions models/queue.go
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ type Queue interface {

// TODO: Peek should include an error on return
Peek(tenantId int64, queue string, messageId int64) *Message
// TODO: Status should include an error on return
Stats(tenantId int64, queue string) QueueStats
Filter(tenantId int64, queue string, filterCriteria FilterCriteria) []int64

36 changes: 34 additions & 2 deletions queue/pgmq/pgmq.go
Original file line number Diff line number Diff line change
@@ -81,7 +81,11 @@ func parseTenantQueueName(queueName string) (int64, string, error) {

// Returns the name of the underlying pqmq table
func buildTenantQueueTableName(tenantId int64, queueName string) string {
safeQueueName := base64.URLEncoding.EncodeToString([]byte(queueName))
// pgmq does not quote table names in its implementation,
// so table names are not case sensitive.
// It is possible for two table names to collide with the same
// case-insensitive base64 encoding.
safeQueueName := strings.ToLower(base64Encoder.EncodeToString([]byte(queueName)))
return fmt.Sprintf("pgmq.\"q_tnt_%x_%s\"", uint64(tenantId), safeQueueName)
}

@@ -226,7 +230,35 @@ func (q *PGMQQueue) Peek(tenantId int64, queue string, messageId int64) *models.
}

func (q *PGMQQueue) Stats(tenantId int64, queue string) models.QueueStats {
stats := models.QueueStats{}
stats := models.QueueStats{
Counts: make(map[models.MessageStatus]int),
TotalMessages: 0,
}

table := buildTenantQueueTableName(tenantId, queue)
sql := fmt.Sprintf(`
SELECT
CASE
WHEN read_ct > 0 AND CURRENT_TIMESTAMP < vt THEN 2
ELSE 1
END AS s, count(*) FROM %s GROUP BY s
`, table)
res := q.DB.Raw(sql)
rows, err := res.Rows()

if err != nil {
return stats
}

for rows.Next() {
var statusType models.MessageStatus
var count int
rows.Scan(&statusType, &count)
stats.TotalMessages += count
stats.Counts[statusType] = count
}
rows.Close()

return stats
}