Skip to content

Commit

Permalink
Merge pull request #2681 from chaoss/add-api-start-command
Browse files Browse the repository at this point in the history
Add `augur api start` and `augur collection start` commands
  • Loading branch information
ABrain7710 authored Feb 27, 2024
2 parents eec445b + 61dc81f commit 7fc61c5
Show file tree
Hide file tree
Showing 3 changed files with 527 additions and 0 deletions.
69 changes: 69 additions & 0 deletions augur/application/cli/_cli_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import resource
import os
import subprocess
import psutil
import signal
from urllib.parse import urlparse

from augur.tasks.init.redis_connection import redis_connection

def clear_redis_caches(logger):
"""Clears the redis databases that celery and redis use."""

logger.info("Flushing all redis databases this instance was using")
celery_purge_command = "celery -A augur.tasks.init.celery_app.celery_app purge -f"
subprocess.call(celery_purge_command.split(" "))
redis_connection.flushdb()


def clear_rabbitmq_messages(connection_string, queues, logger):
#virtual_host_string = connection_string.split("/")[-1]

logger.info("Clearing all messages from celery queue in rabbitmq")
from augur.tasks.init.celery_app import celery_app
celery_app.control.purge()

clear_message_queues(connection_string, queues)


def clear_message_queues(connection_string, queues):
queues = ['celery','secondary','scheduling','facade']

virtual_host_string = connection_string.split("/")[-1]

#Parse username and password with urllib
parsed = urlparse(connection_string)

for q in queues:
curl_cmd = f"curl -i -u {parsed.username}:{parsed.password} -XDELETE http://localhost:15672/api/queues/{virtual_host_string}/{q}"
subprocess.call(curl_cmd.split(" "),stdout=subprocess.PIPE, stderr=subprocess.PIPE)


def _broadcast_signal_to_processes(processes, logger, broadcast_signal=signal.SIGTERM):

for process in processes:
if process.pid != os.getpid():
logger.info(f"Stopping process {process.pid}")
try:
process.send_signal(broadcast_signal)
except psutil.NoSuchProcess:
pass


def raise_open_file_limit(num_files):
"""
sets number of open files soft limit
"""
current_soft, current_hard = resource.getrlimit(resource.RLIMIT_NOFILE)

# if soft is already greater than the requested amount then don't change it
if current_soft > num_files:
return

# if the requested amount is greater than the hard limit then set the hard limit to the num_files value
if current_hard <= num_files:
current_hard = num_files

resource.setrlimit(resource.RLIMIT_NOFILE, (num_files, current_hard))

return
157 changes: 157 additions & 0 deletions augur/application/cli/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#SPDX-License-Identifier: MIT
"""
Augur library commands for controlling the backend components
"""
import os
import time
import subprocess
import click
import logging
import psutil
import signal
import uuid
import traceback

from augur.application.db.session import DatabaseSession
from augur.application.logs import AugurLogger
from augur.application.config import AugurConfig
from augur.application.cli import test_connection, test_db_connection
from augur.application.cli._cli_util import _broadcast_signal_to_processes, raise_open_file_limit, clear_redis_caches, clear_rabbitmq_messages

logger = AugurLogger("augur", reset_logfiles=True).get_logger()

@click.group('api', short_help='Commands for controlling the backend API server')
def cli():
pass

@cli.command("start")
@click.option("--development", is_flag=True, default=False, help="Enable development mode")
@click.option('--port')
@test_connection
@test_db_connection
def start(development, port):
"""Start Augur's backend server."""

try:
if os.environ.get('AUGUR_DOCKER_DEPLOY') != "1":
raise_open_file_limit(100000)
except Exception as e:
logger.error(
''.join(traceback.format_exception(None, e, e.__traceback__)))

logger.error("Failed to raise open file limit!")
raise e

if development:
os.environ["AUGUR_DEV"] = "1"
logger.info("Starting in development mode")

try:
gunicorn_location = os.getcwd() + "/augur/api/gunicorn_conf.py"
except FileNotFoundError:
logger.error("\n\nPlease run augur commands in the root directory\n\n")

with DatabaseSession(logger) as db_session:
config = AugurConfig(logger, db_session)
host = config.get_value("Server", "host")

if not port:
port = config.get_value("Server", "port")

gunicorn_command = f"gunicorn -c {gunicorn_location} -b {host}:{port} augur.api.server:app --log-file gunicorn.log"
server = subprocess.Popen(gunicorn_command.split(" "))

time.sleep(3)
logger.info('Gunicorn webserver started...')
logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}')

frontend_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=1 -n frontend:{uuid.uuid4().hex}@%h -Q frontend"
frontend_worker_process = subprocess.Popen(frontend_worker.split(" "))

try:
server.wait()
except KeyboardInterrupt:

if server:
logger.info("Shutting down server")
server.terminate()

logger.info("Shutting down frontend celery worker process")
if frontend_worker_process:
frontend_worker_process.terminate()

@cli.command('stop')
def stop():
"""
Sends SIGTERM to all Augur api processes
"""
logger = logging.getLogger("augur.cli")

augur_stop(signal.SIGTERM, logger)

@cli.command('kill')
def kill():
"""
Sends SIGKILL to all Augur api processes
"""
logger = logging.getLogger("augur.cli")
augur_stop(signal.SIGKILL, logger)

@cli.command('processes')
def processes():
"""
Outputs the name/PID of all Augur api process"""
augur_processes = get_augur_api_processes()
for process in augur_processes:
logger.info(f"Found process {process.pid}")

def augur_stop(signal, logger):
"""
Stops augur with the given signal,
and cleans up the api
"""

augur_processes = get_augur_api_processes()

_broadcast_signal_to_processes(augur_processes, logger=logger, broadcast_signal=signal)

cleanup_after_api_halt(logger)


def cleanup_after_api_halt(logger):

connection_string = ""
queues = ['frontend','celery']
with DatabaseSession(logger) as session:
config = AugurConfig(logger, session)
connection_string = config.get_section("RabbitMQ")['connection_string']

clear_rabbitmq_messages(connection_string, queues, logger)
clear_redis_caches(logger)

def get_augur_api_processes():
augur_api_processes = []
for process in psutil.process_iter(['cmdline', 'name', 'environ']):
if process.info['cmdline'] is not None and process.info['environ'] is not None:
try:
if is_api_process(process):
augur_api_processes.append(process)
except (KeyError, FileNotFoundError):
pass
return augur_api_processes

def is_api_process(process):

command = ''.join(process.info['cmdline'][:]).lower()
if os.getenv('VIRTUAL_ENV') in process.info['environ']['VIRTUAL_ENV'] and 'python' in command:

if process.pid != os.getpid():

if ("augur.api.server:app" in command or
"augurbackendapi" in command or
("augur.tasks.init.celery_app.celery_app" in command and "frontend" in command)):
return True

return False


Loading

0 comments on commit 7fc61c5

Please sign in to comment.