Skip to content

Commit 417f392

Browse files
committed
serve workflow definition on schedule
1 parent be6a01f commit 417f392

File tree

1 file changed

+43
-11
lines changed

1 file changed

+43
-11
lines changed

jupyter_scheduler/executors.py

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import io
22
import multiprocessing as mp
33
import os
4+
from pathlib import Path
45
import shutil
56
import tarfile
67
import traceback
@@ -20,7 +21,7 @@
2021
from jupyter_scheduler.parameterize import add_parameters
2122
from jupyter_scheduler.scheduler import Scheduler
2223
from jupyter_scheduler.utils import get_utc_timestamp
23-
from jupyter_scheduler.workflows import DescribeWorkflow, DescribeWorkflowDefinition
24+
from jupyter_scheduler.workflows import CreateWorkflow, DescribeWorkflow, DescribeWorkflowDefinition
2425

2526

2627
class ExecutionManager(ABC):
@@ -197,25 +198,56 @@ def on_complete_workflow(self):
197198
session.commit()
198199

199200

201+
@flow(name="Create and run a new workflow`")
202+
def create_and_run_workflow(tasks: List[str], root_dir, db_url):
203+
db_session = create_session(db_url)
204+
with db_session() as session:
205+
workflow = Workflow(tasks=tasks)
206+
session.add(workflow)
207+
session.commit()
208+
workflow_id = workflow.workflow_id
209+
execution_manager = DefaultExecutionManager(
210+
workflow_id=workflow_id,
211+
root_dir=root_dir,
212+
db_url=db_url,
213+
)
214+
execution_manager.process_workflow()
215+
216+
200217
class DefaultExecutionManager(ExecutionManager):
201218
"""Default execution manager that executes notebooks"""
202219

203220
def activate_workflow_definition(self):
204-
workflow_definition = self.model
221+
describe_workflow_definition: DescribeWorkflowDefinition = self.model
205222
with self.db_session() as session:
206223
session.query(WorkflowDefinition).filter(
207224
WorkflowDefinition.workflow_definition_id
208-
== workflow_definition.workflow_definition_id
225+
== describe_workflow_definition.workflow_definition_id
209226
).update({"active": True})
210227
session.commit()
211-
workflow_definition = (
212-
session.query(WorkflowDefinition)
213-
.filter(
214-
WorkflowDefinition.workflow_definition_id
215-
== workflow_definition.workflow_definition_id
216-
)
217-
.first()
218-
)
228+
self.serve_workflow_definition()
229+
230+
@flow
231+
def serve_workflow_definition(self):
232+
describe_workflow_definition: DescribeWorkflowDefinition = self.model
233+
attributes = describe_workflow_definition.dict(
234+
exclude={"schedule", "timezone"}, exclude_none=True
235+
)
236+
create_workflow = CreateWorkflow(**attributes)
237+
flow_path = Path(
238+
"/Users/aieroshe/Documents/jupyter-scheduler/jupyter_scheduler/executors.py"
239+
)
240+
create_and_run_workflow.from_source(
241+
source=str(flow_path.parent),
242+
entrypoint="executors.py:create_and_run_workflow",
243+
).serve(
244+
cron=self.model.schedule,
245+
parameters={
246+
"model": create_workflow,
247+
"root_dir": self.root_dir,
248+
"db_url": self.db_url,
249+
},
250+
)
219251

220252
@task(name="Execute workflow task")
221253
def execute_task(self, job: Job):

0 commit comments

Comments
 (0)