@@ -2,12 +2,19 @@ package pgmq
2
2
3
3
import (
4
4
"context"
5
+ "encoding/json"
6
+ "fmt"
5
7
"github.com/craigpastro/pgmq-go"
6
8
"github.com/poundifdef/smoothmq/config"
7
9
"github.com/poundifdef/smoothmq/models"
8
10
"github.com/rs/zerolog/log"
9
11
)
10
12
13
+ type Envelope struct {
14
+ Body string `json:"body"`
15
+ Headers map [string ]string `json:"headers"`
16
+ }
17
+
11
18
type PGMQQueue struct {
12
19
PGMQ * pgmq.PGMQ
13
20
}
@@ -24,33 +31,94 @@ func NewPGMQQueue(cfg config.PGMQConfig) (*PGMQQueue, error) {
24
31
return driver , nil
25
32
}
26
33
34
+ func buildTenantQueueName (tenantId int64 , queueName string ) string {
35
+ return fmt .Sprintf ("q_%x_%s" , uint64 (tenantId ), queueName )
36
+ }
37
+
38
+ func toMessage (tenantId int64 , in * pgmq.Message ) (* models.Message , error ) {
39
+ var envelope Envelope
40
+ err := json .Unmarshal (in .Message , & envelope )
41
+ if err != nil {
42
+ return nil , err
43
+ }
44
+
45
+ return & models.Message {
46
+ ID : in .MsgID ,
47
+ TenantID : tenantId ,
48
+ //QueueID: message.QueueID,
49
+ //DeliverAt: int(message.DeliverAt),
50
+ //DeliveredAt: int(message.DeliveredAt),
51
+ //Tries: message.Tries,
52
+ //MaxTries: message.MaxTries,
53
+ Message : []byte (envelope .Body ),
54
+ KeyValues : envelope .Headers ,
55
+ }, nil
56
+ }
57
+
27
58
func (q * PGMQQueue ) GetQueue (tenantId int64 , queueName string ) (models.QueueProperties , error ) {
28
59
queue := models.QueueProperties {}
29
60
return queue , nil
30
61
}
31
62
32
63
func (q * PGMQQueue ) CreateQueue (tenantId int64 , properties models.QueueProperties ) error {
33
- return nil
64
+ queueName := buildTenantQueueName (tenantId , properties .Name )
65
+ err := q .PGMQ .CreateQueue (context .TODO (), queueName )
66
+ return err
34
67
}
35
68
36
69
func (q * PGMQQueue ) UpdateQueue (tenantId int64 , queue string , properties models.QueueProperties ) error {
37
70
return nil
38
71
}
39
72
40
73
func (q * PGMQQueue ) DeleteQueue (tenantId int64 , queue string ) error {
41
- return nil
74
+ queueName := buildTenantQueueName (tenantId , queue )
75
+ err := q .PGMQ .DropQueue (context .TODO (), queueName )
76
+ return err
42
77
}
43
78
44
79
func (q * PGMQQueue ) ListQueues (tenantId int64 ) ([]string , error ) {
45
80
return nil , nil
46
81
}
47
82
48
83
func (q * PGMQQueue ) Enqueue (tenantId int64 , queue string , message string , kv map [string ]string , delay int ) (int64 , error ) {
49
- return 0 , nil
84
+ queueName := buildTenantQueueName (tenantId , queue )
85
+ envelope := Envelope {
86
+ Body : message ,
87
+ Headers : kv ,
88
+ }
89
+ rawMsg , err := json .Marshal (envelope )
90
+ if err != nil {
91
+ return 0 , err
92
+ }
93
+ msgId , err := q .PGMQ .Send (context .TODO (), queueName , rawMsg )
94
+ return msgId , err
50
95
}
51
96
52
97
func (q * PGMQQueue ) Dequeue (tenantId int64 , queue string , numToDequeue int , requeueIn int ) ([]* models.Message , error ) {
53
- return nil , nil
98
+ queueName := buildTenantQueueName (tenantId , queue )
99
+ var visibilityTimeoutSeconds int64
100
+ visibilityTimeoutSeconds = 0 // Use default
101
+
102
+ if requeueIn > 0 {
103
+ visibilityTimeoutSeconds = int64 (requeueIn )
104
+ }
105
+
106
+ msgs , err := q .PGMQ .ReadBatch (context .TODO (), queueName , visibilityTimeoutSeconds , int64 (numToDequeue ))
107
+
108
+ if err != nil {
109
+ return nil , err
110
+ }
111
+
112
+ out := make ([]* models.Message , len (msgs ))
113
+
114
+ for i , msg := range msgs {
115
+ msg2 , err := toMessage (tenantId , msg )
116
+ if err != nil {
117
+ return nil , err
118
+ }
119
+ out [i ] = msg2
120
+ }
121
+ return out , nil
54
122
}
55
123
56
124
func (q * PGMQQueue ) Peek (tenantId int64 , queue string , messageId int64 ) * models.Message {
0 commit comments