Skip to content

Commit

Permalink
Working version, expect flake8 errors!
Browse files Browse the repository at this point in the history
Pushed to test GH action.
  • Loading branch information
david-i-berry committed Feb 1, 2024
1 parent 512adbd commit 495bf32
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 106 deletions.
105 changes: 61 additions & 44 deletions wis2box_api/plugins/process/manager/psqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
# =================================================================
import json
import logging
import multiprocessing as mp
import traceback
from typing import Any, Dict, Tuple, Optional, OrderedDict
import uuid

from sqlalchemy import (create_engine, Integer)
from sqlalchemy import (create_engine, Integer, String, text, bindparam)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.engine import URL as dbURL
from sqlalchemy.orm import (DeclarativeBase, Mapped, mapped_column, Session)
Expand All @@ -50,16 +52,20 @@
RequestedProcessExecutionMode,
)

#mp.set_start_method("spawn")
LOGGER = logging.getLogger(__name__)

#LOGGER.warning("spawning process pool")
# _pool = mp.Pool( max(1,mp.cpu_count() - 2)) # noqa arbitrary # of procs.
#_pool = mp.Pool(1)
#LOGGER.warning(max(1,mp.cpu_count() - 2))

class Base(DeclarativeBase):
pass


class JobManagerPygeoapi(Base):
__tablename__ = "job_manager_pygeoapi"
id = mapped_column(Integer, primary_key=True)
id = mapped_column(String, primary_key=True)
job = mapped_column(JSONB, nullable=False)


Expand Down Expand Up @@ -110,34 +116,34 @@ def _connect(self):
exc_info=(traceback))
return False

def _execute_handler_async(self, p: BaseProcessor, job_id: str,
data_dict: dict) -> Tuple[str, None, JobStatus]:
"""
Updated execution handler to execute a process in a background
process using `multiprocessing.Process`
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process # noqa
:param p: `pygeoapi.process` object
:param job_id: job identifier
:param data_dict: `dict` of data parameters
:returns: tuple of None (i.e. initial response payload)
and JobStatus.accepted (i.e. initial job status)
"""
_pool.apply_async(
func=self._execute_handler_sync,
args=(p, job_id, data_dict))

return 'application/json', None, JobStatus.accepted
# def _execute_handler_async(self, p: BaseProcessor, job_id: str,
# data_dict: dict) ->
# Tuple[str, None, JobStatus]:
#"""
#Updated execution handler to execute a process in a background
#process using `multiprocessing.Process`##
#
#
#:param p: `pygeoapi.process` object
#:param job_id: job identifier
#:param data_dict: `dict` of data parameters
#
#:returns: tuple of None (i.e. initial response payload)
# and JobStatus.accepted (i.e. initial job status)
#"""
#_pool.apply_async(
# func=self._execute_handler_sync,
#args=(p, job_id, data_dict))
#
#return 'application/json', None, JobStatus.accepted

def destroy(self):
try:
self.db.close()
LOGGER.info("JOBMANAGER - psql disconnected")
return True
except Exception:
self.destroy()
#self.destroy()
LOGGER.error("JOBMANAGER - destroy error",
exc_info=(traceback))
return False
Expand All @@ -147,27 +153,31 @@ def get_jobs(self, status=None):
self._connect()
if status is not None:
query = text("SELECT job from job_manager_pygeoapi WHERE job->>'status' = ':status'") # noqa
jobs = self.db.execute(query, status = status).fetchall()
result = self.db.execute(query, parameters=dict(status = status)).fetchall() # noqa
else:
query = text("SELECT job from job_manager_pygeoapi") # noqa
jobs = self.db.execute(query).fetchall()

pass
result = self.db.execute(query).fetchall()
self.destroy()
LOGGER.info("JOBMANAGER - psql jobs queried")
# now convert jobs to list of dicts
jobs = [dict(row[0]) for row in result]
return jobs
except Exception:
LOGGER.error("JOBMANAGER - get_jobs error",
exc_info=(traceback))
return False

def add_job(self, job_metadata):
job_id = job_metadata.get('identifier')
if job_id is None:
job_id = str(uuid.uuid4())
try:
self._connect()
query = text("INSERT INTO job_manager_pygeoapi (job) VALUES (:job_metadata) RETURNING id")
result = self.db.execute(query, job_metadata = job_metadata)
query = text("INSERT INTO job_manager_pygeoapi (id, job) VALUES (:job_id, :job_metadata) RETURNING id") # noqa
query = query.bindparams(bindparam('job_metadata', type_=JSONB),
bindparam('job_id', type_=String))
result = self.db.execute(query, parameters=dict(job_id = job_id, job_metadata = job_metadata)) # noqa
doc_id = result.fetchone()[0]
self.db.execute("COMMIT")
self.db.commit()
self.destroy()
LOGGER.info("JOBMANAGER - psql job added")
return doc_id
Expand All @@ -181,15 +191,19 @@ def update_job(self, job_id, update_dict):
try:
self._connect()
# first get the job to update
query = text("SELECT job from job_manager_pygeoapi WHERE id = :job_id")
result = self.db.execute(query, job_id=job_id).fetchone()[0]
query = text("SELECT job from job_manager_pygeoapi WHERE id =:job_id") # noqa
query = query.bindparams(bindparam('job_id', type_ = String))
result = self.db.execute(query, parameters=dict(job_id=job_id)).fetchone() # noqa
result = dict(result[0]) # convert to dict
# update the dict
for k,v in update_dict.items():
result[k] = v
# now back to DB
query = text("UPDATE job_manager_pygeoapi SET job = :update_dict where id := job_id RETURNING id")
result = self.db.execute(query, update_dict = result, job_id = job_id)
self.db.execute("COMMIT")
query = text("UPDATE job_manager_pygeoapi SET job =:update_dict WHERE id =:job_id RETURNING id") # noqa
query = query.bindparams(bindparam('job_id', type_=String),
bindparam('update_dict', type_=JSONB))
self.db.execute(query.bindparams(bindparam('update_dict', type_=JSONB)), parameters=dict(update_dict = result, job_id = job_id)) # noqa
self.db.commit()
self.destroy()
LOGGER.info("JOBMANAGER - psql job updated")
return True
Expand All @@ -202,9 +216,10 @@ def update_job(self, job_id, update_dict):
def delete_job(self, job_id):
try:
self._connect()
query = text("DELETE FROM job_manager_pygeoapi where id = : job_id")
result = self.db.execute(query, job_id = job_id)
self.db.execute("COMMIT")
query = text("DELETE FROM job_manager_pygeoapi where id =:job_id")
query = query.bindparams(bindparam('job_id', type_=String))
result = self.db.execute(query, parameters=dict(job_id = job_id))
self.db.commit()
self.destroy()
LOGGER.info("JOBMANAGER - psql job deleted")
return True
Expand All @@ -216,8 +231,9 @@ def delete_job(self, job_id):
def get_job(self, job_id):
try:
self._connect()
query = text("SELECT job from job_manager_pygeoapi WHERE id = :job_id")
result = self.db.execute(query, job_id = job_id)
query = text("SELECT job from job_manager_pygeoapi WHERE id =:job_id") # noqa
query = query.bindparams(bindparam('job_id', type_=String))
result = self.db.execute(query, parameters=dict(job_id = job_id))
entry = result.fetchone()[0]
self.destroy()
LOGGER.info("JOBMANAGER - psql job queried")
Expand All @@ -230,8 +246,9 @@ def get_job(self, job_id):
def get_job_result(self, job_id):
try:
self._connect()
query = text("SELECT job from job_manager_pygeoapi WHERE id = :job_id")
result = self.db.execute(query, job_id = job_id)
query = text("SELECT job from job_manager_pygeoapi WHERE id =:job_id") # noqa
query = query.bindparams(bindparam('job_id', type_=String))
result = self.db.execute(query, parameters=dict(job_id = job_id))
entry = result.fetchone()[0]
self.destroy()
if entry["status"] != "successful":
Expand Down
62 changes: 0 additions & 62 deletions wis2box_api/plugins/process/manager/tinydb_mp.py

This file was deleted.

0 comments on commit 495bf32

Please sign in to comment.