Skip to content

Commit a57158a

Browse files
committed
pgmq: Implement Filter()
1 parent 2730c37 commit a57158a

File tree

1 file changed

+30
-1
lines changed

1 file changed

+30
-1
lines changed

queue/pgmq/pgmq.go

+30-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,36 @@ func (q *PGMQQueue) Stats(tenantId int64, queue string) models.QueueStats {
263263
}
264264

265265
func (q *PGMQQueue) Filter(tenantId int64, queue string, filterCriteria models.FilterCriteria) []int64 {
266-
return nil
266+
var messageIds []int64
267+
268+
tableName := buildTenantQueueTableName(tenantId, queue)
269+
270+
args := make([]any, 0)
271+
272+
whereConditions := make([]string, 0)
273+
274+
if filterCriteria.MessageID > 0 {
275+
whereConditions = append(whereConditions, "msg_id = ?")
276+
args = append(args, filterCriteria.MessageID)
277+
}
278+
279+
for k, v := range filterCriteria.KV {
280+
whereConditions = append(whereConditions, "message->'headers'->>? = ?")
281+
args = append(args, k, v)
282+
}
283+
284+
whereClause := ""
285+
if len(whereConditions) >0 {
286+
whereClause = "WHERE " + strings.Join(whereConditions, " AND ")
287+
}
288+
289+
sql := fmt.Sprintf("SELECT msg_id FROM %s %s LIMIT 10", tableName, whereClause)
290+
res := q.DB.Raw(sql, args...).Scan(&messageIds)
291+
if res.Error != nil {
292+
log.Error().Err(res.Error).Msg("Unable to filter")
293+
}
294+
295+
return messageIds
267296
}
268297

269298
func (q *PGMQQueue) Delete(tenantId int64, queue string, messageId int64) error {

0 commit comments

Comments
 (0)