diff --git a/registrar/apps/core/consumer.py b/registrar/apps/core/consumer.py new file mode 100644 index 00000000..ae5a12b2 --- /dev/null +++ b/registrar/apps/core/consumer.py @@ -0,0 +1,43 @@ +""" +Defines the Kombu consumer for the registrar project. +""" +from __future__ import absolute_import + +from kombu.mixins import ConsumerMixin +from kombu import Exchange, Queue + +from registrar.apps.core.models import Organization + +task_exchange = Exchange('course_discovery', type='direct') +queues = [ + Queue('task_queue', task_exchange, routing_key='task_queue'), +] + +class Worker(ConsumerMixin): + + def __init__(self, connection): + self.connection = connection + + def get_consumers(self, Consumer, channel): + return [ + Consumer(queues, callbacks=[self.on_message], accept=['json']), + ] + + def on_message(self, body, message): + print(Organization.objects.first()) + print("If this prints, then we can access Django models!") + print('RECEIVED MESSAGE: {0!r}'.format(body)) + message.ack() + + +def run_consumer_worker(): + from kombu import Connection + from kombu.utils.debug import setup_logging + setup_logging(loglevel='DEBUG') + + with Connection('redis://:password@redis:6379/0') as conn: + try: + Worker(conn).run() + except KeyboardInterrupt: + print('bye bye') + diff --git a/registrar/apps/core/management/commands/run_consumer.py b/registrar/apps/core/management/commands/run_consumer.py new file mode 100644 index 00000000..00d53137 --- /dev/null +++ b/registrar/apps/core/management/commands/run_consumer.py @@ -0,0 +1,18 @@ +""" Management command to run worker that will act on messages """ +import logging + +from django.contrib.auth.models import Group +from django.core.management.base import BaseCommand, CommandError + +from registrar.apps.core.consumer import run_consumer_worker + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + # pylint: disable=missing-docstring + + help = 'Runs a worker to act on messages received from queue.' + + def handle(self, *args, **options): + run_consumer_worker() diff --git a/registrar/consumer.py b/registrar/consumer.py deleted file mode 100644 index b7f8c178..00000000 --- a/registrar/consumer.py +++ /dev/null @@ -1,39 +0,0 @@ - """ - Defines the Kombu consumer for the registrar project. - """ - from __future__ import absolute_import - - from kombu.mixins import ConsumerMixin - from kombu import Exchange, Queue - - task_exchange = Exchange('course_discovery', type='direct') - queues = [ - Queue('task_queue', task_exchange, routing_key='task_queue'), - ] - - class Worker(ConsumerMixin): - - def __init__(self, connection): - self.connection = connection - - def get_consumers(self, Consumer, channel): - return [ - Consumer(queues, callbacks=[self.on_message], accept=['json']), - ] - - def on_message(self, body, message): - print('RECEIVED MESSAGE: {0!r}'.format(body)) - message.ack() - - - if __name__ == '__main__': - from kombu import Connection - from kombu.utils.debug import setup_logging - setup_logging(loglevel='DEBUG') - - with Connection('redis://:password@redis:6379/0') as conn: - try: - Worker(conn).run() - except KeyboardInterrupt: - print('bye bye') -