Skip to content

Commit 24f4830

Browse files
authored
Powerful exec command (#53)
* More docs * command line params * exec command
1 parent 6c3b0d4 commit 24f4830

File tree

17 files changed

+508
-255
lines changed

17 files changed

+508
-255
lines changed

.vscode/launch.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,22 @@
3333
"RedirectOutput"
3434
]
3535
},
36+
{
37+
"name": "AIO-FLUID EXAMPLE exec",
38+
"type": "python",
39+
"request": "launch",
40+
"program": "${workspaceFolder}/examples",
41+
"cwd": "${workspaceFolder}",
42+
"justMyCode": false,
43+
"args": [
44+
"exec",
45+
"add",
46+
"--a",
47+
"1",
48+
],
49+
"debugOptions": [
50+
"RedirectOutput"
51+
]
52+
},
3653
]
3754
}

docs/reference/task_cli.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Task Manager Cli
2+
3+
Command line tools for task manager applications.
4+
5+
This modules requires the `cli` extra to be installed.
6+
7+
```bash
8+
$ pip install aio-fluid[cli]
9+
```
10+
It can be imported from `fluid.scheduler.cli`:
11+
12+
```python
13+
from fastapi.scheduler.cli import TaskManagerCLI
14+
15+
if __name__ == "__main__":
16+
cli = TaskManagerCLI("path.to:task_app")
17+
cli()
18+
```
19+
20+
::: fluid.scheduler.cli.TaskManagerCLI

docs/reference/task_manager.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ from fastapi.scheduler import TaskManager
88

99
::: fluid.scheduler.TaskManager
1010

11-
1211
::: fluid.scheduler.TaskManagerConfig
1312

1413
::: fluid.scheduler.consumer.TaskDispatcher

docs/reference/task_scheduler.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Task Scheduler
2+
3+
The task scheduler [TaskScheduler][fluid.scheduler.TaskScheduler] inherits
4+
from the [TaskConsumer][fluid.scheduler.TaskConsumer] to add scheduling of
5+
periodic tasks.
6+
7+
It can be imported from `fluid.scheduler`:
8+
9+
```python
10+
from fastapi.scheduler import TaskScheduler
11+
```
12+
13+
::: fluid.scheduler.TaskScheduler

docs/reference/utils.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Utils
2+
3+
::: fluid.utils.lazy.LazyGroup

docs/tutorials/task_app.md

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Task Queue App
2+
3+
The `fluid.scheduler` module is a simple yet powerful distributed task producer ([TaskScheduler][fluid.scheduler.TaskScheduler]) and consumer ([TaskConsumer][fluid.scheduler.TaskConsumer]) system for executing tasks.
4+
The middleware for distributing tasks can be configured via the [TaskBroker][fluid.scheduler.TaskBroker] interface.
5+
6+
A redis task broker is provided for convenience.
7+
8+
## Tasks Consumer
9+
10+
Create a task consumer, register tasks from modules, and run the consumer.
11+
12+
```python
13+
import asyncio
14+
from typing import Any
15+
from fluid.scheduler import TaskConsumer
16+
import task_module_a, task_module_b
17+
18+
19+
def task_consumer(**kwargs: Any) -> TaskConsumer:
20+
consumer = TaskConsumer(**kwargs)
21+
consumer.register_from_module(task_module_a)
22+
consumer.register_from_module(task_module_b)
23+
return consumer
24+
25+
26+
if __name__ == "__main__":
27+
consumer = task_consumer()
28+
asyncio.run(consumer.run())
29+
```
30+
31+
## FastAPI Integration
32+
33+
The `TaskConsumer` can be integrated with FastAPI so that
34+
tasks can be queued via HTTP requests.
35+
36+
```python
37+
import uvicorn
38+
from fluid.scheduler.endpoints import setup_fastapi
39+
40+
if __name__ == "__main__":
41+
consumer = task_consumer()
42+
app = setup_fastapi(consumer)
43+
uvicorn.run(app)
44+
```
45+
46+
You can test via the example provided
47+
48+
```bash
49+
$ python -m examples.simple_fastapi
50+
```
51+
52+
and check the openapi UI at [http://127.0.0.1:8000/docs](http://127.0.0.1:8000/docs).
53+
54+
55+
## Task App Command Line
56+
57+
The [TaskConsumer][fluid.scheduler.TaskConsumer] or [TaskScheduler][fluid.scheduler.TaskScheduler] can be run with the command line tool to allow for an even richer API.
58+
59+
```python
60+
from fluid.scheduler.cli import TaskManagerCLI
61+
62+
if __name__ == "__main__":
63+
consumer = task_consumer()
64+
TaskManagerCLI(setup_fastapi(consumer))()
65+
```
66+
67+
This features requires to install the package with the `cli` extra.
68+
69+
```bash
70+
$ pip install aio-fluid[cli]
71+
```
72+
73+
```bash
74+
$ python -m examples.simple_cli
75+
Usage: python -m examples.simple_cli [OPTIONS] COMMAND [ARGS]...
76+
77+
Options:
78+
--help Show this message and exit.
79+
80+
Commands:
81+
exec Execute a registered task
82+
ls List all tasks with their schedules
83+
serve Start app server
84+
```
85+
86+
The command line tools provides a powerful interface to execute tasks, parameters are
87+
passed as optional arguments using he standard click interface.

docs/tutorials/task_queue.md

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
# Task Queue
2-
3-
The `fluid.scheduler` module is a simple yet powerful distributed task producer (TaskScheduler) and consumer (TaskConsumer) system for executing tasks.
4-
The middleware for distributing tasks can be configured via the Broker interface.
5-
A redis broker is provided for convenience.
6-
7-
## Tasks
1+
# Tasks
82

93
Tasks are standard python async functions decorated with the [@task][fluid.scheduler.task] decorator.
104

@@ -42,7 +36,7 @@ async def say_hi(ctx: TaskRun[TaskParams]) -> None:
4236

4337
## Task Types
4438

45-
There are two types of tasks implemented
39+
There are few types of tasks implemented, lets take a look at them.
4640

4741
### IO Bound Tasks
4842

@@ -89,10 +83,22 @@ Both IO and CPU bound tasks can be periodically scheduled via the `schedule` key
8983
There are two types of scheduling, the most common is the [every][fluid.scheduler.every] function that takes a `timedelta` object.
9084

9185
```python
86+
import asyncio
9287
from datetime import timedelta
93-
from fluid.scheduler import task, TaskContext, every
88+
from fluid.scheduler import task, TaskRun, every
9489

9590
@task(schedule=every(timedelta(seconds=1)))
9691
async def scheduled(ctx: TaskRun) -> None:
9792
await asyncio.sleep(0.1)
9893
```
94+
95+
You can also use the [crontab][fluid.scheduler.crontab] function to schedule tasks using cron expressions.
96+
97+
```python
98+
import asyncio
99+
from fluid.scheduler import task, TaskRun, crontab
100+
101+
@task(schedule=crontab(hours='*/2'))
102+
async def scheduled(ctx: TaskRun) -> None:
103+
await asyncio.sleep(0.1)
104+
```

examples/simple.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import asyncio
2+
3+
from examples.tasks import task_scheduler
4+
from fluid.utils import log
5+
6+
if __name__ == "__main__":
7+
log.config()
8+
asyncio.run(task_scheduler().run())

examples/simple_cli.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import dotenv
2+
3+
dotenv.load_dotenv()
4+
5+
from fluid.scheduler.cli import TaskManagerCLI # isort:skip # noqa: E402
6+
7+
8+
if __name__ == "__main__":
9+
from examples.tasks import task_app
10+
11+
task_manager_cli = TaskManagerCLI(task_app())
12+
task_manager_cli()

examples/simple_fastapi.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import uvicorn
2+
3+
from examples.tasks import task_scheduler
4+
from fluid.scheduler.endpoints import setup_fastapi
5+
from fluid.utils import log
6+
7+
if __name__ == "__main__":
8+
log.config()
9+
app = setup_fastapi(task_scheduler())
10+
uvicorn.run(app)

examples/tasks/__init__.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
from fluid.scheduler.endpoints import setup_fastapi
1313

1414

15-
def task_app() -> FastAPI:
15+
def task_scheduler() -> TaskScheduler:
1616
task_manager = TaskScheduler()
1717
task_manager.register_from_dict(globals())
18-
return setup_fastapi(task_manager)
18+
return task_manager
19+
20+
21+
def task_app() -> FastAPI:
22+
return setup_fastapi(task_scheduler())
1923

2024

2125
class Sleep(BaseModel):
@@ -38,8 +42,8 @@ async def scheduled(context: TaskRun) -> None:
3842

3943

4044
class AddValues(BaseModel):
41-
a: float = 0
42-
b: float = 0
45+
a: float = Field(default=0, description="First number to add")
46+
b: float = Field(default=0, description="Second number to add")
4347

4448

4549
@task

fluid/scheduler/cli.py

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
import click
66
import uvicorn
77
from fastapi import FastAPI
8+
from pydanclick import from_pydantic
9+
from pydantic import BaseModel
810
from rich.console import Console
911
from rich.table import Table
1012
from uvicorn.importer import import_from_string
1113

12-
from fluid.utils import log
14+
from fluid.utils import log as log_
1315
from fluid.utils.lazy import LazyGroup
1416

1517
if TYPE_CHECKING:
@@ -21,6 +23,13 @@
2123

2224

2325
class TaskManagerCLI(LazyGroup):
26+
"""CLI for TaskManager
27+
28+
This class provides a CLI for a TaskManager Application.
29+
30+
It requires to install the `cli` extra dependencies.
31+
"""
32+
2433
def __init__(
2534
self,
2635
task_manager_app: TaskManagerApp,
@@ -49,10 +58,38 @@ def ctx_task_manager(ctx: click.Context) -> TaskManager:
4958
return ctx_app(ctx).state.task_manager
5059

5160

61+
class ExecuteTasks(click.Group):
62+
def list_commands(self, ctx: click.Context) -> list[str]:
63+
task_manager = ctx_task_manager(ctx)
64+
return sorted(task_manager.registry)
65+
66+
def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
67+
task_manager = ctx_task_manager(ctx)
68+
task = task_manager.registry.get(cmd_name)
69+
if task is None:
70+
raise click.ClickException(f"Task {cmd_name} not found")
71+
72+
@click.command(cmd_name, help=task.short_description)
73+
@click.option("--log", is_flag=True, help="Show logs")
74+
@from_pydantic(task.params_model)
75+
def execute_task(log: bool, **kwargs: Any) -> None:
76+
if log:
77+
log_.config()
78+
params = {}
79+
for value in kwargs.values():
80+
if isinstance(value, BaseModel):
81+
params.update(value.model_dump())
82+
run = task_manager.execute_sync(cmd_name, **params)
83+
console = Console()
84+
console.print(task_run_table(run))
85+
86+
return execute_task
87+
88+
5289
@click.command()
5390
@click.pass_context
5491
def ls(ctx: click.Context) -> None:
55-
"""list all tasks"""
92+
"""List all tasks with their schedules"""
5693
task_manager = ctx_task_manager(ctx)
5794
table = Table(title="Tasks")
5895
table.add_column("Name", style="cyan", no_wrap=True)
@@ -75,23 +112,6 @@ def ls(ctx: click.Context) -> None:
75112
console.print(table)
76113

77114

78-
@click.command()
79-
@click.pass_context
80-
@click.argument("task")
81-
@click.option(
82-
"--dry-run",
83-
is_flag=True,
84-
help="dry run (if the tasks supports it)",
85-
default=False,
86-
)
87-
def execute(ctx: click.Context, task: str, dry_run: bool) -> None:
88-
"""execute a task"""
89-
task_manager = ctx_task_manager(ctx)
90-
run = task_manager.execute_sync(task, dry_run=dry_run)
91-
console = Console()
92-
console.print(task_run_table(run))
93-
94-
95115
@click.command("serve", short_help="Start app server.")
96116
@click.option(
97117
"--host",
@@ -116,18 +136,20 @@ def execute(ctx: click.Context, task: str, dry_run: bool) -> None:
116136
)
117137
@click.pass_context
118138
def serve(ctx: click.Context, host: str, port: int, reload: bool) -> None:
119-
"""Run the service."""
139+
"""Run the service"""
120140
task_manager_app = ctx_task_manager_app(ctx)
121141
uvicorn.run(
122142
task_manager_app,
123143
port=port,
124144
host=host,
125145
log_level="info",
126146
reload=reload,
127-
log_config=log.config(),
147+
log_config=log_.config(),
128148
)
129149

130150

151+
execute = ExecuteTasks(name="exec", help="Execute a registered task")
152+
131153
DEFAULT_COMMANDS = (ls, execute, serve)
132154

133155

fluid/utils/lazy.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@
55

66

77
class LazyGroup(click.Group):
8+
"""A click Group that can lazily load subcommands
9+
10+
This class extends the click.Group class to allow for subcommands to be
11+
lazily loaded from a module path.
12+
13+
It is useful when you have a large number of subcommands that you don't
14+
want to load until they are actually needed.
15+
16+
Available with the `cli` extra dependencies.
17+
"""
18+
819
def __init__(
920
self,
1021
*,

0 commit comments

Comments
 (0)