Skip to content

Commit 29612dc

Browse files
committed
🔧
1 parent 0afcc23 commit 29612dc

5 files changed

+3
-19
lines changed

‎client.go

-13
Original file line numberDiff line numberDiff line change
@@ -95,19 +95,6 @@ func (c *Client) Publish(ctx context.Context, subject string, message []byte) er
9595

9696
// Subscribe subscribes to a topic and returns a single message.
9797
func (c *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {
98-
99-
if c.subManager == nil {
100-
return nil, errors.New("subscription manager is not initialized")
101-
}
102-
103-
if c.connManager == nil {
104-
return nil, errors.New("connection manager is not initialized")
105-
}
106-
107-
if c.connManager.JetStream() == nil {
108-
return nil, errors.New("jetstream is not initialized")
109-
}
110-
11198
return c.subManager.Subscribe(ctx, topic, c.connManager.JetStream(), c.Config, c.logger, c.metrics)
11299
}
113100

‎client_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package nats
22

33
import (
44
"context"
5-
"errors"
65
"sync"
76
"testing"
87
"time"
@@ -163,7 +162,7 @@ func TestNATSClient_SubscribeError(t *testing.T) {
163162
}
164163

165164
ctx := context.Background()
166-
expectedErr := errors.New("subscription error")
165+
expectedErr := errSubscriptionError
167166

168167
mockConnManager.EXPECT().JetStream().Return(mockJetStream).Times(2)
169168

‎connection_manager.go

-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ func NewConnectionManager(
5656
logger pubsub.Logger,
5757
natsConnector NATSConnector,
5858
jetStreamCreator JetStreamCreator) *ConnectionManager {
59-
6059
// if logger is nil panic
6160
if logger == nil {
6261
panic("logger is required")

‎errors.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ var (
99
errConsumerNotProvided = errors.New("consumer name not provided")
1010
errConsumerCreationError = errors.New("consumer creation error")
1111
errFailedToDeleteStream = errors.New("failed to delete stream")
12-
errFailedToCreateConsumer = errors.New("failed to create consumer")
1312
errPublishError = errors.New("publish error")
1413
errJetStreamNotConfigured = errors.New("JetStream is not configured")
1514
errJetStreamCreationFailed = errors.New("JetStream creation failed")
@@ -20,4 +19,5 @@ var (
2019
errCreateOrUpdateStream = errors.New("create or update stream error")
2120
errHandlerError = errors.New("handler error")
2221
errConnectionError = errors.New("connection error")
22+
errSubscriptionError = errors.New("subscription error")
2323
)

‎pubsub_wrapper.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package nats
22

33
import (
44
"context"
5-
"fmt"
65

76
"gofr.dev/pkg/gofr/datasource"
87
"gofr.dev/pkg/gofr/datasource/pubsub"
@@ -46,9 +45,9 @@ func (w *PubSubWrapper) Health() datasource.Health {
4645

4746
// Connect establishes a connection to NATS.
4847
func (w *PubSubWrapper) Connect() {
49-
fmt.Println("Connecting to NATS using PubSubWrapper")
5048
if w.Client.connManager != nil && w.Client.connManager.Health().Status == datasource.StatusUp {
5149
w.Client.logger.Log("NATS connection already established")
50+
5251
return
5352
}
5453

0 commit comments

Comments
 (0)