Skip to content

Commit 9162efb

Browse files
committed
✨ passing tests and linter
1 parent 29612dc commit 9162efb

8 files changed

+264
-205
lines changed

client.go

+28-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,13 @@ func (c *Client) Connect() error {
4444
}
4545

4646
c.connManager = connManager
47-
c.streamManager = NewStreamManager(c.connManager.JetStream(), c.logger)
47+
48+
js, err := c.connManager.JetStream()
49+
if err != nil {
50+
return err
51+
}
52+
53+
c.streamManager = NewStreamManager(js, c.logger)
4854
c.subManager = NewSubscriptionManager(c.Config.BatchSize)
4955
c.logSuccessfulConnection()
5056

@@ -95,7 +101,12 @@ func (c *Client) Publish(ctx context.Context, subject string, message []byte) er
95101

96102
// Subscribe subscribes to a topic and returns a single message.
97103
func (c *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {
98-
return c.subManager.Subscribe(ctx, topic, c.connManager.JetStream(), c.Config, c.logger, c.metrics)
104+
js, err := c.connManager.JetStream()
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
return c.subManager.Subscribe(ctx, topic, js, c.Config, c.logger, c.metrics)
99110
}
100111

101112
func (c *Client) generateConsumerName(subject string) string {
@@ -109,7 +120,11 @@ func (c *Client) SubscribeWithHandler(ctx context.Context, subject string, handl
109120
// Cancel any existing subscription for this subject
110121
c.cancelExistingSubscription(subject)
111122

112-
js := c.connManager.JetStream()
123+
js, err := c.connManager.JetStream()
124+
if err != nil {
125+
return err
126+
}
127+
113128
consumerName := c.generateConsumerName(subject)
114129

115130
cons, err := c.createOrUpdateConsumer(ctx, js, subject, consumerName)
@@ -241,3 +256,13 @@ func (c *Client) DeleteStream(ctx context.Context, name string) error {
241256
func (c *Client) CreateOrUpdateStream(ctx context.Context, cfg *jetstream.StreamConfig) (jetstream.Stream, error) {
242257
return c.streamManager.CreateOrUpdateStream(ctx, cfg)
243258
}
259+
260+
// GetJetStreamStatus returns the status of the JetStream connection.
261+
func GetJetStreamStatus(ctx context.Context, js jetstream.JetStream) string {
262+
_, err := js.AccountInfo(ctx)
263+
if err != nil {
264+
return jetStreamStatusError + ": " + err.Error()
265+
}
266+
267+
return jetStreamStatusOK
268+
}

0 commit comments

Comments
 (0)