Skip to content

feat(modules): add Azure Service Bus Emulator module #793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions modules/azure/testcontainers/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.. autoclass:: testcontainers.azure.ServiceBusContainer
.. title:: testcontainers.azure.ServiceBusContainer
125 changes: 125 additions & 0 deletions modules/azure/testcontainers/azure/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
import logging
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

if TYPE_CHECKING:
from typing_extensions import Self

from testcontainers.core.network import Network
from testcontainers.mssql import SqlServerContainer

from ._types import ServiceBusConfiguration

log = logging.getLogger(__name__)


class ServiceBusContainer(DockerContainer):
"""
CockroachDB database container.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice Typo


Example:

The example will spin up a ServiceBus broker to which you can connect with the credentials
passed in the constructor. Alternatively, you may use the :code:`get_connection_url()`
method which returns a ServiceBusClient compatible url.

.. doctest::

>>> from azure.servicebus import ServiceBusClient
>>> from testcontainers.azure import ServiceBusContainer
>>> from testcontainers.mssql import SqlServerContainer
>>> from testcontainers.core.network import Network

>>> CONFIG = {"UserConfig":{"Namespaces":[{"Name":"namespace","Queues":[],"Topics":[
... {"Name":"topic","Properties":{},"Subscriptions":[]}]}],"Logging":{"Type":"File"}}}
>>> network = Network().create()
>>> with SqlServerContainer().with_network(network) as mssql_container:
... with ServiceBusContainer(
... config=CONFIG, network=network, mssql_container=mssql_container
... ) as sb_container:
... client = ServiceBusClient.from_connection_string(
... sb_container.get_connection_url(), logging_enable=True
... )
... with client.get_topic_sender(topic_name="topic") as sender:
... sender.send_messages(ServiceBusMessage("test"))

"""

CONNECTION_STRING_FORMAT = (
"Endpoint=sb://{}:{};SharedAccessKeyName=RootManageSharedAccessKey;"
"SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
)

def __init__(
self,
*,
config: ServiceBusConfiguration,
network: Network,
mssql_container: SqlServerContainer,
port: int = 5672,
image: str = "mcr.microsoft.com/azure-messaging/servicebus-emulator:latest",
mssql_container_wait: int = 0,
):
self.config = config
self.network = network
self.port = port
self.image = image

self._mssql_container = mssql_container
self._mssql_container_wait = mssql_container_wait

# Create a temporary config file for the emulator
self._configdir = tempfile.TemporaryDirectory()
log.debug("Using %r as a configuration directory.", self._configdir.name)
with open(str(Path(self._configdir.name) / "Config.json"), "w") as f:
f.write(json.dumps(config))

# Configure.
super().__init__(self.image)
self.with_volume_mapping(self._configdir.name, "/ServiceBus_Emulator/ConfigFiles", mode="ro").with_env(
"SQL_SERVER", "sql"
).with_env("MSSQL_SA_PASSWORD", mssql_container.password).with_network_aliases("servicebus").with_exposed_ports(
self.port
).with_network(network).with_env("SQL_WAIT_INTERVAL", str(self._mssql_container_wait))

def accept_license(self) -> Self:
"""Accept the Microsoft EULA for the service bus emulator.

Required for startup to succeed."""
self.with_env("ACCEPT_EULA", "Y")
return self

def _connect(self) -> None:
wait_for_logs(self, "Emulator Service is Successfully Up!", timeout=15)

def start(self) -> Self:
# Make the sql container accessible from ServiceBus.
self._mssql_container.with_network_aliases("sql").start()
super().start()
self._connect()
return self

def get_connection_url(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)

return self.CONNECTION_STRING_FORMAT.format(host, port)
108 changes: 108 additions & 0 deletions modules/azure/testcontainers/azure/_types.py
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly I'm not too convinced on this, I think maybe using dataclasses.dataclass might be better? They offer a JSON schema schema here that could be used as a source.

Open to thoughts.

Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from typing import Literal, Union

if sys.version_info < (3, 11):
from typing_extensions import NotRequired, TypedDict
else:
from typing import NotRequired, TypedDict


class QueueProperty(TypedDict):
DeadLetteringOnMessageExpiration: NotRequired[bool]
DefaultMessageTimeToLive: NotRequired[str]
DuplicateDetectionHistoryTimeWindow: NotRequired[str]
ForwardDeadLetteredMessagesTo: NotRequired[str]
ForwardTo: NotRequired[str]
LockDuration: NotRequired[str]
MaxDeliveryCount: NotRequired[int]
RequiresDuplicateDetection: NotRequired[bool]
RequiresSession: NotRequired[bool]


class Queue(TypedDict):
Name: str
Properties: NotRequired[QueueProperty]


class CorrelationFilter(TypedDict):
ContentType: NotRequired[str]
CorrelationId: NotRequired[str]
Label: NotRequired[str]
MessageId: NotRequired[str]
ReplyTo: NotRequired[str]
ReplyToSessionId: NotRequired[str]
SessionId: NotRequired[str]
To: NotRequired[str]


class CorrelationRule(TypedDict):
FilterType: Literal["Correlation"]
CorrelationFilter: CorrelationFilter


class SQLFilter(TypedDict):
SqlExpression: str


class SQLAction(TypedDict):
SqlExpression: str


class SQLRule(TypedDict):
FilterType: Literal["Sql"]
SqlFilter: SQLFilter
Action: SQLAction


class Rule(TypedDict):
Name: str
Properties: Union[CorrelationRule, SQLRule]


class SubscriptionProperty(TypedDict):
DeadLetteringOnMessageExpiration: NotRequired[bool]
DefaultMessageTimeToLive: NotRequired[str]
LockDuration: NotRequired[str]
MaxDeliveryCount: NotRequired[int]
ForwardDeadLetteredMessagesTo: NotRequired[str]
ForwardTo: NotRequired[str]
RequiresSession: NotRequired[bool]


class Subscription(TypedDict):
Name: str
Properties: NotRequired[SubscriptionProperty]
Rules: list[Rule]


class Topic(TypedDict):
Name: str
Properties: dict[str, Union[str, bool]]
Subscriptions: list[Subscription]


class Namespace(TypedDict):
Name: str
Queues: list[Queue]
Topics: list[Topic]


class UserConfig(TypedDict):
Namespaces: list[Namespace]
Logging: dict[str, str]


class ServiceBusConfiguration(TypedDict):
UserConfig: str
Empty file.
87 changes: 87 additions & 0 deletions modules/azure/testcontainers/tests/test_servicebus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging
from typing import cast

from _pytest.logging import LogCaptureFixture
from azure.servicebus import ServiceBusClient, ServiceBusMessage

from testcontainers.core.network import Network
from testcontainers.azure import ServiceBusContainer
from testcontainers.mssql import SqlServerContainer

from ._types import ServiceBusConfiguration

log = logging.getLogger(__name__)

# Default namespace.
NAMESPACE = "sbemulatorns"
TOPIC = "demo-topic"
SUBSCRIPTION = "demo-subscription"

CONFIG = {
"UserConfig": {
"Namespaces": [
{
"Name": NAMESPACE,
"Queues": [],
"Topics": [
{
"Name": TOPIC,
"Properties": {
"DefaultMessageTimeToLive": "PT1H",
"DuplicateDetectionHistoryTimeWindow": "PT20S",
"RequiresDuplicateDetection": False,
},
"Subscriptions": [
{
"Name": SUBSCRIPTION,
"Properties": {
"DeadLetteringOnMessageExpiration": False,
"DefaultMessageTimeToLive": "PT1H",
"LockDuration": "PT1M",
"MaxDeliveryCount": 10,
"ForwardDeadLetteredMessagesTo": "",
"ForwardTo": "",
"RequiresSession": False,
},
"Rules": [],
},
],
}
],
}
],
"Logging": {"Type": "File"},
}
}


def test_service_bus_integration(caplog: LogCaptureFixture) -> None:
caplog.set_level(logging.INFO)

network = Network().create()

mssql_container = SqlServerContainer(
"mcr.microsoft.com/mssql/server:2022-CU12-ubuntu-22.04", password="d0gs>cats!!!"
)
mssql_container.with_env("ACCEPT_EULA", "Y")
mssql_container.with_network(network)
mssql_container.with_exposed_ports(1433)

with ServiceBusContainer(
config=cast(ServiceBusConfiguration, CONFIG), network=network, mssql_container=mssql_container
).accept_license() as sbemulator:
conn_str = sbemulator.get_connection_url()
log.debug("Using %r as endpoint", conn_str)

test_message = "potato"

client = ServiceBusClient.from_connection_string(conn_str, logging_enable=True)
with client.get_topic_sender(topic_name=TOPIC) as sender:
sender.send_messages(ServiceBusMessage(test_message))

handler_client = ServiceBusClient.from_connection_string(conn_str, logging_enable=True)

with handler_client.get_subscription_receiver(subscription_name=SUBSCRIPTION, topic_name=TOPIC) as receiver:
received = receiver.receive_messages(max_message_count=1, max_wait_time=5)
message = next(received[0].body).decode()
assert message == test_message
Loading