Skip to content

Commit

Permalink
Hc 332 (#36)
Browse files Browse the repository at this point in the history
* added enable_dedup to on-demand submit api

fixed delete bug in ci un-register jenkins job (using args instead of form/json)

* fixed soft_time_limit bug

* added enable_dedup to on-demand and user rule api

return 'success': False if enable_dedup in not boolean

* added enable_dedup to mozart user rules mapping

* added enable_dedup flag to job params api
  • Loading branch information
DustinKLo authored Mar 13, 2021
1 parent f41d740 commit eaecb5c
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 43 deletions.
3 changes: 3 additions & 0 deletions configs/user_rules_job.mapping
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
},
"disk_usage": {
"type": "keyword"
},
"enable_dedup": {
"type": "boolean"
}
}
}
Expand Down
46 changes: 28 additions & 18 deletions mozart/services/api_v01/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import traceback

from flask import request
from flask_restx import Namespace, Resource, fields
from flask_restx import Namespace, Resource, fields, inputs

from hysds.celery import app as celery_app
from hysds.task_worker import do_submit_task
Expand Down Expand Up @@ -417,16 +417,13 @@ def get(self, _id):


@on_demand_ns.route('', endpoint='on-demand')
@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve on demand jobs")
@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve/submit on demand jobs")
class OnDemandJobs(Resource):
"""On Demand Jobs API. (jobs retrieval and job submission)"""

resp_model = on_demand_ns.model('JsonResponse', {
'success': fields.Boolean(required=True, description="if 'false', encountered exception; "
"otherwise no errors occurred"),
'message': fields.String(required=True, description="message describing success or failure"),
'objectid': fields.String(required=True, description="ID of indexed dataset"),
'index': fields.String(required=True, description="dataset index name"),
'success': fields.Boolean(required=True, description="if request was successful"),
'message': fields.String(required=True, description="message describing success or failure")
})

parser = on_demand_ns.parser()
Expand All @@ -440,6 +437,7 @@ class OnDemandJobs(Resource):
parser.add_argument('time_limit', type=int, location="form", help='time limit for PGE job')
parser.add_argument('soft_time_limit', type=int, location="form", help='soft time limit for PGE job')
parser.add_argument('disk_usage', type=str, location="form", help='memory usage required for jon (KB, MB, GB)')
parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication')

def get(self):
"""List available on demand jobs"""
Expand Down Expand Up @@ -476,16 +474,25 @@ def post(self):
if not request_data:
request_data = request.form

tag = request_data.get('tags', None)
job_type = request_data.get('job_type', None)
hysds_io = request_data.get('hysds_io', None)
queue = request_data.get('queue', None)
tag = request_data.get('tags')
job_type = request_data.get('job_type')
hysds_io = request_data.get('hysds_io')
queue = request_data.get('queue')
priority = int(request_data.get('priority', 0))
query_string = request_data.get('query', None)
query_string = request_data.get('query')
kwargs = request_data.get('kwargs', '{}')
time_limit = request_data.get('time_limit', None)
soft_time_limit = request_data.get('soft_time_limit', None)
disk_usage = request_data.get('disk_usage', None)
time_limit = request_data.get('time_limit')
soft_time_limit = request_data.get('soft_time_limit')
disk_usage = request_data.get('disk_usage')
enable_dedup = request_data.get('enable_dedup')
if enable_dedup is not None:
try:
enable_dedup = inputs.boolean(enable_dedup)
except ValueError as e:
return {
'success': False,
'message': str(e)
}, 400

try:
query = json.loads(query_string)
Expand Down Expand Up @@ -549,6 +556,8 @@ def post(self):

if disk_usage:
rule['disk_usage'] = disk_usage
if enable_dedup is not None:
rule['enable_dedup'] = enable_dedup

payload = {
'type': 'job_iterator',
Expand All @@ -561,6 +570,7 @@ def post(self):

return {
'success': True,
'message': 'task submitted successfully',
'result': celery_task.id
}

Expand All @@ -572,8 +582,7 @@ class JobParams(Resource):
"""Job Params API."""

resp_model = on_demand_ns.model('JsonResponse', {
'success': fields.Boolean(required=True, description="if 'false' encountered exception;"
"otherwise no errors occurred"),
'success': fields.Boolean(required=True, description="if request was processed successfully"),
'message': fields.String(required=True, description="message describing success or failure"),
'objectid': fields.String(required=True, description="ID of indexed dataset"),
'index': fields.String(required=True, description="dataset index name"),
Expand Down Expand Up @@ -619,5 +628,6 @@ def get(self):
'params': job_params,
'time_limit': job_spec['_source']['time_limit'],
'soft_time_limit': job_spec['_source']['soft_time_limit'],
'disk_usage': job_spec['_source']['disk_usage']
'disk_usage': job_spec['_source']['disk_usage'],
'enable_dedup': hysds_io['_source'].get('enable_dedup', True)
}
28 changes: 26 additions & 2 deletions mozart/services/api_v01/user_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import datetime

from flask import request
from flask_restx import Namespace, Resource
from flask_restx import Namespace, Resource, inputs

from hysds_commons.action_utils import check_passthrough_query

Expand Down Expand Up @@ -44,6 +44,7 @@ class UserRules(Resource):
post_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job')
post_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job')
post_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)')
post_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication')

put_parser = user_rule_ns.parser()
put_parser.add_argument('id', type=str, help="rule id")
Expand All @@ -58,6 +59,7 @@ class UserRules(Resource):
put_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job')
put_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job')
put_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)')
put_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication')

@user_rule_ns.expect(parser)
def get(self):
Expand Down Expand Up @@ -126,6 +128,15 @@ def post(self):
time_limit = request_data.get('time_limit', None)
soft_time_limit = request_data.get('soft_time_limit', None)
disk_usage = request_data.get('disk_usage', None)
enable_dedup = request_data.get('enable_dedup')
if enable_dedup is not None:
try:
enable_dedup = inputs.boolean(enable_dedup)
except ValueError as e:
return {
'success': False,
'message': str(e)
}, 400

username = "ops" # TODO: add user role and permissions, hard coded to "ops" for now

Expand Down Expand Up @@ -240,6 +251,8 @@ def post(self):

if disk_usage:
new_doc['disk_usage'] = disk_usage
if enable_dedup is not None:
new_doc['enable_dedup'] = enable_dedup

result = mozart_es.index_document(index=user_rules_index, body=new_doc, refresh=True)
return {
Expand Down Expand Up @@ -275,6 +288,15 @@ def put(self):
time_limit = request_data.get('time_limit', None)
soft_time_limit = request_data.get('soft_time_limit', None)
disk_usage = request_data.get('disk_usage', None)
enable_dedup = request_data.get('enable_dedup')
if enable_dedup is not None:
try:
enable_dedup = inputs.boolean(enable_dedup)
except ValueError as e:
return {
'success': False,
'message': str(e)
}, 400

# check if job_type (hysds_io) exists in ElasticSearch (only if we're updating job_type)
if hysds_io:
Expand Down Expand Up @@ -374,7 +396,7 @@ def put(self):
update_doc['soft_time_limit'] = None
else:
if isinstance(soft_time_limit, int) and 0 < soft_time_limit <= 86400 * 7:
update_doc['soft_time_limit'] = time_limit
update_doc['soft_time_limit'] = soft_time_limit
else:
return {
'success': False,
Expand All @@ -383,6 +405,8 @@ def put(self):

if 'disk_usage' in request_data:
update_doc['disk_usage'] = disk_usage
if 'enable_dedup' in request_data:
update_doc['enable_dedup'] = enable_dedup

app.logger.info('editing document id %s in user_rule index' % _id)

Expand Down
46 changes: 28 additions & 18 deletions mozart/services/api_v02/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import traceback

from flask import request
from flask_restx import Namespace, Resource, fields
from flask_restx import Namespace, Resource, fields, inputs

from hysds.celery import app as celery_app
from hysds.task_worker import do_submit_task
Expand Down Expand Up @@ -360,16 +360,13 @@ def get(self, _id=None):


@on_demand_ns.route('', endpoint='on-demand')
@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve on demand jobs")
@on_demand_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Retrieve/submit on demand jobs")
class OnDemandJobs(Resource):
"""On Demand Jobs API."""

resp_model = on_demand_ns.model('JsonResponse', {
'success': fields.Boolean(required=True, description="if 'false', encountered exception;"
"otherwise no errors occurred"),
'message': fields.String(required=True, description="message describing success or failure"),
'objectid': fields.String(required=True, description="ID of indexed dataset"),
'index': fields.String(required=True, description="dataset index name"),
'success': fields.Boolean(required=True, description="if request was successful"),
'message': fields.String(required=True, description="message describing success or failure")
})

parser = on_demand_ns.parser()
Expand All @@ -383,6 +380,7 @@ class OnDemandJobs(Resource):
parser.add_argument('time_limit', type=int, location="form", help='time limit for PGE job')
parser.add_argument('soft_time_limit', type=int, location="form", help='soft time limit for PGE job')
parser.add_argument('disk_usage', type=str, location="form", help='memory usage required for jon (KB, MB, GB)')
parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication')

def get(self):
"""List available on demand jobs"""
Expand Down Expand Up @@ -419,16 +417,25 @@ def post(self):
if not request_data:
request_data = request.form

tag = request_data.get('tags', None)
job_type = request_data.get('job_type', None)
hysds_io = request_data.get('hysds_io', None)
queue = request_data.get('queue', None)
tag = request_data.get('tags')
job_type = request_data.get('job_type')
hysds_io = request_data.get('hysds_io')
queue = request_data.get('queue')
priority = int(request_data.get('priority', 0))
query_string = request_data.get('query', None)
query_string = request_data.get('query')
kwargs = request_data.get('kwargs', '{}')
time_limit = request_data.get('time_limit', None)
soft_time_limit = request_data.get('soft_time_limit', None)
disk_usage = request_data.get('disk_usage', None)
time_limit = request_data.get('time_limit')
soft_time_limit = request_data.get('soft_time_limit')
disk_usage = request_data.get('disk_usage')
enable_dedup = request_data.get('enable_dedup')
if enable_dedup is not None:
try:
enable_dedup = inputs.boolean(enable_dedup)
except ValueError as e:
return {
'success': False,
'message': str(e)
}, 400

try:
query = json.loads(query_string)
Expand Down Expand Up @@ -492,6 +499,8 @@ def post(self):

if disk_usage:
rule['disk_usage'] = disk_usage
if enable_dedup is not None:
rule['enable_dedup'] = enable_dedup

payload = {
'type': 'job_iterator',
Expand All @@ -504,6 +513,7 @@ def post(self):

return {
'success': True,
'message': 'task submitted successfully',
'result': celery_task.id
}

Expand All @@ -515,8 +525,7 @@ class JobParams(Resource):
"""Job Params API."""

resp_model = on_demand_ns.model('JsonResponse', {
'success': fields.Boolean(required=True, description="if 'false' encountered exception; "
"otherwise no errors occurred"),
'success': fields.Boolean(required=True, description="if request was processed successfully"),
'message': fields.String(required=True, description="message describing success or failure"),
'objectid': fields.String(required=True, description="ID of indexed dataset"),
'index': fields.String(required=True, description="dataset index name"),
Expand Down Expand Up @@ -562,5 +571,6 @@ def get(self):
'params': job_params,
'time_limit': job_spec['_source']['time_limit'],
'soft_time_limit': job_spec['_source']['soft_time_limit'],
'disk_usage': job_spec['_source']['disk_usage']
'disk_usage': job_spec['_source']['disk_usage'],
'enable_dedup': hysds_io['_source'].get('enable_dedup', True)
}
28 changes: 26 additions & 2 deletions mozart/services/api_v02/user_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import datetime

from flask import request
from flask_restx import Namespace, Resource
from flask_restx import Namespace, Resource, inputs

from hysds_commons.action_utils import check_passthrough_query

Expand Down Expand Up @@ -43,6 +43,7 @@ class UserRules(Resource):
post_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job')
post_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job')
post_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)')
post_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication')

put_parser = user_rule_ns.parser()
put_parser.add_argument('id', type=str, help="rule id")
Expand All @@ -57,6 +58,7 @@ class UserRules(Resource):
put_parser.add_argument('time_limit', type=int, location='form', help='time limit for PGE job')
put_parser.add_argument('soft_time_limit', type=int, location='form', help='soft time limit for PGE job')
put_parser.add_argument('disk_usage', type=str, location='form', help='memory usage required for jon (KB, MB, GB)')
put_parser.add_argument('enable_dedup', type=inputs.boolean, location="form", help='enable job de-duplication')

@user_rule_ns.expect(parser)
def get(self):
Expand Down Expand Up @@ -125,6 +127,15 @@ def post(self):
time_limit = request_data.get('time_limit', None)
soft_time_limit = request_data.get('soft_time_limit', None)
disk_usage = request_data.get('disk_usage', None)
enable_dedup = request_data.get('enable_dedup')
if enable_dedup is not None:
try:
enable_dedup = inputs.boolean(enable_dedup)
except ValueError as e:
return {
'success': False,
'message': str(e)
}, 400

username = "ops" # TODO: add user role and permissions, hard coded to "ops" for now

Expand Down Expand Up @@ -239,6 +250,8 @@ def post(self):

if disk_usage:
new_doc['disk_usage'] = disk_usage
if enable_dedup is not None:
new_doc['enable_dedup'] = enable_dedup

result = mozart_es.index_document(index=user_rules_index, body=new_doc, refresh=True)
return {
Expand Down Expand Up @@ -274,6 +287,15 @@ def put(self):
time_limit = request_data.get('time_limit', None)
soft_time_limit = request_data.get('soft_time_limit', None)
disk_usage = request_data.get('disk_usage', None)
enable_dedup = request_data.get('enable_dedup')
if enable_dedup is not None:
try:
enable_dedup = inputs.boolean(enable_dedup)
except ValueError as e:
return {
'success': False,
'message': str(e)
}, 400

# check if job_type (hysds_io) exists in elasticsearch (only if we're updating job_type)
if hysds_io:
Expand Down Expand Up @@ -374,7 +396,7 @@ def put(self):
update_doc['soft_time_limit'] = None
else:
if isinstance(soft_time_limit, int) and 0 < soft_time_limit <= 86400 * 7:
update_doc['soft_time_limit'] = time_limit
update_doc['soft_time_limit'] = soft_time_limit
else:
return {
'success': False,
Expand All @@ -383,6 +405,8 @@ def put(self):

if 'disk_usage' in request_data:
update_doc['disk_usage'] = disk_usage
if 'enable_dedup' in request_data:
update_doc['enable_dedup'] = enable_dedup

app.logger.info('editing document id %s in user_rule index' % _id)

Expand Down
Loading

0 comments on commit eaecb5c

Please sign in to comment.