Skip to content

Commit

Permalink
Fixed MQTT examples + some doc
Browse files Browse the repository at this point in the history
  • Loading branch information
OscarFdezS committed Jun 27, 2024
1 parent 95838c1 commit b139a1f
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 46 deletions.
11 changes: 5 additions & 6 deletions xdevs/abc/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@


class Connector:
def __init__(self, conections: dict[str, str]):
def __init__(self, connections: dict[str, str]):
"""
Función para conectar de forma correcta los puertos (que usen protocolo MQTT)
Function to connect ports correctly (using MQTT protocol)
:param conections: dict[key: str, value: str]. Donde la key es el puerto de al que me quiero conectar y el
value es el puerto de mi acoplado.
:param connections: dict[key: str, value: str]. Where the key is the port I am connecting to (via MQTT) and the value is the port of my coupled.
"""
self.connections: dict[str, str] = conections
self.connections: dict[str, str] = connections

def input_handler(self, port: str):
if self.connections is not None:
Expand Down Expand Up @@ -42,7 +41,7 @@ def __init__(self, *args, **kwargs):
self.msg_parsers: dict[str, Callable[[str], Any]] = kwargs.get('msg_parsers', dict())

self.connections: dict[str, str] = kwargs.get('connections', dict())
self.connector = Connector(conections=self.connections)
self.connector = Connector(connections=self.connections)

def initialize(self):
"""Performs any task before calling the run method. It is implementation-specific. By default, it is empty."""
Expand Down
14 changes: 8 additions & 6 deletions xdevs/examples/store/4_1_rt_simulation_mqtt_input_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,23 @@ def mqtt_parser(msg: str):
print(f"\tNumber of Employees: {n_employees}")
print(f"\tMean time required to dispatch clients: {mean_employees} seconds (stddev of {stddev_employees})")

conexiones = {
'Gen_ClientOut': 'Queue_ClientGen'
# Map of the port of I am subscribing to and the port of the model
connections = {
'Gen_ClientOut': 'i_ExternalGen'
}
topics = {'RTsys/Output/Gen_ClientOut': 0}

# Topics I am subscribing to
topics = {'RTsys/output/Gen_ClientOut': 0}
# Parser of the port of my model to the desired Port Type
msg_parser = {
'Queue_ClientGen': mqtt_parser,
'i_ExternalGen': mqtt_parser,
}

start = time.time()
storeNOGEN = StoreWithoutGen(n_employees, mean_employees, stddev_employees)
middle = time.time()
print(f"Model Created. Elapsed time: {middle - start} sec")
rt_manager = RealTimeManager(max_jitter=0.2, event_window=0.5)
rt_manager.add_input_handler('mqtt', subscriptions=topics, connections=conexiones, msg_parsers=msg_parser)
rt_manager.add_input_handler('mqtt', subscriptions=topics, connections=connections, msg_parsers=msg_parser)
c = RealTimeCoordinator(storeNOGEN, rt_manager)
middle = time.time()
print(f"Coordinator and Manager Created. Elapsed time: {middle - start} sec")
Expand Down
6 changes: 3 additions & 3 deletions xdevs/examples/store/4_2_rt_simulation_mqtt_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ def get_sec(time_str):
stddev_employees))

start = time.time()
gensys = GenSys(mean_generator, stddev_clients)
gens = GenSys(mean_generator, stddev_clients)
middle = time.time()
print("Model Created. Elapsed time: {} sec".format(middle - start))
rt_manager = RealTimeManager(max_jitter=0.2, event_window=0.5)
rt_manager.add_output_handler('mqtt_handler')
c = RealTimeCoordinator(gensys, rt_manager)
rt_manager.add_output_handler('mqtt')
c = RealTimeCoordinator(gens, rt_manager)
middle = time.time()
print("Coordinator and Manager Created. Elapsed time: {} sec".format(middle - start))
c.simulate_rt(time_interv=sim_time)
Expand Down
6 changes: 3 additions & 3 deletions xdevs/examples/store/models/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ def __init__(self, n_employees: int = 10000, mean_employees: float = 30, stddev_
self.o_p_queue = Port(ClientToEmployee)
self.add_out_port(self.o_p_queue)

self.i_port_gen = Port(NewClient)
self.add_in_port(self.i_port_gen)
self.i_ExternalGen = Port(NewClient, 'i_ExternalGen')
self.add_in_port(self.i_ExternalGen)

self.add_component(queue)

self.add_coupling(self.i_port_gen, queue.input_new_client)
self.add_coupling(self.i_ExternalGen, queue.input_new_client)

self.add_coupling(queue.output_client_to_employee, self.o_p_queue)

Expand Down
36 changes: 9 additions & 27 deletions xdevs/plugins/input_handlers/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@
from paho.mqtt.client import Client
from xdevs.abc.handler import InputHandler

# Desde este input handler me subscribo a topics para ver los mensajes que entran
# ruta: RTsys/coupled_name/input/port_name y to_do lo que llegue a ese puerto se inyecta.


#########################################################################
#########################################################################
#########################################################################
def on_connect(client, userdata, flags, rc):
print(f'MQTT client connected with mqtt: {rc}') # rc valor de exito o fracaso en la conexion
print(f'MQTT client connected with mqtt: {rc}') # rc value for success or failure
return rc

def on_message(client, userdata, msg):
Expand All @@ -31,11 +26,6 @@ def __init__(self, event_queue: queue = None, **kwargs):

self.event_queue = event_queue


#########################################################################
#########################################################################
#########################################################################

def mqtt_parser(mqtt_msg):
topic = [item for item in mqtt_msg.topic.split('/')]
port = topic[-1]
Expand All @@ -46,9 +36,14 @@ def mqtt_parser(mqtt_msg):
class MQTTInputHandler(InputHandler):
def __init__(self, subscriptions: dict[str, int] = None, **kwargs):
"""
:param subscriptions: diccionario con los topics y su qos
:param kwargs:
This input handler is the implementation of the MQTT protocol.
It subscribes to the desired topics and pushes the messages received to the system
:param dict[str, int] subscriptions: dict of topics and their QoS. Default is None
:param str host: desired MQTT broker. Default is 'test.mosquitto.org'
:param int port: port of the MQTT broker to be used. Default is 1883
:param int keepalive: keepalive time for the MQTT connection. Default is 60
:param Callable[[mqtt.Message], str, str] event_parser: from the received message obtain the topic and the message payload. Default is mqtt_parser
"""

kwargs['event_parser'] = kwargs.get('event_parser', mqtt_parser)
Expand Down Expand Up @@ -78,19 +73,6 @@ def run(self):
print(f'MQTT: Event pushed') # {event} t = {datetime.datetime.now()}')
self.push_event(event)

if __name__ == '__main__':
input_queue = queue.SimpleQueue()
event_Q = queue.SimpleQueue()

sub: dict = {
'ALSW/#': 0,
'ALSW/TEP': 0,
'RTsys/#': 0,
}
# C = MQTTClient(event_queue=event_Q)
IN = MQTTInputHandler(queue=input_queue, subscriptions=sub)
IN.initialize()
IN.run()

except ImportError:
from .bad_dependencies import BadDependenciesHandler
Expand Down
13 changes: 12 additions & 1 deletion xdevs/plugins/output_handlers/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,20 @@

try:
from xdevs.abc.handler import OutputHandler
from ..input_handlers import MQTTClient
from ..input_handlers.mqtt import MQTTClient


class MQTTOutputHandler(OutputHandler):
"""
This output handler is the implementation of the MQTT protocol.
It publishes events to the desired topics.
:param str host: desired MQTT broker. Default is 'test.mosquitto.org'
:param int port: port of the MQTT broker to be used. Default is 1883
:param int keepalive: keepalive time for the MQTT connection. Default is 60
:param str topic: desired topic to publish the events. Default is 'RTsys' and generate the topic as 'RTsys/output/<port>'
:param Callable[[str, str], Tuple[str,str]] event_parser: function to obtain the topic and message payload to be published.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)

Expand All @@ -27,6 +37,7 @@ def initialize(self):
def run(self):
while True:
topic, payload = self.pop_event()
print(f'Publishing {payload} to {topic}')
self.client.publish(topic, payload)


Expand Down

0 comments on commit b139a1f

Please sign in to comment.