
Commit 5444720

respect new dask.config global selection of scheduler

- fix #48
- updated boiler-plate code in ParallelAnalysisBase.run and copied and pasted into leaflet.LeafletFinder.run() (TODO: make this more DRY)
- dask.distributed added as dependency (it is recommended by dask for a single node anyway, and it avoids imports inside if statements... much cleaner code in PMDA)
- removed scheduler kwarg: use dask.config.set(scheduler=...)
- 'multiprocessing' and n_jobs=-1 are now only selected if nothing is set by dask; if you want n_jobs=-1 to always grab all cores, you must set the multiprocessing scheduler
- default is now n_jobs=1 (instead of -1), i.e., the single-threaded scheduler
- updated tests
- removed unnecessary (broken?) test for "no deprecations" in parallel.ParallelAnalysisBase
- updated CHANGELOG
1 parent 6f54546 commit 5444720
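
For reference, the new usage pattern looks roughly like this (a minimal sketch; pmda.rms.RMSD and the file names are placeholders for any PMDA analysis class and input data):

import dask
import MDAnalysis as mda
from pmda.rms import RMSD          # placeholder: any PMDA analysis class works the same way

u = mda.Universe("topol.psf", "traj.dcd")   # placeholder files
rmsd = RMSD(u.atoms, u.atoms)

# select the scheduler globally via dask.config instead of the removed
# scheduler= keyword of run(); n_jobs=-1 only expands to all cores once
# the multiprocessing scheduler has been chosen
with dask.config.set(scheduler='multiprocessing'):
    rmsd.run(n_jobs=-1)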

8 files changed (+38, -49 lines)

.travis.yml (+1, -1)

@@ -20,7 +20,7 @@ env:
   # minimal CONDA_MDANALYSIS_DEPENDENCIES
   #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx"
   - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis"
-  - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask joblib pytest-pep8 mock codecov"
+  - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov"
   - CONDA_CHANNELS='conda-forge'
   - CONDA_CHANNEL_PRIORITY=True
   # install development version of MDAnalysis (needed until the test

CHANGELOG (+10, -1)

@@ -13,7 +13,7 @@ The rules for this file:
   * release numbers follow "Semantic Versioning" http://semver.org

 ------------------------------------------------------------------------------
-xx/xx/18 VOD555, richardjgowers, iparask, orbeckst
+11/xx/18 VOD555, richardjgowers, iparask, orbeckst, kain88-de

   * 0.2.0

@@ -27,6 +27,15 @@ Fixes
   * always distribute frames over blocks so that no empty blocks are
     created ("balanced blocks", Issue #71)

+Changes
+  * requires dask >= 0.18.0 and respects/requires the global setting of the
+    dask scheduler (Issue #48)
+  * removed the 'scheduler' keyword from the run() method; use
+    dask.config.set(scheduler=...) as recommended in the dask docs
+  * uses single-threaded scheduler if n_jobs=1 (Issue #17)
+  * n_jobs=1 is now the default for run() (used to be n_jobs=-1)
+  * dask.distributed is now a dependency
+

 06/07/18 orbeckst
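
With dask.distributed now a hard dependency, an active distributed client is picked up automatically; a rough sketch (the Client arguments are illustrative, and rmsd stands for any PMDA analysis object as in the sketch above):

from dask import distributed

# starting a Client registers it as the current client in this process;
# when no scheduler has been configured explicitly, run() finds it via
# dask.distributed.worker.get_client()
client = distributed.Client(n_workers=2)
rmsd.run(n_jobs=2)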

conftest.py (+3, -1)

@@ -25,7 +25,9 @@ def client(tmpdir_factory, request):
     lc.close()


-@pytest.fixture(scope='session', params=('distributed', 'multiprocessing', 'single-threaded'))
+@pytest.fixture(scope='session', params=('distributed',
+                                         'multiprocessing',
+                                         'single-threaded'))
 def scheduler(request, client):
     if request.param == 'distributed':
         arg = client
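
A test can then be written once and exercised under all three schedulers. An illustrative sketch of how such a parametrized fixture might be consumed (not part of this diff; it assumes the fixture yields a scheduler value and that an 'analysis' fixture provides a PMDA analysis object, which is not visible in the truncated hunk):

import dask

def test_runs_under_each_scheduler(scheduler, analysis):
    # 'scheduler' is either the distributed client or a scheduler name
    with dask.config.set(scheduler=scheduler):
        analysis.run(n_jobs=1)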

pmda/leaflet.py (+7, -18)

@@ -249,19 +249,15 @@ def run(self,
             used

         """
-        # are we using a distributed scheduler or should we use multiprocessing?
+        # are we using a distributed scheduler or should we use
+        # multiprocessing?
         scheduler = dask.config.get('scheduler', None)
-        if scheduler is None and client is None:
-            scheduler = 'multiprocessing'
-        elif scheduler is None:
+        if scheduler is None:
             # maybe we can grab a global worker
             try:
-                from dask import distributed
-                scheduler = distributed.worker.get_client()
+                scheduler = dask.distributed.worker.get_client()
             except ValueError:
                 pass
-            except ImportError:
-                pass

         if n_jobs == -1:
             n_jobs = cpu_count()
@@ -272,16 +268,9 @@
         if scheduler is None and n_jobs == 1:
             scheduler = 'single-threaded'

-        if n_blocks is None:
-            if scheduler == 'multiprocessing':
-                n_blocks = n_jobs
-            elif isinstance(scheduler, distributed.Client):
-                n_blocks = len(scheduler.ncores())
-            else:
-                n_blocks = 1
-                warnings.warn(
-                    "Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1"
-                    "Please provide `n_blocks` in call to method.")
+        # fall back to multiprocessing, we tried everything
+        if scheduler is None:
+            scheduler = 'multiprocessing'

         scheduler_kwargs = {'scheduler': scheduler}
         if scheduler == 'multiprocessing':

pmda/parallel.py (+12, -10)

@@ -23,6 +23,7 @@
 import MDAnalysis as mda
 from dask.delayed import delayed
 import dask
+import dask.distributed
 from joblib import cpu_count
 import numpy as np

@@ -288,19 +289,15 @@ def run(self,
             to n_jobs or number of available workers in scheduler.

         """
-        # are we using a distributed scheduler or should we use multiprocessing?
+        # are we using a distributed scheduler or should we use
+        # multiprocessing?
         scheduler = dask.config.get('scheduler', None)
-        if scheduler is None and client is None:
-            scheduler = 'multiprocessing'
-        elif scheduler is None:
+        if scheduler is None:
             # maybe we can grab a global worker
             try:
-                from dask import distributed
-                scheduler = distributed.worker.get_client()
+                scheduler = dask.distributed.worker.get_client()
             except ValueError:
                 pass
-            except ImportError:
-                pass

         if n_jobs == -1:
             n_jobs = cpu_count()
@@ -311,15 +308,20 @@
         if scheduler is None and n_jobs == 1:
             scheduler = 'single-threaded'

+        # fall back to multiprocessing, we tried everything
+        if scheduler is None:
+            scheduler = 'multiprocessing'
+
         if n_blocks is None:
             if scheduler == 'multiprocessing':
                 n_blocks = n_jobs
-            elif isinstance(scheduler, distributed.Client):
+            elif isinstance(scheduler, dask.distributed.Client):
                 n_blocks = len(scheduler.ncores())
             else:
                 n_blocks = 1
                 warnings.warn(
-                    "Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1"
+                    "Couldn't guess ideal number of blocks from scheduler. "
+                    "Setting n_blocks=1. "
                     "Please provide `n_blocks` in call to method.")

         scheduler_kwargs = {'scheduler': scheduler}
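
Condensed, the scheduler-selection cascade that this diff puts into ParallelAnalysisBase.run() (and, copy-pasted, into leaflet.LeafletFinder.run()) is the following; the helper name is illustrative, since in PMDA this logic lives inline in run():

import dask
import dask.distributed
from joblib import cpu_count

def _select_scheduler(n_jobs=1):
    # 1. respect whatever was set globally via dask.config.set(scheduler=...)
    scheduler = dask.config.get('scheduler', None)
    if scheduler is None:
        # 2. maybe we can grab a global distributed client
        try:
            scheduler = dask.distributed.worker.get_client()
        except ValueError:
            pass
    if n_jobs == -1:
        n_jobs = cpu_count()
    # 3. nothing configured and only one job: run single-threaded
    if scheduler is None and n_jobs == 1:
        scheduler = 'single-threaded'
    # 4. fall back to multiprocessing, we tried everything
    if scheduler is None:
        scheduler = 'multiprocessing'
    return scheduler, n_jobs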

pmda/test/test_custom.py (-15)

@@ -12,7 +12,6 @@
 import numpy as np
 import MDAnalysis as mda
 from MDAnalysisTests.datafiles import PSF, DCD
-from MDAnalysisTests.util import no_deprecated_call
 import pytest
 from numpy.testing import assert_equal

@@ -82,17 +81,3 @@ def test_analysis_class():
     with pytest.raises(ValueError):
         ana_class(2)

-
-def test_analysis_class_decorator():
-    # Issue #1511
-    # analysis_class should not raise
-    # a DeprecationWarning
-    u = mda.Universe(PSF, DCD)
-
-    def distance(a, b):
-        return np.linalg.norm((a.centroid() - b.centroid()))
-
-    Distances = custom.analysis_class(distance)
-
-    with no_deprecated_call():
-        Distances(u, u.atoms[:10], u.atoms[10:20]).run()

pmda/test/test_parallel.py (+3, -2)

@@ -14,7 +14,7 @@
 from MDAnalysisTests.datafiles import DCD, PSF
 import joblib

-from dask import distributed
+import dask

 from pmda import parallel

@@ -105,7 +105,8 @@ def test_nblocks(analysis, n_blocks):


 def test_guess_nblocks(analysis):
-    analysis.run(n_jobs=-1)
+    with dask.config.set(scheduler='multiprocessing'):
+        analysis.run(n_jobs=-1)
     assert len(analysis._results) == joblib.cpu_count()


setup.py (+2, -1)

@@ -50,10 +50,11 @@
     install_requires=[
         'MDAnalysis>=0.18',
         'dask>=0.18',
+        'distributed',
         'six',
         'joblib', # cpu_count func currently
         'networkx',
-        'scipy',
+        'scipy',
     ],
     tests_require=[
         'pytest',
