Skip to content

Commit 9195e4f

Browse files
committed
make execute_workflow a flow
1 parent 6c9de64 commit 9195e4f

File tree

3 files changed

+37
-10
lines changed

3 files changed

+37
-10
lines changed

jupyter_scheduler/executors.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from jupyter_scheduler.orm import Job, Workflow, create_session
2020
from jupyter_scheduler.parameterize import add_parameters
2121
from jupyter_scheduler.utils import get_utc_timestamp
22-
from jupyter_scheduler.workflows import DescribeWorkflow
22+
from jupyter_scheduler.workflows import DescribeTask, DescribeWorkflow
2323

2424

2525
class ExecutionManager(ABC):
@@ -188,26 +188,40 @@ class DefaultExecutionManager(ExecutionManager):
188188
"""Default execution manager that executes notebooks"""
189189

190190
@task(task_run_name="{task_id}")
191-
def execute_task(task_id: str):
191+
def execute_task(self, task_id: str):
192192
print(f"Task {task_id} executed")
193193
return task_id
194194

195-
@flow(task_runner=DaskTaskRunner())
195+
@task
196+
def get_task_data(self, task_ids: List[str] = []):
197+
# TODO: get orm objects from Task table of the db, create DescribeTask for each
198+
tasks_data_obj = [
199+
{"id": "task0", "dependsOn": ["task3"]},
200+
{"id": "task4", "dependsOn": ["task0", "task1", "task2", "task3"]},
201+
{"id": "task1", "dependsOn": []},
202+
{"id": "task2", "dependsOn": ["task1"]},
203+
{"id": "task3", "dependsOn": ["task1", "task2"]},
204+
]
205+
206+
return tasks_data_obj
207+
208+
@flow()
196209
def execute_workflow(self):
197-
workflow: DescribeWorkflow = self.model
198-
tasks = {task["id"]: task for task in workflow.tasks}
210+
211+
tasks_info = self.get_task_data()
212+
tasks = {task["id"]: task for task in tasks_info}
199213

200214
# create Prefect tasks, use caching to ensure Prefect tasks are created before wait_for is called on them
201215
@lru_cache(maxsize=None)
202-
def make_task(task_id, execute_task):
216+
def make_task(task_id):
203217
deps = tasks[task_id]["dependsOn"]
204-
return execute_task.submit(
205-
task_id, wait_for=[make_task(dep_id, execute_task) for dep_id in deps]
218+
return self.execute_task.submit(
219+
task_id, wait_for=[make_task(dep_id) for dep_id in deps]
206220
)
207221

208-
final_tasks = [make_task(task_id, self.execute_task) for task_id in tasks]
222+
final_tasks = [make_task(task_id) for task_id in tasks]
209223
for future in as_completed(final_tasks):
210-
print(future.result())
224+
future.result()
211225

212226
def execute(self):
213227
job = self.model

jupyter_scheduler/scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,8 @@ def create_workflow(self, model: CreateWorkflow) -> str:
541541
session.commit()
542542

543543
execution_manager = self.execution_manager_class(
544+
job_id="123",
545+
staging_paths=dict(),
544546
workflow_id=workflow.workflow_id,
545547
root_dir=self.root_dir,
546548
db_url=self.db_url,

jupyter_scheduler/workflows.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,14 @@ class DescribeWorkflow(BaseModel):
5555
workflow_id: str
5656
tasks: List[str] = None
5757
status: Status = Status.CREATED
58+
59+
class Config:
60+
orm_mode = True
61+
62+
63+
class DescribeTask(BaseModel):
64+
dependsOn: List[str] = []
65+
status: Status = Status.CREATED
66+
67+
class Config:
68+
orm_mode = True

0 commit comments

Comments
 (0)