Skip to content

Commit

Permalink
fix hanging behaviour when ntasks < nworkers in init and hci
Browse files Browse the repository at this point in the history
  • Loading branch information
landmanbester committed Aug 27, 2024
1 parent 5eff059 commit 9e911d0
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 11 deletions.
5 changes: 3 additions & 2 deletions pfb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ def set_client(nworkers, stack, log,
memory_limit=0, # str(mem_limit/nworkers)+'GB'
asynchronous=False)
cluster = stack.enter_context(cluster)
client = stack.enter_context(Client(cluster,
direct_to_workers=direct_to_workers))
client = Client(cluster,
direct_to_workers=direct_to_workers)
client = stack.enter_context(client)

client.wait_for_workers(nworkers)
dashboard_url = client.dashboard_link
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/degrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def degrid(**kw):
ti = time.time()
_degrid(**opts)

print(f"All done after {time.time() - ti}s.", file=log)
print(f"All done after {time.time() - ti}s.", file=log)

def _degrid(**kw):
opts = OmegaConf.create(kw)
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/fluxmop.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def fluxmop(**kw):
column = fut.result()
print(f'Done writing {column}', file=log)

print(f"All done after {time.time() - ti}s", file=log)
print(f"All done after {time.time() - ti}s", file=log)

def _fluxmop(**kw):
opts = OmegaConf.create(kw)
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def grid(**kw):
continue
print(f'Done writing {column}', file=log)

print(f"All done after {time.time() - ti}s", file=log)
print(f"All done after {time.time() - ti}s", file=log)

def _grid(xdsi=None, **kw):
opts = OmegaConf.create(kw)
Expand Down
7 changes: 5 additions & 2 deletions pfb/workers/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def hci(**kw):
ti = time.time()
_hci(**opts)

print(f"All done after {time.time() - ti}s", file=log)
print(f"All done after {time.time() - ti}s", file=log)

def _hci(**kw):
opts = OmegaConf.create(kw)
Expand Down Expand Up @@ -336,6 +336,9 @@ def _hci(**kw):

while idle_workers: # Seed each worker with a task.

if n_launched == nds: # Stop once all jobs have been launched.
break

(subds, jones, freqsi, utimesi, ridx, rcnts,
radeci, fi, ti, ms) = datasets[n_launched]

Expand Down Expand Up @@ -373,7 +376,7 @@ def _hci(**kw):
for completed_future in ac_iter:

if n_launched == nds: # Stop once all jobs have been launched.
continue
break

(subds, jones, freqsi, utimesi, ridx, rcnts,
radeci, fi, ti, ms) = datasets[n_launched]
Expand Down
7 changes: 5 additions & 2 deletions pfb/workers/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def init(**kw):
ti = time.time()
_init(**opts)

print(f"All done after {time.time() - ti}s", file=log)
print(f"All done after {time.time() - ti}s", file=log)

def _init(**kw):
opts = OmegaConf.create(kw)
Expand Down Expand Up @@ -282,6 +282,9 @@ def _init(**kw):
nds = len(datasets)
while idle_workers: # Seed each worker with a task.

if n_launched == nds: # Stop once all jobs have been launched.
break

(subds, jones, freqsi, chan_widthi, utimesi, ridx, rcnts,
radeci, fi, ti, ims, ms) = datasets[n_launched]

Expand Down Expand Up @@ -321,7 +324,7 @@ def _init(**kw):
for completed_future in ac_iter:

if n_launched == nds: # Stop once all jobs have been launched.
continue
break

if isinstance(completed_future.result(), BaseException):
print(completed_future.result())
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/klean.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def klean(**kw):
f'{fits_oname}_{opts.suffix}',
norm_wsum=False)

print(f"All done after {time.time() - ti}s", file=log)
print(f"All done after {time.time() - ti}s", file=log)


def _klean(**kw):
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/sara.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def sara(**kw):
except Exception as e:
print(e)

print(f"All done after {time.time() - ti}s", file=log)
print(f"All done after {time.time() - ti}s", file=log)


def _sara(**kw):
Expand Down

0 comments on commit 9e911d0

Please sign in to comment.