Commit a9534db

Introduce ProcessPool
1 parent b0a6f14 commit a9534db

File tree

3 files changed: 186 additions, 68 deletions


fastcore/_modidx.py

Lines changed: 9 additions & 1 deletion
@@ -370,11 +370,19 @@
              'fastcore.net.urlsend': ('net.html#urlsend', 'fastcore/net.py'),
              'fastcore.net.urlvalid': ('net.html#urlvalid', 'fastcore/net.py'),
              'fastcore.net.urlwrap': ('net.html#urlwrap', 'fastcore/net.py')},
- 'fastcore.parallel': { 'fastcore.parallel.ProcessPoolExecutor': ('parallel.html#processpoolexecutor', 'fastcore/parallel.py'),
+ 'fastcore.parallel': { 'fastcore.parallel.NoDaemonProcess': ('parallel.html#nodaemonprocess', 'fastcore/parallel.py'),
+                        'fastcore.parallel.NoDaemonProcess.daemon': ( 'parallel.html#nodaemonprocess.daemon',
+                                                                      'fastcore/parallel.py'),
+                        'fastcore.parallel.ProcessPool': ('parallel.html#processpool', 'fastcore/parallel.py'),
+                        'fastcore.parallel.ProcessPool.__init__': ('parallel.html#processpool.__init__', 'fastcore/parallel.py'),
+                        'fastcore.parallel.ProcessPool.map': ('parallel.html#processpool.map', 'fastcore/parallel.py'),
+                        'fastcore.parallel.ProcessPoolExecutor': ('parallel.html#processpoolexecutor', 'fastcore/parallel.py'),
              'fastcore.parallel.ProcessPoolExecutor.__init__': ( 'parallel.html#processpoolexecutor.__init__',
                                                                  'fastcore/parallel.py'),
              'fastcore.parallel.ProcessPoolExecutor.map': ( 'parallel.html#processpoolexecutor.map',
                                                             'fastcore/parallel.py'),
+             'fastcore.parallel.ThreadPool': ('parallel.html#threadpool', 'fastcore/parallel.py'),
+             'fastcore.parallel.ThreadPool.__init__': ('parallel.html#threadpool.__init__', 'fastcore/parallel.py'),
              'fastcore.parallel.ThreadPoolExecutor': ('parallel.html#threadpoolexecutor', 'fastcore/parallel.py'),
              'fastcore.parallel.ThreadPoolExecutor.__init__': ( 'parallel.html#threadpoolexecutor.__init__',
                                                                 'fastcore/parallel.py'),

fastcore/parallel.py

Lines changed: 64 additions & 12 deletions
@@ -1,8 +1,8 @@
 # AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/03a_parallel.ipynb.

 # %% auto 0
-__all__ = ['threaded', 'startthread', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'parallel', 'add_one',
-           'run_procs', 'parallel_gen']
+__all__ = ['threaded', 'startthread', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'NoDaemonProcess',
+           'ProcessPool', 'ThreadPool', 'parallel', 'add_one', 'run_procs', 'parallel_gen']

 # %% ../nbs/03a_parallel.ipynb 1
 from .imports import *
@@ -11,7 +11,7 @@
 from .meta import *
 from .xtras import *
 from functools import wraps
-
+import multiprocessing.pool
 import concurrent.futures,time
 from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
 from threading import Thread
@@ -96,48 +96,100 @@ def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
         except Exception as e: self.on_exc(e)

 # %% ../nbs/03a_parallel.ipynb 14
+class NoDaemonProcess(multiprocessing.Process):
+    # See https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
+    @property
+    def daemon(self):
+        return False
+    @daemon.setter
+    def daemon(self, value):
+        pass
+
+# %% ../nbs/03a_parallel.ipynb 15
+@delegates()
+class ProcessPool(multiprocessing.pool.Pool):
+    "Same as Python's Pool, except can pass `max_workers==0` for serial execution"
+    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, daemonic=False, **kwargs):
+        if max_workers is None: max_workers=defaults.cpus
+        store_attr()
+        self.not_parallel = max_workers==0
+        if self.not_parallel: max_workers=1
+        if not daemonic:
+            class NoDaemonContext(type(kwargs.get('context', get_context()))):
+                Process = NoDaemonProcess
+            kwargs['context'] = NoDaemonContext()
+        super().__init__(max_workers, **kwargs)
+
+    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
+        assert timeout is None, "timeout is not supported by ProcessPool, use ProcessPoolExecutor instead"
+        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
+        self.not_parallel = self.max_workers==0
+        if self.not_parallel: self.max_workers=1
+
+        if self.not_parallel == False: self.lock = Manager().Lock()
+        g = partial(f, *args, **kwargs)
+        if self.not_parallel: return map(g, items)
+        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
+        try: return super().map(_g, items, chunksize=chunksize)
+        except Exception as e: self.on_exc(e)
+
+# %% ../nbs/03a_parallel.ipynb 16
+@delegates()
+class ThreadPool():
+    # If you have a need for a ThreadPool, please open an issue.
+    def __init__(self, *args, **kwargs):
+        raise NotImplementedError("`ThreadPool` is not implemented")
+
+# %% ../nbs/03a_parallel.ipynb 17
 try: from fastprogress import progress_bar
 except: progress_bar = None

-# %% ../nbs/03a_parallel.ipynb 15
+# %% ../nbs/03a_parallel.ipynb 18
 def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
-             method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):
+             method=None, threadpool=False, timeout=None, chunksize=1,
+             executor=True, maxtasksperchild=None, **kwargs):
     "Applies `func` in parallel to `items`, using `n_workers`"
     kwpool = {}
-    if threadpool: pool = ThreadPoolExecutor
+    if threadpool: pool = ThreadPoolExecutor if executor else ThreadPool
     else:
+        pool = ProcessPoolExecutor if executor else ProcessPool
         if not method and sys.platform == 'darwin': method='fork'
-        if method: kwpool['mp_context'] = get_context(method)
-        pool = ProcessPoolExecutor
+        if method:
+            if executor: kwpool['mp_context'] = get_context(method)
+            else: kwpool['context'] = get_context(method)
+
+    if maxtasksperchild:
+        assert pool==ProcessPool, "`maxtasksperchild` is only supported by ProcessPool"
+        kwpool['maxtasksperchild'] = maxtasksperchild
     with pool(n_workers, pause=pause, **kwpool) as ex:
         r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
         if progress and progress_bar:
             if total is None: total = len(items)
             r = progress_bar(r, total=total, leave=False)
         return L(r)

-# %% ../nbs/03a_parallel.ipynb 16
+# %% ../nbs/03a_parallel.ipynb 19
 def add_one(x, a=1):
     # this import is necessary for multiprocessing in notebook on windows
     import random
     time.sleep(random.random()/80)
     return x+a

-# %% ../nbs/03a_parallel.ipynb 22
+# %% ../nbs/03a_parallel.ipynb 25
 def run_procs(f, f_done, args):
     "Call `f` for each item in `args` in parallel, yielding `f_done`"
     processes = L(args).map(Process, args=arg0, target=f)
     for o in processes: o.start()
     yield from f_done()
     processes.map(Self.join())

-# %% ../nbs/03a_parallel.ipynb 23
+# %% ../nbs/03a_parallel.ipynb 26
 def _f_pg(obj, queue, batch, start_idx):
     for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))

 def _done_pg(queue, items): return (queue.get() for _ in items)

-# %% ../nbs/03a_parallel.ipynb 24
+# %% ../nbs/03a_parallel.ipynb 27
 def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
     "Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
     if not parallelable('n_workers', n_workers): n_workers = 0
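
A minimal usage sketch of the new arguments (an illustration, not part of the commit; `square` is a hypothetical top-level helper): passing `executor=False` routes `parallel` through the new ProcessPool (built on multiprocessing.pool.Pool) instead of ProcessPoolExecutor, and `maxtasksperchild` asks that pool to recycle each worker process after the given number of tasks.

from fastcore.parallel import parallel

def square(x):  # defined at module level so worker processes can pickle it
    return x * x

if __name__ == '__main__':
    # Executor-based path (the default, unchanged behaviour)
    print(parallel(square, range(8), n_workers=2))
    # Pool-based path added by this commit; each worker is replaced after 4 tasks
    print(parallel(square, range(8), n_workers=2, executor=False, maxtasksperchild=4))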

nbs/03a_parallel.ipynb

Lines changed: 113 additions & 55 deletions
@@ -22,7 +22,7 @@
     "from fastcore.meta import *\n",
     "from fastcore.xtras import *\n",
     "from functools import wraps\n",
-    "\n",
+    "import multiprocessing.pool\n",
     "import concurrent.futures,time\n",
     "from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context\n",
     "from threading import Thread\n",
@@ -207,15 +207,26 @@
      "text/markdown": [
       "---\n",
       "\n",
-      "### ThreadPoolExecutor\n",
+      "[source](https://github.com/fastai/fastcore/blob/master/fastcore/parallel.py#L58){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
+      "\n",
+      "#### ThreadPoolExecutor\n",
       "\n",
-      "> ThreadPoolExecutor (max_workers=8, on_exc=<built-infunctionprint>,\n",
+      "> ThreadPoolExecutor (max_workers=6, on_exc=<built-in function print>,\n",
       ">                     pause=0, **kwargs)\n",
       "\n",
       "Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
      ],
      "text/plain": [
-      "<nbdev.showdoc.BasicMarkdownRenderer>"
+      "---\n",
+      "\n",
+      "[source](https://github.com/fastai/fastcore/blob/master/fastcore/parallel.py#L58){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
+      "\n",
+      "#### ThreadPoolExecutor\n",
+      "\n",
+      "> ThreadPoolExecutor (max_workers=6, on_exc=<built-in function print>,\n",
+      ">                     pause=0, **kwargs)\n",
+      "\n",
+      "Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
      ]
     },
     "execution_count": null,
@@ -267,16 +278,28 @@
      "text/markdown": [
       "---\n",
       "\n",
-      "### ProcessPoolExecutor\n",
+      "[source](https://github.com/fastai/fastcore/blob/master/fastcore/parallel.py#L77){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
       "\n",
-      "> ProcessPoolExecutor (max_workers=8, on_exc=<built-infunctionprint>,\n",
+      "#### ProcessPoolExecutor\n",
+      "\n",
+      "> ProcessPoolExecutor (max_workers=6, on_exc=<built-in function print>,\n",
       ">                      pause=0, mp_context=None, initializer=None,\n",
       ">                      initargs=())\n",
       "\n",
       "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
      ],
      "text/plain": [
-      "<nbdev.showdoc.BasicMarkdownRenderer>"
+      "---\n",
+      "\n",
+      "[source](https://github.com/fastai/fastcore/blob/master/fastcore/parallel.py#L77){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
+      "\n",
+      "#### ProcessPoolExecutor\n",
+      "\n",
+      "> ProcessPoolExecutor (max_workers=6, on_exc=<built-in function print>,\n",
+      ">                      pause=0, mp_context=None, initializer=None,\n",
+      ">                      initargs=())\n",
+      "\n",
+      "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
      ]
     },
     "execution_count": null,
@@ -288,6 +311,72 @@
     "show_doc(ProcessPoolExecutor, title_level=4)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#|export\n",
+    "class NoDaemonProcess(multiprocessing.Process):\n",
+    "    # See https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic\n",
+    "    @property\n",
+    "    def daemon(self):\n",
+    "        return False\n",
+    "    @daemon.setter\n",
+    "    def daemon(self, value):\n",
+    "        pass"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#|export\n",
+    "@delegates()\n",
+    "class ProcessPool(multiprocessing.pool.Pool):\n",
+    "    \"Same as Python's Pool, except can pass `max_workers==0` for serial execution\"\n",
+    "    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, daemonic=False, **kwargs):\n",
+    "        if max_workers is None: max_workers=defaults.cpus\n",
+    "        store_attr()\n",
+    "        self.not_parallel = max_workers==0\n",
+    "        if self.not_parallel: max_workers=1\n",
+    "        if not daemonic:\n",
+    "            class NoDaemonContext(type(kwargs.get('context', get_context()))):\n",
+    "                Process = NoDaemonProcess\n",
+    "            kwargs['context'] = NoDaemonContext()\n",
+    "        super().__init__(max_workers, **kwargs)\n",
+    "\n",
+    "    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):\n",
+    "        assert timeout is None, \"timeout is not supported by ProcessPool, use ProcessPoolExecutor instead\"\n",
+    "        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0\n",
+    "        self.not_parallel = self.max_workers==0\n",
+    "        if self.not_parallel: self.max_workers=1\n",
+    "\n",
+    "        if self.not_parallel == False: self.lock = Manager().Lock()\n",
+    "        g = partial(f, *args, **kwargs)\n",
+    "        if self.not_parallel: return map(g, items)\n",
+    "        _g = partial(_call, self.lock, self.pause, self.max_workers, g)\n",
+    "        try: return super().map(_g, items, chunksize=chunksize)\n",
+    "        except Exception as e: self.on_exc(e)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# |export\n",
+    "@delegates()\n",
+    "class ThreadPool():\n",
+    "    # If you have a need for a ThreadPool, please open an issue.\n",
+    "    def __init__(self, *args, **kwargs):\n",
+    "        raise NotImplementedError(\"`ThreadPool` is not implemented\")"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -307,14 +396,21 @@
    "source": [
     "#|export\n",
     "def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,\n",
-    "             method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):\n",
+    "             method=None, threadpool=False, timeout=None, chunksize=1,\n",
+    "             executor=True, maxtasksperchild=None, **kwargs):\n",
     "    \"Applies `func` in parallel to `items`, using `n_workers`\"\n",
     "    kwpool = {}\n",
-    "    if threadpool: pool = ThreadPoolExecutor\n",
+    "    if threadpool: pool = ThreadPoolExecutor if executor else ThreadPool\n",
     "    else:\n",
+    "        pool = ProcessPoolExecutor if executor else ProcessPool\n",
     "        if not method and sys.platform == 'darwin': method='fork'\n",
-    "        if method: kwpool['mp_context'] = get_context(method)\n",
-    "        pool = ProcessPoolExecutor\n",
+    "        if method:\n",
+    "            if executor: kwpool['mp_context'] = get_context(method)\n",
+    "            else: kwpool['context'] = get_context(method)\n",
+    "\n",
+    "    if maxtasksperchild:\n",
+    "        assert pool==ProcessPool, \"`maxtasksperchild` is only supported by ProcessPool\"\n",
+    "        kwpool['maxtasksperchild'] = maxtasksperchild\n",
     "    with pool(n_workers, pause=pause, **kwpool) as ex:\n",
     "        r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)\n",
     "        if progress and progress_bar:\n",
@@ -377,11 +473,11 @@
     "name": "stdout",
     "output_type": "stream",
     "text": [
-     "0 2022-08-07 05:10:05.999916\n",
-     "1 2022-08-07 05:10:06.252031\n",
-     "2 2022-08-07 05:10:06.503603\n",
-     "3 2022-08-07 05:10:06.755216\n",
-     "4 2022-08-07 05:10:07.006702\n"
+     "0 2023-02-14 20:40:39.098928\n",
+     "1 2023-02-14 20:40:39.350350\n",
+     "2 2023-02-14 20:40:39.601602\n",
+     "3 2023-02-14 20:40:39.851952\n",
+     "4 2023-02-14 20:40:40.102687\n"
     ]
    }
   ],
@@ -499,45 +595,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/html": [
-       "\n",
-       "<style>\n",
-       "    /* Turns off some styling */\n",
-       "    progress {\n",
-       "        /* gets rid of default border in Firefox and Opera. */\n",
-       "        border: none;\n",
-       "        /* Needs to be in here for Safari polyfill so background images work as expected. */\n",
-       "        background-size: auto;\n",
-       "    }\n",
-       "    progress:not([value]), progress:not([value])::-webkit-progress-bar {\n",
-       "        background: repeating-linear-gradient(45deg, #7e7e7e, #7e7e7e 10px, #5c5c5c 10px, #5c5c5c 20px);\n",
-       "    }\n",
-       "    .progress-bar-interrupted, .progress-bar-interrupted::-webkit-progress-bar {\n",
-       "        background: #F44336;\n",
-       "    }\n",
-       "</style>\n"
-      ],
-      "text/plain": [
-       "<IPython.core.display.HTML object>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    },
-    {
-     "data": {
-      "text/html": [],
-      "text/plain": [
-       "<IPython.core.display.HTML object>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
+   "outputs": [],
    "source": [
     "class TestSleepyBatchFunc:\n",
     "    \"For testing parallel processes that run at different speeds\"\n",

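A minimal sketch of why ProcessPool defaults to non-daemonic workers (`daemonic=False`), assuming the hypothetical helpers `child` and `task`: because the pool swaps NoDaemonProcess into its context, a task running inside a worker may itself start a child process, which a daemonic worker of a plain multiprocessing.Pool would reject with "daemonic processes are not allowed to have children".

from multiprocessing import Process
from fastcore.parallel import ProcessPool

def child(x): print('child saw', x)

def task(x):
    p = Process(target=child, args=(x,))  # would fail in a daemonic pool worker
    p.start(); p.join()
    return x

if __name__ == '__main__':
    with ProcessPool(max_workers=2) as pool:
        print(pool.map(task, range(4)))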