From b49680b81503699868603bb7a312a5c19de121fc Mon Sep 17 00:00:00 2001 From: amjad_ullah Date: Sat, 27 Jul 2019 12:39:07 +0100 Subject: [PATCH] added some log messages --- job_operations.py | 10 ++++++++++ monitoring.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/job_operations.py b/job_operations.py index 70f5f54..7f7a755 100644 --- a/job_operations.py +++ b/job_operations.py @@ -13,6 +13,12 @@ import container_worker as jqw from container_worker import job_app +# temp code [au] +import logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) +# -------------------------------- + # What to do when a job fails class JQueuer_Task(celery.Task): def on_failure(self, exc, task_id, args, kwargs, einfo): @@ -37,6 +43,10 @@ def add(self, exp_id, job_queue_id, job): worker_id = self.request.hostname.split("@")[1] + # temp code [au] + logger.info("In job_operations run_job") + # -------------------------------- + monitoring.run_job( getNodeID(worker_id), exp_id, getServiceName(worker_id), worker_id, job["id"] ) diff --git a/monitoring.py b/monitoring.py index f9e363e..1f68aee 100644 --- a/monitoring.py +++ b/monitoring.py @@ -3,6 +3,12 @@ from prometheus_client import start_http_server, Gauge, Counter, Histogram +# temp code [au] +import logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) +# -------------------------------- + def start(metrics_agent_port): start_http_server(metrics_agent_port) @@ -12,10 +18,16 @@ def start(metrics_agent_port): def add_worker(node_id, service_name): + # temp code [au] + logger.info("In monitoring add_worker") + # -------------------------------- node_counter.labels(node_id,service_name).inc() def terminate_worker(node_id, service_name): + # temp code [au] + logger.info("In monitoring terminate_worker") + # -------------------------------- node_counter.labels(node_id,service_name).dec() @@ -31,6 +43,9 @@ def terminate_worker(node_id, service_name): job_started = Gauge(JQUEUER_JOB_STARTED,JQUEUER_JOB_STARTED,["node_id","experiment_id","service_name","qworker_id","job_id"]) def run_job(node_id, experiment_id, service_name, qworker_id, job_id): + # temp code [au] + logger.info("In monitoring run_job") + # -------------------------------- job_started_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time()) job_running_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time()) job_running.labels(node_id,experiment_id,service_name,qworker_id,job_id).set(1) @@ -47,6 +62,9 @@ def run_job(node_id, experiment_id, service_name, qworker_id, job_id): job_accomplished = Gauge(JQUEUER_JOB_ACCOMPLISHED,JQUEUER_JOB_ACCOMPLISHED,["node_id","experiment_id","service_name","qworker_id","job_id"]) def terminate_job(node_id, experiment_id, service_name, qworker_id, job_id, start_time): + # temp code [au] + logger.info("In monitoring terminate_job") + # -------------------------------- elapsed_time = time.time() - start_time job_accomplished_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time()) job_running_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time()) @@ -64,6 +82,9 @@ def terminate_job(node_id, experiment_id, service_name, qworker_id, job_id, star job_failed = Gauge(JQUEUER_JOB_FAILED,JQUEUER_JOB_FAILED,["node_id","experiment_id","service_name","qworker_id","job_id"]) def job_failed(node_id, experiment_id, service_name, qworker_id, job_id, fail_time): + # temp code [au] + logger.info("In monitoring job_failed") + # -------------------------------- elapsed_time = time.time() - fail_time job_failed_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time()) job_running_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time()) @@ -84,6 +105,9 @@ def job_failed(node_id, experiment_id, service_name, qworker_id, job_id, fail_ti task_started = Gauge(JQUEUER_TASK_STARTED,JQUEUER_TASK_STARTED,["node_id","experiment_id","service_name","qworker_id","job_id","task_id"]) def run_task(node_id, experiment_id, service_name, qworker_id, job_id, task_id): + # temp code [au] + logger.info("In monitoring run_task") + # -------------------------------- task_started_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time()) task_running_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time()) task_running.labels(node_id,experiment_id,service_name,qworker_id,job_id,task_id).set(1) @@ -101,6 +125,9 @@ def run_task(node_id, experiment_id, service_name, qworker_id, job_id, task_id): def terminate_task( node_id, experiment_id, service_name, qworker_id, job_id, task_id, start_time ): + # temp code [au] + logger.info("In monitoring terminate_task") + # -------------------------------- elapsed_time = time.time() - start_time task_accomplished_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time()) # In the previous case, this didn't include task_id. @@ -121,6 +148,9 @@ def terminate_task( def task_failed( node_id, experiment_id, service_name, qworker_id, job_id, task_id, fail_time ): + # temp code [au] + logger.info("In monitoring task_failed") + # -------------------------------- elapsed_time = time.time() - fail_time task_failed_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time()) # In the previous case, this didn't include task_id.