diff --git a/pfb/__init__.py b/pfb/__init__.py index a30e57e5..874aee26 100644 --- a/pfb/__init__.py +++ b/pfb/__init__.py @@ -55,8 +55,9 @@ def set_client(nworkers, stack, log, memory_limit=0, # str(mem_limit/nworkers)+'GB' asynchronous=False) cluster = stack.enter_context(cluster) - client = stack.enter_context(Client(cluster, - direct_to_workers=direct_to_workers)) + client = Client(cluster, + direct_to_workers=direct_to_workers) + client = stack.enter_context(client) client.wait_for_workers(nworkers) dashboard_url = client.dashboard_link diff --git a/pfb/workers/degrid.py b/pfb/workers/degrid.py index c0941068..bae20e1f 100644 --- a/pfb/workers/degrid.py +++ b/pfb/workers/degrid.py @@ -81,7 +81,7 @@ def degrid(**kw): ti = time.time() _degrid(**opts) - print(f"All done after {time.time() - ti}s.", file=log) + print(f"All done after {time.time() - ti}s.", file=log) def _degrid(**kw): opts = OmegaConf.create(kw) diff --git a/pfb/workers/fluxmop.py b/pfb/workers/fluxmop.py index d9ede313..469370a7 100644 --- a/pfb/workers/fluxmop.py +++ b/pfb/workers/fluxmop.py @@ -147,7 +147,7 @@ def fluxmop(**kw): column = fut.result() print(f'Done writing {column}', file=log) - print(f"All done after {time.time() - ti}s", file=log) + print(f"All done after {time.time() - ti}s", file=log) def _fluxmop(**kw): opts = OmegaConf.create(kw) diff --git a/pfb/workers/grid.py b/pfb/workers/grid.py index 49288ec8..3981d5b0 100644 --- a/pfb/workers/grid.py +++ b/pfb/workers/grid.py @@ -173,7 +173,7 @@ def grid(**kw): continue print(f'Done writing {column}', file=log) - print(f"All done after {time.time() - ti}s", file=log) + print(f"All done after {time.time() - ti}s", file=log) def _grid(xdsi=None, **kw): opts = OmegaConf.create(kw) diff --git a/pfb/workers/hci.py b/pfb/workers/hci.py index 485efa22..8a41a371 100644 --- a/pfb/workers/hci.py +++ b/pfb/workers/hci.py @@ -92,7 +92,7 @@ def hci(**kw): ti = time.time() _hci(**opts) - print(f"All done after {time.time() - ti}s", file=log) + print(f"All done after {time.time() - ti}s", file=log) def _hci(**kw): opts = OmegaConf.create(kw) @@ -336,6 +336,9 @@ def _hci(**kw): while idle_workers: # Seed each worker with a task. + if n_launched == nds: # Stop once all jobs have been launched. + break + (subds, jones, freqsi, utimesi, ridx, rcnts, radeci, fi, ti, ms) = datasets[n_launched] @@ -373,7 +376,7 @@ def _hci(**kw): for completed_future in ac_iter: if n_launched == nds: # Stop once all jobs have been launched. - continue + break (subds, jones, freqsi, utimesi, ridx, rcnts, radeci, fi, ti, ms) = datasets[n_launched] diff --git a/pfb/workers/init.py b/pfb/workers/init.py index 45ebb614..e78b9fbc 100644 --- a/pfb/workers/init.py +++ b/pfb/workers/init.py @@ -91,7 +91,7 @@ def init(**kw): ti = time.time() _init(**opts) - print(f"All done after {time.time() - ti}s", file=log) + print(f"All done after {time.time() - ti}s", file=log) def _init(**kw): opts = OmegaConf.create(kw) @@ -282,6 +282,9 @@ def _init(**kw): nds = len(datasets) while idle_workers: # Seed each worker with a task. + if n_launched == nds: # Stop once all jobs have been launched. + break + (subds, jones, freqsi, chan_widthi, utimesi, ridx, rcnts, radeci, fi, ti, ims, ms) = datasets[n_launched] @@ -321,7 +324,7 @@ def _init(**kw): for completed_future in ac_iter: if n_launched == nds: # Stop once all jobs have been launched. - continue + break if isinstance(completed_future.result(), BaseException): print(completed_future.result()) diff --git a/pfb/workers/klean.py b/pfb/workers/klean.py index dd2f9c8a..28b734b0 100644 --- a/pfb/workers/klean.py +++ b/pfb/workers/klean.py @@ -95,7 +95,7 @@ def klean(**kw): f'{fits_oname}_{opts.suffix}', norm_wsum=False) - print(f"All done after {time.time() - ti}s", file=log) + print(f"All done after {time.time() - ti}s", file=log) def _klean(**kw): diff --git a/pfb/workers/sara.py b/pfb/workers/sara.py index d5f97ac4..fe085f27 100644 --- a/pfb/workers/sara.py +++ b/pfb/workers/sara.py @@ -109,7 +109,7 @@ def sara(**kw): except Exception as e: print(e) - print(f"All done after {time.time() - ti}s", file=log) + print(f"All done after {time.time() - ti}s", file=log) def _sara(**kw):