Skip to content

Commit

Permalink
Added temporary log messages for debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
AmjadUllah committed Jul 27, 2019
1 parent 4e6bf0f commit b49680b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
10 changes: 10 additions & 0 deletions job_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
import container_worker as jqw
from container_worker import job_app

# temp code [au] — temporary debug logging.
# NOTE(review): use a module-scoped logger rather than the root logger;
# calling setLevel() on the root logger globally changes logging behavior
# for every library in the process.
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# --------------------------------

# What to do when a job fails
class JQueuer_Task(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
Expand All @@ -37,6 +43,10 @@ def add(self, exp_id, job_queue_id, job):

worker_id = self.request.hostname.split("@")[1]

# temp code [au]
logger.info("In job_operations run_job")
# --------------------------------

monitoring.run_job(
getNodeID(worker_id), exp_id, getServiceName(worker_id), worker_id, job["id"]
)
Expand Down
30 changes: 30 additions & 0 deletions monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@

from prometheus_client import start_http_server, Gauge, Counter, Histogram

# temp code [au] — temporary debug logging.
# NOTE(review): use a module-scoped logger rather than the root logger;
# calling setLevel() on the root logger globally changes logging behavior
# for every library in the process.
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# --------------------------------

def start(metrics_agent_port):
start_http_server(metrics_agent_port)

Expand All @@ -12,10 +18,16 @@ def start(metrics_agent_port):


def add_worker(node_id, service_name):
    """Record that a worker was added for *service_name* on *node_id*.

    Increments the module-level Prometheus ``node_counter`` metric with
    the ``(node_id, service_name)`` label pair so worker counts per
    node/service can be tracked.
    """
    # temp code [au] — include the label values in the trace log so the
    # message identifies which worker was added (lazy %-formatting).
    logger.info("In monitoring add_worker: node_id=%s, service_name=%s",
                node_id, service_name)
    # --------------------------------
    node_counter.labels(node_id, service_name).inc()


def terminate_worker(node_id, service_name):
    """Record that a worker terminated for *service_name* on *node_id*.

    Decrements the module-level Prometheus ``node_counter`` metric with
    the ``(node_id, service_name)`` label pair, mirroring
    ``add_worker``.
    """
    # temp code [au] — include the label values in the trace log so the
    # message identifies which worker terminated (lazy %-formatting).
    logger.info("In monitoring terminate_worker: node_id=%s, service_name=%s",
                node_id, service_name)
    # --------------------------------
    node_counter.labels(node_id, service_name).dec()


Expand All @@ -31,6 +43,9 @@ def terminate_worker(node_id, service_name):
job_started = Gauge(JQUEUER_JOB_STARTED,JQUEUER_JOB_STARTED,["node_id","experiment_id","service_name","qworker_id","job_id"])

def run_job(node_id, experiment_id, service_name, qworker_id, job_id):
# temp code [au]
logger.info("In monitoring run_job")
# --------------------------------
job_started_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time())
job_running_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time())
job_running.labels(node_id,experiment_id,service_name,qworker_id,job_id).set(1)
Expand All @@ -47,6 +62,9 @@ def run_job(node_id, experiment_id, service_name, qworker_id, job_id):
job_accomplished = Gauge(JQUEUER_JOB_ACCOMPLISHED,JQUEUER_JOB_ACCOMPLISHED,["node_id","experiment_id","service_name","qworker_id","job_id"])

def terminate_job(node_id, experiment_id, service_name, qworker_id, job_id, start_time):
# temp code [au]
logger.info("In monitoring terminate_job")
# --------------------------------
elapsed_time = time.time() - start_time
job_accomplished_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time())
job_running_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time())
Expand All @@ -64,6 +82,9 @@ def terminate_job(node_id, experiment_id, service_name, qworker_id, job_id, star
job_failed = Gauge(JQUEUER_JOB_FAILED,JQUEUER_JOB_FAILED,["node_id","experiment_id","service_name","qworker_id","job_id"])

def job_failed(node_id, experiment_id, service_name, qworker_id, job_id, fail_time):
# temp code [au]
logger.info("In monitoring job_failed")
# --------------------------------
elapsed_time = time.time() - fail_time
job_failed_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time())
job_running_timestamp.labels(node_id,experiment_id,service_name,job_id).set(time.time())
Expand All @@ -84,6 +105,9 @@ def job_failed(node_id, experiment_id, service_name, qworker_id, job_id, fail_ti
task_started = Gauge(JQUEUER_TASK_STARTED,JQUEUER_TASK_STARTED,["node_id","experiment_id","service_name","qworker_id","job_id","task_id"])

def run_task(node_id, experiment_id, service_name, qworker_id, job_id, task_id):
# temp code [au]
logger.info("In monitoring run_task")
# --------------------------------
task_started_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time())
task_running_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time())
task_running.labels(node_id,experiment_id,service_name,qworker_id,job_id,task_id).set(1)
Expand All @@ -101,6 +125,9 @@ def run_task(node_id, experiment_id, service_name, qworker_id, job_id, task_id):
def terminate_task(
node_id, experiment_id, service_name, qworker_id, job_id, task_id, start_time
):
# temp code [au]
logger.info("In monitoring terminate_task")
# --------------------------------
elapsed_time = time.time() - start_time
task_accomplished_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time())
# In the previous case, this didn't include task_id.
Expand All @@ -121,6 +148,9 @@ def terminate_task(
def task_failed(
node_id, experiment_id, service_name, qworker_id, job_id, task_id, fail_time
):
# temp code [au]
logger.info("In monitoring task_failed")
# --------------------------------
elapsed_time = time.time() - fail_time
task_failed_timestamp.labels(node_id,experiment_id,service_name,job_id,task_id).set(time.time())
# In the previous case, this didn't include task_id.
Expand Down

0 comments on commit b49680b

Please sign in to comment.