Skip to content

Commit bee6d92

Browse files
committed
pgmq: Implement Enqueue and Dequeue interfaces
1 parent 5e9cc12 commit bee6d92

File tree

1 file changed

+72
-4
lines changed

1 file changed

+72
-4
lines changed

queue/pgmq/pgmq.go

+72-4
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,19 @@ package pgmq
22

33
import (
44
"context"
5+
"encoding/json"
6+
"fmt"
57
"github.com/craigpastro/pgmq-go"
68
"github.com/poundifdef/smoothmq/config"
79
"github.com/poundifdef/smoothmq/models"
810
"github.com/rs/zerolog/log"
911
)
1012

13+
type Envelope struct {
14+
Body string `json:"body"`
15+
Headers map[string]string `json:"headers"`
16+
}
17+
1118
type PGMQQueue struct {
1219
PGMQ *pgmq.PGMQ
1320
}
@@ -24,33 +31,94 @@ func NewPGMQQueue(cfg config.PGMQConfig) (*PGMQQueue, error) {
2431
return driver, nil
2532
}
2633

34+
func buildTenantQueueName(tenantId int64, queueName string) string {
35+
return fmt.Sprintf("q_%x_%s", uint64(tenantId), queueName)
36+
}
37+
38+
func toMessage(tenantId int64, in *pgmq.Message) (*models.Message, error) {
39+
var envelope Envelope
40+
err := json.Unmarshal(in.Message, &envelope)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
return &models.Message {
46+
ID: in.MsgID,
47+
TenantID: tenantId,
48+
//QueueID: message.QueueID,
49+
//DeliverAt: int(message.DeliverAt),
50+
//DeliveredAt: int(message.DeliveredAt),
51+
//Tries: message.Tries,
52+
//MaxTries: message.MaxTries,
53+
Message: []byte(envelope.Body),
54+
KeyValues: envelope.Headers,
55+
}, nil
56+
}
57+
2758
func (q *PGMQQueue) GetQueue(tenantId int64, queueName string) (models.QueueProperties, error) {
2859
queue := models.QueueProperties{}
2960
return queue, nil
3061
}
3162

3263
func (q *PGMQQueue) CreateQueue(tenantId int64, properties models.QueueProperties) error {
33-
return nil
64+
queueName := buildTenantQueueName(tenantId, properties.Name)
65+
err := q.PGMQ.CreateQueue(context.TODO(), queueName)
66+
return err
3467
}
3568

3669
func (q *PGMQQueue) UpdateQueue(tenantId int64, queue string, properties models.QueueProperties) error {
3770
return nil
3871
}
3972

4073
func (q *PGMQQueue) DeleteQueue(tenantId int64, queue string) error {
41-
return nil
74+
queueName := buildTenantQueueName(tenantId, queue)
75+
err := q.PGMQ.DropQueue(context.TODO(), queueName)
76+
return err
4277
}
4378

4479
func (q *PGMQQueue) ListQueues(tenantId int64) ([]string, error) {
4580
return nil, nil
4681
}
4782

4883
func (q *PGMQQueue) Enqueue(tenantId int64, queue string, message string, kv map[string]string, delay int) (int64, error) {
49-
return 0, nil
84+
queueName := buildTenantQueueName(tenantId, queue)
85+
envelope := Envelope{
86+
Body: message,
87+
Headers: kv,
88+
}
89+
rawMsg, err := json.Marshal(envelope)
90+
if err != nil {
91+
return 0, err
92+
}
93+
msgId, err := q.PGMQ.Send(context.TODO(), queueName, rawMsg)
94+
return msgId, err
5095
}
5196

5297
func (q *PGMQQueue) Dequeue(tenantId int64, queue string, numToDequeue int, requeueIn int) ([]*models.Message, error) {
53-
return nil, nil
98+
queueName := buildTenantQueueName(tenantId, queue)
99+
var visibilityTimeoutSeconds int64
100+
visibilityTimeoutSeconds = 0 // Use default
101+
102+
if requeueIn > 0 {
103+
visibilityTimeoutSeconds = int64(requeueIn)
104+
}
105+
106+
msgs, err := q.PGMQ.ReadBatch(context.TODO(), queueName, visibilityTimeoutSeconds, int64(numToDequeue))
107+
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
out := make([]*models.Message, len(msgs))
113+
114+
for i, msg := range msgs {
115+
msg2, err := toMessage(tenantId, msg)
116+
if err != nil {
117+
return nil, err
118+
}
119+
out[i] = msg2
120+
}
121+
return out, nil
54122
}
55123

56124
func (q *PGMQQueue) Peek(tenantId int64, queue string, messageId int64) *models.Message {

0 commit comments

Comments
 (0)