forked from edgexfoundry/app-functions-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
64 lines (52 loc) · 2.64 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Copyright (C) 2024 IOTech Ltd
# SPDX-License-Identifier: Apache-2.0
"""
Messaging Initialization Module.
This module serves as the entry point for creating message clients for MQTT and NATS core
protocol-based communication. It provides a factory function to instantiate message clients based
on the provided configuration, supporting the integration with different types of message buses as
needed by the application.
The module abstracts the complexity of directly interacting with specific message bus
implementations, offering a simplified interface for client creation.
This allows for easy extension and integration of additional message bus types in the future.
Functions:
new_message_client(message_bus_info: MessageBusInfo, logger: Logger) -> MessageClient: Factory
function that creates and returns a new instance of a message client based on the provided
message bus configuration.
Examples:
To create a new message client from a MessageBusInfo configuration and a logger:
# message_bus_info.Type selects the implementation (MQTT or NATS_CORE, case-insensitive)
client = new_message_client(message_bus_info, logger)
Note:
Currently, this module supports MQTT and NATS core message clients; any other message bus type
raises a ValueError. Future versions may include support for other protocols.
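See Also:
- `MqttMessageClient` in `mqtt.client` for the MQTT client implementation.
- `NatsMessageClient` in `nats.client` for the NATS core client implementation.
- `MessageBusConfig` in `interfaces.messaging` for the configuration structure.
- `MessageBusInfo` in `internal.common.config` for the input accepted by the factory.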
"""
from .mqtt.client import MqttMessageClient
from .nats.client import NatsMessageClient
from ..contracts.clients.logger import Logger
from ..interfaces.messaging import MessageBusConfig, MessageClient, MQTT, HostInfo, NATS_CORE
from ..internal.common.config import MessageBusInfo
def new_message_client(message_bus_info: MessageBusInfo, logger: Logger) -> MessageClient:
    """
    Factory function to create a new MessageClient based on the provided MessageBusInfo config.

    The MessageBusInfo fields are copied into a MessageBusConfig, and the (lower-cased) message
    bus type selects which client implementation is constructed.

    Args:
        message_bus_info: The MessageBusInfo configuration used to create the MessageClient.
        logger: Logger passed to the NATS client; the MQTT client is constructed without it.

    Returns:
        MessageClient: A new MessageClient instance.

    Raises:
        ValueError: If the configured message bus type is neither MQTT nor NATS_CORE.
    """
    message_bus_config = MessageBusConfig(
        broker_info=HostInfo(
            protocol=message_bus_info.Protocol,
            host=message_bus_info.Host,
            port=message_bus_info.Port),
        auth_mode=message_bus_info.AuthMode,
        message_bus_type=message_bus_info.Type.lower(),
        optional=message_bus_info.Optional)

    # Normalise once so both comparisons see the same value.
    bus_type = message_bus_config.type.lower()
    if bus_type == MQTT:
        return MqttMessageClient(message_bus_config)
    if bus_type == NATS_CORE:
        return NatsMessageClient(message_bus_config, logger)

    # Report the configured value; the previous message interpolated the builtin `type`,
    # which always rendered as "<class 'type'>".
    raise ValueError(f"Unsupported message client type: {message_bus_info.Type}")