Skip to content

Commit 4e956c1

Browse files
committed
Merge branch 'release/0.0.4'
Signed-off-by: Pavel Kirilin <win10@list.ru>
2 parents 739f227 + c81f43d commit 4e956c1

File tree

5 files changed

+173
-11
lines changed

5 files changed

+173
-11
lines changed

poetry.lock

+50-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.0.3"
4-
description = "Asynchronous task queue with async support"
3+
version = "0.0.4"
4+
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <win10@list.ru>"]
66
maintainers = ["Pavel Kirilin <win10@list.ru>"]
77
readme = "README.md"
@@ -16,9 +16,10 @@ classifiers = [
1616
"Programming Language :: Python :: 3.8",
1717
"Programming Language :: Python :: 3.9",
1818
"Programming Language :: Python :: 3.10",
19-
"Topic :: Utilities",
20-
"Topic :: System :: Networking",
2119
"Operating System :: OS Independent",
20+
"Intended Audience :: Developers",
21+
"Topic :: System :: Networking",
22+
"Development Status :: 3 - Alpha",
2223
]
2324
homepage = "https://github.com/taskiq-python/taskiq"
2425
keywords = ["taskiq", "tasks", "distributed", "async"]
@@ -29,6 +30,8 @@ typing-extensions = ">=3.10.0.0"
2930
pydantic = "^1.6.2"
3031
pyzmq = { version = "^23.2.0", optional = true }
3132
uvloop = { version = "^0.16.0", optional = true }
33+
watchdog = "^2.1.9"
34+
gitignore-parser = "^0.0.8"
3235

3336
[tool.poetry.dev-dependencies]
3437
pytest = "^7.1.2"

taskiq/cli/args.py

+14
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class TaskiqArgs:
2828
max_threadpool_threads: int
2929
no_parse: bool
3030
shutdown_timeout: float
31+
reload: bool
32+
no_gitignore: bool
3133

3234
@classmethod
3335
def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WPS213
@@ -113,6 +115,18 @@ def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WP
113115
default=5,
114116
help="Maximum amount of time for graceful broker's shutdown is seconds.",
115117
)
118+
parser.add_argument(
119+
"--reload",
120+
"-r",
121+
action="store_true",
122+
help="Reload workers if file is changed.",
123+
)
124+
parser.add_argument(
125+
"--do-not-use-gitignore",
126+
action="store_true",
127+
dest="no_gitignore",
128+
help="Do not use gitignore to check for updated files.",
129+
)
116130

117131
if args is None:
118132
namespace = parser.parse_args(args)

taskiq/cli/watcher.py

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from pathlib import Path
2+
from typing import Callable
3+
4+
from gitignore_parser import parse_gitignore
5+
from watchdog.events import FileSystemEvent
6+
7+
8+
class FileWatcher:
9+
"""Filewatcher that watchs for filesystem changes."""
10+
11+
def __init__(
12+
self,
13+
callback: Callable[[], None],
14+
use_gitignore: bool = True,
15+
) -> None:
16+
self.callback = callback
17+
self.gitignore = None
18+
gpath = Path("./.gitignore")
19+
if use_gitignore and gpath.exists():
20+
self.gitignore = parse_gitignore(gpath)
21+
22+
def dispatch(self, event: FileSystemEvent) -> None:
23+
"""
24+
React to event.
25+
26+
This function checks wether we need to
27+
react to event and calls callback if we do.
28+
29+
:param event: incoming fs event.
30+
"""
31+
if event.is_directory:
32+
return
33+
if event.event_type == "closed":
34+
return
35+
if ".pytest_cache" in event.src_path:
36+
return
37+
if "__pycache__" in event.src_path:
38+
return
39+
if self.gitignore and self.gitignore(event.src_path):
40+
return
41+
self.callback()

taskiq/cli/worker.py

+61-6
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@
77
from logging import basicConfig, getLevelName, getLogger
88
from multiprocessing import Process
99
from pathlib import Path
10+
from queue import Queue
1011
from time import sleep
1112
from typing import Any, Generator, List
1213

14+
from watchdog.observers import Observer
15+
1316
from taskiq.abc.broker import AsyncBroker
1417
from taskiq.cli.args import TaskiqArgs
1518
from taskiq.cli.async_task_runner import async_listen_messages
19+
from taskiq.cli.watcher import FileWatcher
1620

1721
try:
1822
import uvloop # noqa: WPS433
@@ -25,6 +29,8 @@
2529

2630
restart_workers = True
2731
worker_processes: List[Process] = []
32+
observer = Observer()
33+
reload_queue: "Queue[bool]" = Queue(-1)
2834

2935

3036
def signal_handler(_signal: int, _frame: Any) -> None:
@@ -45,9 +51,31 @@ def signal_handler(_signal: int, _frame: Any) -> None:
4551
# This is how we kill children,
4652
# by sending SIGINT to child processes.
4753
if process.pid is None:
48-
process.kill()
49-
else:
54+
continue
55+
try:
5056
os.kill(process.pid, signal.SIGINT)
57+
except ProcessLookupError:
58+
continue
59+
process.join()
60+
if observer.is_alive():
61+
observer.stop()
62+
observer.join()
63+
64+
65+
def schedule_workers_reload() -> None:
66+
"""
67+
Function to schedule workers to restart.
68+
69+
This function adds worker ids to the queue.
70+
71+
This queue is later read in watcher loop.
72+
"""
73+
global worker_processes # noqa: WPS420
74+
global reload_queue # noqa: WPS420
75+
76+
reload_queue.put(True)
77+
logger.info("Scheduled workers reload.")
78+
reload_queue.join()
5179

5280

5381
@contextmanager
@@ -212,13 +240,16 @@ def interrupt_handler(_signum: int, _frame: Any) -> None:
212240
loop.run_until_complete(shutdown_broker(broker, args.shutdown_timeout))
213241

214242

215-
def watch_workers_restarts(args: TaskiqArgs) -> None:
243+
def watcher_loop(args: TaskiqArgs) -> None: # noqa: C901, WPS213
216244
"""
217245
Infinate loop for main process.
218246
219247
This loop restarts worker processes
220248
if they exit with error returncodes.
221249
250+
Also it reads process ids from reload_queue
251+
and reloads workers if they were scheduled to reload.
252+
222253
:param args: cli arguements.
223254
"""
224255
global worker_processes # noqa: WPS420
@@ -228,6 +259,21 @@ def watch_workers_restarts(args: TaskiqArgs) -> None:
228259
# List of processes to remove.
229260
sleep(1)
230261
process_to_remove = []
262+
if not reload_queue.empty():
263+
while not reload_queue.empty():
264+
reload_queue.get()
265+
reload_queue.task_done()
266+
267+
for worker_id, worker in enumerate(worker_processes):
268+
worker.terminate()
269+
worker.join()
270+
worker_processes[worker_id] = Process(
271+
target=start_listen,
272+
kwargs={"args": args},
273+
name=f"worker-{worker_id}",
274+
)
275+
worker_processes[worker_id].start()
276+
231277
for worker_id, worker in enumerate(worker_processes):
232278
if worker.is_alive():
233279
continue
@@ -241,14 +287,13 @@ def watch_workers_restarts(args: TaskiqArgs) -> None:
241287
worker_processes[worker_id].start()
242288
else:
243289
logger.info("Worker-%s terminated.", worker_id)
244-
worker.join()
245290
process_to_remove.append(worker)
246291

247292
for dead_process in process_to_remove:
248293
worker_processes.remove(dead_process)
249294

250295

251-
def run_worker(args: TaskiqArgs) -> None:
296+
def run_worker(args: TaskiqArgs) -> None: # noqa: WPS213
252297
"""
253298
This function starts worker processes.
254299
@@ -279,7 +324,17 @@ def run_worker(args: TaskiqArgs) -> None:
279324
)
280325
worker_processes.append(work_proc)
281326

327+
if args.reload:
328+
observer.schedule(
329+
FileWatcher(
330+
callback=schedule_workers_reload,
331+
use_gitignore=not args.no_gitignore,
332+
),
333+
path=".",
334+
recursive=True,
335+
)
336+
observer.start()
282337
signal.signal(signal.SIGINT, signal_handler)
283338
signal.signal(signal.SIGTERM, signal_handler)
284339

285-
watch_workers_restarts(args=args)
340+
watcher_loop(args=args)

0 commit comments

Comments
 (0)