From 52bc3c1b4f8c0eb4624ce49a0e0b746aedaf8c53 Mon Sep 17 00:00:00 2001 From: Mathieu Durand <1435391+matdurand@users.noreply.github.com> Date: Fri, 8 Sep 2023 14:09:44 -0400 Subject: [PATCH] Adding a configuration option for topic-subscription validation. --- pkg/googlecloud/pubsub_test.go | 36 ++++++++++++++++++++++++++++++++++ pkg/googlecloud/subscriber.go | 13 ++++++++++-- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/pkg/googlecloud/pubsub_test.go b/pkg/googlecloud/pubsub_test.go index ccc7c31..c5b0e97 100644 --- a/pkg/googlecloud/pubsub_test.go +++ b/pkg/googlecloud/pubsub_test.go @@ -121,6 +121,42 @@ func TestPublishSubscribeOrdering(t *testing.T) { ) } +func TestSubscriberAllowedWhenAttachedToAnotherTopic(t *testing.T) { + rand.Seed(time.Now().Unix()) + testNumber := rand.Int() + logger := watermill.NewStdLogger(true, true) + + subNameFn := func(topic string) string { + return fmt.Sprintf("sub_%d", testNumber) + } + + sub1, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{ + GenerateSubscriptionName: subNameFn, + }, logger) + require.NoError(t, err) + + topic1 := fmt.Sprintf("topic1_%d", testNumber) + + sub2, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{ + GenerateSubscriptionName: subNameFn, + DoNotEnforceSubscriptionAttachedToTopic: true, + }, logger) + require.NoError(t, err) + topic2 := fmt.Sprintf("topic2_%d", testNumber) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err = sub1.Subscribe(ctx, topic1) + require.NoError(t, err) + + // without the DoNotEnforceSubscriptionAttachedToTopic, this call would fail because subNameFn will return the + // same value for both sub1 and sub2, and sub1 will create a subscription and topic attached to each other + // making sub2.Subscribe fail because the requested subscription (topic2) is not attached to the GCP topic + _, err = sub2.Subscribe(ctx, topic2) + require.NoError(t, err) +} + func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) { rand.Seed(time.Now().Unix()) testNumber := rand.Int() diff --git a/pkg/googlecloud/subscriber.go b/pkg/googlecloud/subscriber.go index 1f7c225..f5f8c53 100644 --- a/pkg/googlecloud/subscriber.go +++ b/pkg/googlecloud/subscriber.go @@ -75,6 +75,11 @@ type SubscriberConfig struct { // Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`. DoNotCreateTopicIfMissing bool + // If false (default), when the subscription already exists, `Subscriber` will make sure that the subscription is + // attached to the provided topic, and will return a ErrUnexpectedTopic if not. + // Otherwise, it won't check to which topic the subscription is attached to. + DoNotEnforceSubscriptionAttachedToTopic bool + // deprecated: ConnectTimeout is no longer used, please use timeout on context in Subscribe() method ConnectTimeout time.Duration @@ -437,6 +442,12 @@ func (s *Subscriber) existingSubscription(ctx context.Context, sub *pubsub.Subsc return nil, errors.Wrap(err, "could not fetch config for existing subscription") } + sub.ReceiveSettings = s.config.ReceiveSettings + + if s.config.DoNotEnforceSubscriptionAttachedToTopic { + return sub, nil + } + fullyQualifiedTopicName := fmt.Sprintf("projects/%s/topics/%s", s.config.topicProjectID(), topic) if config.Topic.String() != fullyQualifiedTopicName { @@ -446,8 +457,6 @@ func (s *Subscriber) existingSubscription(ctx context.Context, sub *pubsub.Subsc ) } - sub.ReceiveSettings = s.config.ReceiveSettings - return sub, nil }