From eaecb5c8442e9a3d7fa9f6df4dd4c27397b659f1 Mon Sep 17 00:00:00 2001 From: Dustin Lo Date: Fri, 12 Mar 2021 18:07:36 -0800 Subject: [PATCH] Hc 332 (#36) * added enable_dedup to on-demand submit api fixed delete bug in ci un-register jenkins job (using args instead of form/json) * fixed soft_time_limit bug * added enable_dedup to on-demand and user rule api return 'success': False if enable_dedup is not boolean * added enable_dedup to mozart user rules mapping * added enable_dedup flag to job params api --- configs/user_rules_job.mapping | 3 ++ mozart/services/api_v01/jobs.py | 46 ++++++++++++++++----------- mozart/services/api_v01/user_rules.py | 28 ++++++++++++++-- mozart/services/api_v02/jobs.py | 46 ++++++++++++++++----------- mozart/services/api_v02/user_rules.py | 28 ++++++++++++++-- mozart/services/ci.py | 8 +++-- 6 files changed, 116 insertions(+), 43 deletions(-) diff --git a/configs/user_rules_job.mapping b/configs/user_rules_job.mapping index fbdc4617..a77e4766 100644 --- a/configs/user_rules_job.mapping +++ b/configs/user_rules_job.mapping @@ -67,6 +67,9 @@ }, "disk_usage": { "type": "keyword" + }, + "enable_dedup": { + "type": "boolean" } } } diff --git a/mozart/services/api_v01/jobs.py b/mozart/services/api_v01/jobs.py index 6a7a3ee7..51d6550b 100644 --- a/mozart/services/api_v01/jobs.py +++ b/mozart/services/api_v01/jobs.py @@ -11,7 +11,7 @@ import traceback from flask import request -from flask_restx import Namespace, Resource, fields +from flask_restx import Namespace, Resource, fields, inputs from hysds.celery import app as celery_app from hysds.task_worker import do_submit_task @@ -417,16 +417,13 @@ def get(self, _id): @on_demand_ns.route('', endpoint='on-demand') -@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve on demand jobs") +@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve/submit on demand jobs") class OnDemandJobs(Resource): """On Demand Jobs API. 
(jobs retrieval and job submission)""" resp_model = on_demand_ns.model('JsonResponse', { - 'success': fields.Boolean(required=True, description="if 'false', encountered exception; " - "otherwise no errors occurred"), - 'message': fields.String(required=True, description="message describing success or failure"), - 'objectid': fields.String(required=True, description="ID of indexed dataset"), - 'index': fields.String(required=True, description="dataset index name"), + 'success': fields.Boolean(required=True, description="if request was successful"), + 'message': fields.String(required=True, description="message describing success or failure") }) parser = on_demand_ns.parser() @@ -440,6 +437,7 @@ class OnDemandJobs(Resource): parser.add_argument('time_limit', type=int, location="form", help='time limit for PGE job') parser.add_argument('soft_time_limit', type=int, location="form", help='soft time limit for PGE job') parser.add_argument('disk_usage', type=str, location="form", help='memory usage required for jon (KB, MB, GB)') + parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication') def get(self): """List available on demand jobs""" @@ -476,16 +474,25 @@ def post(self): if not request_data: request_data = request.form - tag = request_data.get('tags', None) - job_type = request_data.get('job_type', None) - hysds_io = request_data.get('hysds_io', None) - queue = request_data.get('queue', None) + tag = request_data.get('tags') + job_type = request_data.get('job_type') + hysds_io = request_data.get('hysds_io') + queue = request_data.get('queue') priority = int(request_data.get('priority', 0)) - query_string = request_data.get('query', None) + query_string = request_data.get('query') kwargs = request_data.get('kwargs', '{}') - time_limit = request_data.get('time_limit', None) - soft_time_limit = request_data.get('soft_time_limit', None) - disk_usage = request_data.get('disk_usage', None) + time_limit = 
request_data.get('time_limit') + soft_time_limit = request_data.get('soft_time_limit') + disk_usage = request_data.get('disk_usage') + enable_dedup = request_data.get('enable_dedup') + if enable_dedup is not None: + try: + enable_dedup = inputs.boolean(enable_dedup) + except ValueError as e: + return { + 'success': False, + 'message': str(e) + }, 400 try: query = json.loads(query_string) @@ -549,6 +556,8 @@ def post(self): if disk_usage: rule['disk_usage'] = disk_usage + if enable_dedup is not None: + rule['enable_dedup'] = enable_dedup payload = { 'type': 'job_iterator', @@ -561,6 +570,7 @@ def post(self): return { 'success': True, + 'message': 'task submitted successfully', 'result': celery_task.id } @@ -572,8 +582,7 @@ class JobParams(Resource): """Job Params API.""" resp_model = on_demand_ns.model('JsonResponse', { - 'success': fields.Boolean(required=True, description="if 'false' encountered exception;" - "otherwise no errors occurred"), + 'success': fields.Boolean(required=True, description="if request was processed successfully"), 'message': fields.String(required=True, description="message describing success or failure"), 'objectid': fields.String(required=True, description="ID of indexed dataset"), 'index': fields.String(required=True, description="dataset index name"), @@ -619,5 +628,6 @@ def get(self): 'params': job_params, 'time_limit': job_spec['_source']['time_limit'], 'soft_time_limit': job_spec['_source']['soft_time_limit'], - 'disk_usage': job_spec['_source']['disk_usage'] + 'disk_usage': job_spec['_source']['disk_usage'], + 'enable_dedup': hysds_io['_source'].get('enable_dedup', True) } diff --git a/mozart/services/api_v01/user_rules.py b/mozart/services/api_v01/user_rules.py index 96c9ac40..a11525e7 100644 --- a/mozart/services/api_v01/user_rules.py +++ b/mozart/services/api_v01/user_rules.py @@ -11,7 +11,7 @@ from datetime import datetime from flask import request -from flask_restx import Namespace, Resource +from flask_restx import Namespace, 
Resource, inputs from hysds_commons.action_utils import check_passthrough_query @@ -44,6 +44,7 @@ class UserRules(Resource): post_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job') post_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job') post_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)') + post_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication') put_parser = user_rule_ns.parser() put_parser.add_argument('id', type=str, help="rule id") @@ -58,6 +59,7 @@ class UserRules(Resource): put_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job') put_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job') put_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)') + put_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication') @user_rule_ns.expect(parser) def get(self): @@ -126,6 +128,15 @@ def post(self): time_limit = request_data.get('time_limit', None) soft_time_limit = request_data.get('soft_time_limit', None) disk_usage = request_data.get('disk_usage', None) + enable_dedup = request_data.get('enable_dedup') + if enable_dedup is not None: + try: + enable_dedup = inputs.boolean(enable_dedup) + except ValueError as e: + return { + 'success': False, + 'message': str(e) + }, 400 username = "ops" # TODO: add user role and permissions, hard coded to "ops" for now @@ -240,6 +251,8 @@ def post(self): if disk_usage: new_doc['disk_usage'] = disk_usage + if enable_dedup is not None: + new_doc['enable_dedup'] = enable_dedup result = mozart_es.index_document(index=user_rules_index, body=new_doc, refresh=True) return { @@ -275,6 +288,15 @@ def put(self): time_limit = 
request_data.get('time_limit', None) soft_time_limit = request_data.get('soft_time_limit', None) disk_usage = request_data.get('disk_usage', None) + enable_dedup = request_data.get('enable_dedup') + if enable_dedup is not None: + try: + enable_dedup = inputs.boolean(enable_dedup) + except ValueError as e: + return { + 'success': False, + 'message': str(e) + }, 400 # check if job_type (hysds_io) exists in ElasticSearch (only if we're updating job_type) if hysds_io: @@ -374,7 +396,7 @@ def put(self): update_doc['soft_time_limit'] = None else: if isinstance(soft_time_limit, int) and 0 < soft_time_limit <= 86400 * 7: - update_doc['soft_time_limit'] = time_limit + update_doc['soft_time_limit'] = soft_time_limit else: return { 'success': False, @@ -383,6 +405,8 @@ def put(self): if 'disk_usage' in request_data: update_doc['disk_usage'] = disk_usage + if 'enable_dedup' in request_data: + update_doc['enable_dedup'] = enable_dedup app.logger.info('editing document id %s in user_rule index' % _id) diff --git a/mozart/services/api_v02/jobs.py b/mozart/services/api_v02/jobs.py index d6abb5bb..defd7eab 100644 --- a/mozart/services/api_v02/jobs.py +++ b/mozart/services/api_v02/jobs.py @@ -11,7 +11,7 @@ import traceback from flask import request -from flask_restx import Namespace, Resource, fields +from flask_restx import Namespace, Resource, fields, inputs from hysds.celery import app as celery_app from hysds.task_worker import do_submit_task @@ -360,16 +360,13 @@ def get(self, _id=None): @on_demand_ns.route('', endpoint='on-demand') -@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve on demand jobs") +@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve/submit on demand jobs") class OnDemandJobs(Resource): """On Demand Jobs API.""" resp_model = on_demand_ns.model('JsonResponse', { - 'success': fields.Boolean(required=True, description="if 'false', encountered exception;" - "otherwise no errors 
occurred"), - 'message': fields.String(required=True, description="message describing success or failure"), - 'objectid': fields.String(required=True, description="ID of indexed dataset"), - 'index': fields.String(required=True, description="dataset index name"), + 'success': fields.Boolean(required=True, description="if request was successful"), + 'message': fields.String(required=True, description="message describing success or failure") }) parser = on_demand_ns.parser() @@ -383,6 +380,7 @@ class OnDemandJobs(Resource): parser.add_argument('time_limit', type=int, location="form", help='time limit for PGE job') parser.add_argument('soft_time_limit', type=int, location="form", help='soft time limit for PGE job') parser.add_argument('disk_usage', type=str, location="form", help='memory usage required for jon (KB, MB, GB)') + parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication') def get(self): """List available on demand jobs""" @@ -419,16 +417,25 @@ def post(self): if not request_data: request_data = request.form - tag = request_data.get('tags', None) - job_type = request_data.get('job_type', None) - hysds_io = request_data.get('hysds_io', None) - queue = request_data.get('queue', None) + tag = request_data.get('tags') + job_type = request_data.get('job_type') + hysds_io = request_data.get('hysds_io') + queue = request_data.get('queue') priority = int(request_data.get('priority', 0)) - query_string = request_data.get('query', None) + query_string = request_data.get('query') kwargs = request_data.get('kwargs', '{}') - time_limit = request_data.get('time_limit', None) - soft_time_limit = request_data.get('soft_time_limit', None) - disk_usage = request_data.get('disk_usage', None) + time_limit = request_data.get('time_limit') + soft_time_limit = request_data.get('soft_time_limit') + disk_usage = request_data.get('disk_usage') + enable_dedup = request_data.get('enable_dedup') + if enable_dedup is not None: + try: + 
enable_dedup = inputs.boolean(enable_dedup) + except ValueError as e: + return { + 'success': False, + 'message': str(e) + }, 400 try: query = json.loads(query_string) @@ -492,6 +499,8 @@ def post(self): if disk_usage: rule['disk_usage'] = disk_usage + if enable_dedup is not None: + rule['enable_dedup'] = enable_dedup payload = { 'type': 'job_iterator', @@ -504,6 +513,7 @@ def post(self): return { 'success': True, + 'message': 'task submitted successfully', 'result': celery_task.id } @@ -515,8 +525,7 @@ class JobParams(Resource): """Job Params API.""" resp_model = on_demand_ns.model('JsonResponse', { - 'success': fields.Boolean(required=True, description="if 'false' encountered exception; " - "otherwise no errors occurred"), + 'success': fields.Boolean(required=True, description="if request was processed successfully"), 'message': fields.String(required=True, description="message describing success or failure"), 'objectid': fields.String(required=True, description="ID of indexed dataset"), 'index': fields.String(required=True, description="dataset index name"), @@ -562,5 +571,6 @@ def get(self): 'params': job_params, 'time_limit': job_spec['_source']['time_limit'], 'soft_time_limit': job_spec['_source']['soft_time_limit'], - 'disk_usage': job_spec['_source']['disk_usage'] + 'disk_usage': job_spec['_source']['disk_usage'], + 'enable_dedup': hysds_io['_source'].get('enable_dedup', True) } diff --git a/mozart/services/api_v02/user_rules.py b/mozart/services/api_v02/user_rules.py index a32d5063..2f59eb77 100644 --- a/mozart/services/api_v02/user_rules.py +++ b/mozart/services/api_v02/user_rules.py @@ -11,7 +11,7 @@ from datetime import datetime from flask import request -from flask_restx import Namespace, Resource +from flask_restx import Namespace, Resource, inputs from hysds_commons.action_utils import check_passthrough_query @@ -43,6 +43,7 @@ class UserRules(Resource): post_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job') 
post_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job') post_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)') + post_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication') put_parser = user_rule_ns.parser() put_parser.add_argument('id', type=str, help="rule id") @@ -57,6 +58,7 @@ class UserRules(Resource): put_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job') put_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job') put_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)') + put_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication') @user_rule_ns.expect(parser) def get(self): @@ -125,6 +127,15 @@ def post(self): time_limit = request_data.get('time_limit', None) soft_time_limit = request_data.get('soft_time_limit', None) disk_usage = request_data.get('disk_usage', None) + enable_dedup = request_data.get('enable_dedup') + if enable_dedup is not None: + try: + enable_dedup = inputs.boolean(enable_dedup) + except ValueError as e: + return { + 'success': False, + 'message': str(e) + }, 400 username = "ops" # TODO: add user role and permissions, hard coded to "ops" for now @@ -239,6 +250,8 @@ def post(self): if disk_usage: new_doc['disk_usage'] = disk_usage + if enable_dedup is not None: + new_doc['enable_dedup'] = enable_dedup result = mozart_es.index_document(index=user_rules_index, body=new_doc, refresh=True) return { @@ -274,6 +287,15 @@ def put(self): time_limit = request_data.get('time_limit', None) soft_time_limit = request_data.get('soft_time_limit', None) disk_usage = request_data.get('disk_usage', None) + enable_dedup = request_data.get('enable_dedup') + if enable_dedup is not None: + try: + 
enable_dedup = inputs.boolean(enable_dedup) + except ValueError as e: + return { + 'success': False, + 'message': str(e) + }, 400 # check if job_type (hysds_io) exists in elasticsearch (only if we're updating job_type) if hysds_io: @@ -374,7 +396,7 @@ def put(self): update_doc['soft_time_limit'] = None else: if isinstance(soft_time_limit, int) and 0 < soft_time_limit <= 86400 * 7: - update_doc['soft_time_limit'] = time_limit + update_doc['soft_time_limit'] = soft_time_limit else: return { 'success': False, @@ -383,6 +405,8 @@ def put(self): if 'disk_usage' in request_data: update_doc['disk_usage'] = disk_usage + if 'enable_dedup' in request_data: + update_doc['enable_dedup'] = enable_dedup app.logger.info('editing document id %s in user_rule index' % _id) diff --git a/mozart/services/ci.py b/mozart/services/ci.py index 24413d4c..4dfdb027 100644 --- a/mozart/services/ci.py +++ b/mozart/services/ci.py @@ -117,12 +117,14 @@ def post(self): @job_registration_ns.expect(delete_parser) def delete(self): """deletes Jenkins job""" - request_data = request.json or request.form - repo = request_data.get('repo') - branch = request_data.get('branch') + repo = request.args.get('repo') + branch = request.args.get('branch') app.logger.info("repo: %s" % repo) app.logger.info("branch: %s" % branch) + if repo is None: + return {'success': True, 'message': 'repo not supplied'}, 400 + job_name = get_ci_job_name(repo, branch) if job_name is None: return {