From f090af33f60baef3521a19dd8b4d31d5fe949a1e Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 9 Feb 2022 11:51:13 +0200 Subject: [PATCH 1/7] Lazy Proxy with Processes to read/write MS --- essays/lazy_proxy_process_essay.py | 60 ++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 essays/lazy_proxy_process_essay.py diff --git a/essays/lazy_proxy_process_essay.py b/essays/lazy_proxy_process_essay.py new file mode 100644 index 00000000..da89c1a4 --- /dev/null +++ b/essays/lazy_proxy_process_essay.py @@ -0,0 +1,60 @@ +import argparse + +import pyrap.tables as pt +import dask.array as da +from daskms.patterns import LazyProxy +import numpy as np + +def create_parser(): + p = argparse.ArgumentParser() + p.add_argument("ms") + return p + + +def _read(proxy, column, start_len): + proxy.lock(False) + start, nrow = start_len[0][0] + + try: + return proxy.getcol(column, startrow=start, nrow=nrow) + finally: + proxy.unlock() + +def _write(proxy, column, start_len, data): + proxy.lock(True) + start, nrow = start_len[0][0] + + try: + proxy.putcol(column, data, startrow=start, nrow=nrow) + return np.array([True]) + except: + return np.array([False]) + finally: + proxy.unlock() + + +def main(ms, column="TIME"): + read_proxy = LazyProxy(pt.table, ms, lockoptions="usernoread", readonly=True) + write_proxy = LazyProxy(pt.table, ms, lockoptions="user", readonly=False) + + sl = np.array(([0, 10], [10, 10], [20, 10])) + sl = da.from_array(sl, chunks=(1, 2)) + + data = da.blockwise(_read, "r", + read_proxy, None, + column, None, + sl, "rb", + meta=np.empty((0,), np.float64)) + + writes = da.blockwise(_write, "r", + write_proxy, None, + column, None, + sl, "rb", + data, "r", + meta=np.empty((0,), bool)) + + print(writes.compute(scheduler="processes")) + +if __name__ == "__main__": + args = create_parser().parse_args() + main(args.ms) From 3b5c52771d630dd41e230866c82611319eca7b85 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 9 Feb 2022 14:31:08 +0200 Subject: [PATCH 2/7] User explicit user locking --- essays/lazy_proxy_process_essay.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/essays/lazy_proxy_process_essay.py b/essays/lazy_proxy_process_essay.py index da89c1a4..d174aed0 100644 --- a/essays/lazy_proxy_process_essay.py +++ b/essays/lazy_proxy_process_essay.py @@ -34,7 +34,7 @@ def _write(proxy, column, start_len, data): def main(ms, column="TIME"): - read_proxy = LazyProxy(pt.table, ms, lockoptions="usernoread", readonly=True) + read_proxy = LazyProxy(pt.table, ms, lockoptions="user", readonly=True) write_proxy = LazyProxy(pt.table, ms, lockoptions="user", readonly=False) sl = np.array(([0, 10], [10, 10], [20, 10])) From f7b6ae954f24b7fadf5f8a57c495f4c43880acfd Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 10 Feb 2022 14:50:02 +0200 Subject: [PATCH 3/7] ProcessPoolExecutor experimentation --- daskms/experimental/arrow/reads.py | 7 +- daskms/patterns.py | 89 +++++++++-- daskms/table_executor.py | 24 ++- daskms/table_proxy.py | 30 +--- daskms/tests/test_patterns.py | 229 ++++++++++++++++++++++++++++- daskms/utils.py | 4 +- 6 files changed, 335 insertions(+), 48 deletions(-) diff --git a/daskms/experimental/arrow/reads.py b/daskms/experimental/arrow/reads.py index 7c81be09..090d38c0 100644 --- a/daskms/experimental/arrow/reads.py +++ b/daskms/experimental/arrow/reads.py @@ -2,6 +2,7 @@ import json import logging from pathlib import Path +import re from threading import Lock import weakref @@ -55,6 +56,10 @@ log = logging.getLogger(__name__) +def natural_order(key): + return tuple(int(c) if c.isdigit() else c.lower() + for c in re.split("(\d+)", str(key))) + class ParquetFileProxy(metaclass=Multiton): def __init__(self, store, key): self.store = store @@ -93,7 +98,7 @@ def __eq__(self, other): def __lt__(self, other): return (isinstance(other, ParquetFileProxy) and self.store == other.store and - self.key < other.key) + natural_order(self.key) < natural_order(other.key)) def read_column(self, column, start=None, end=None): chunks = self.file.read(columns=[column]).column(column).chunks diff --git a/daskms/patterns.py b/daskms/patterns.py index 459cd817..c52d7f9c 100644 --- a/daskms/patterns.py +++ b/daskms/patterns.py @@ -4,6 +4,7 @@ import inspect from inspect import getattr_static from threading import Lock +from types import MethodType from warnings import warn import weakref @@ -28,6 +29,22 @@ def freeze(arg): return arg +def _warn_if_positional_in_kwargs(cls, kwargs): + signature = inspect.signature(cls.__init__) + positional_in_kwargs = [p.name for p in signature.parameters.values() + if p.kind == p.POSITIONAL_OR_KEYWORD + and p.default == p.empty + and p.name in kwargs] + + if positional_in_kwargs: + warn(f"Positional arguments {positional_in_kwargs} were " + f"supplied as keyword arguments to " + f"{cls.__init__}{signature}. " + f"This may create separate Multiton instances " + f"for what is intended to be a unique set of " + f"arguments.") + + class Multiton(type): """General Multiton metaclass @@ -66,21 +83,8 @@ def __init__(self, *args, **kwargs): self.__lock = Lock() def __call__(cls, *args, **kwargs): - signature = inspect.signature(cls.__init__) - positional_in_kwargs = [p.name for p in signature.parameters.values() - if p.kind == p.POSITIONAL_OR_KEYWORD - and p.default == p.empty - and p.name in kwargs] - - if positional_in_kwargs: - warn(f"Positional arguments {positional_in_kwargs} were " - f"supplied as keyword arguments to " - f"{cls.__init__}{signature}. " - f"This may create separate Multiton instances " - f"for what is intended to be a unique set of " - f"arguments.") - - key = freeze(args + (kwargs if kwargs else Multiton.MISSING,)) + _warn_if_positional_in_kwargs(cls, kwargs) + key = freeze(args + (kwargs if kwargs else cls.MISSING,)) # Double-checked locking # https://en.wikipedia.org/wiki/Double-checked_locking @@ -98,6 +102,56 @@ def __call__(cls, *args, **kwargs): return instance +def dummy(self, *args, **kwargs): + pass + +class PersistentMultiton(type): + """Similar to a :class:`Multiton`, but """ + MISSING = object() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__cache = {} + self.__lock = Lock() + + def __new__(cls, name, bases, namespace): + return super().__new__(cls, name, bases, { + "__forget_multiton__": dummy, + **namespace, + }) + + @staticmethod + def forgetter(key): + def __forget_multiton__(self, *args, **kwargs): + try: + del self.__cache[key] + except KeyError: + pass + + return __forget_multiton__ + + def __call__(cls, *args, **kwargs): + _warn_if_positional_in_kwargs(cls, kwargs) + key = freeze(args + (kwargs if kwargs else cls.MISSING,)) + + # Double-checked locking + # https://en.wikipedia.org/wiki/Double-checked_locking + try: + return cls.__cache[key] + except KeyError: + pass + + with cls.__lock: + try: + return cls.__cache[key] + except KeyError: + instance = type.__call__(cls, *args, **kwargs) + forget_method = MethodType(cls.forgetter(key), instance) + instance.__forget_multiton__ = forget_method + cls.__cache[key] = instance + return instance + + INVALID_LAZY_CONTEXTS = set() try: @@ -359,7 +413,7 @@ def __getattr__(self, name): raise AttributeError(name) from e def __setattr__(self, name, value): - obj = self if name in self.__lazy_members__ else self.__lazy_object__ + obj = self if name in dir(self) else self.__lazy_object__ return object.__setattr__(obj, name, value) def __delattr__(self, name): @@ -391,3 +445,6 @@ def __init__(self, value): See :class:`LazyProxy` and :class:`Multiton` for further details """ + +class PersistentLazyProxyMultiton(LazyProxy, metaclass=PersistentMultiton): + pass \ No newline at end of file diff --git a/daskms/table_executor.py b/daskms/table_executor.py index 653fa0cb..4ce8ed1b 100644 --- a/daskms/table_executor.py +++ b/daskms/table_executor.py @@ -20,25 +20,35 @@ class ExecutorMetaClass(type): """ https://en.wikipedia.org/wiki/Multiton_pattern """ - def __call__(cls, key=STANDARD_EXECUTOR): + def __call__(cls, key, typ): + cache_key = (key, typ) + try: - return _executor_cache[key] + return _executor_cache[cache_key] except KeyError: pass with _executor_lock: try: - return _executor_cache[key] + return _executor_cache[cache_key] except KeyError: - instance = type.__call__(cls, key) - _executor_cache[key] = instance + instance = type.__call__(cls, key, typ) + _executor_cache[cache_key] = instance return instance class Executor(object, metaclass=ExecutorMetaClass): - def __init__(self, key=STANDARD_EXECUTOR): + def __init__(self, key=STANDARD_EXECUTOR, typ="thread"): # Initialise a single thread - self.impl = impl = cf.ThreadPoolExecutor(1) + + if typ == "thread": + impl = cf.ThreadPoolExecutor(1) + elif typ == "process": + impl = cf.ProcessPoolExecutor(1) + else: + raise ValueError(f"Invalid Executor type {typ}") + + self.impl = impl self.key = key # Register a finaliser shutting down the diff --git a/daskms/table_proxy.py b/daskms/table_proxy.py index fa3bcdba..80e11ea7 100644 --- a/daskms/table_proxy.py +++ b/daskms/table_proxy.py @@ -9,12 +9,10 @@ import pyrap.tables as pt from daskms.table_executor import Executor, STANDARD_EXECUTOR from daskms.utils import arg_hasher +from daskms.patterns import LazyProxyMultiton, Multiton log = logging.getLogger(__name__) -_table_cache = weakref.WeakValueDictionary() -_table_lock = Lock() - # CASA Table Locking Modes NOLOCK = 0 READLOCK = 1 @@ -139,28 +137,13 @@ def public_method(self, *args, **kwargs): return public_method -class TableProxyMetaClass(type): - """ - https://en.wikipedia.org/wiki/Multiton_pattern - - """ +class TableProxyMetaClass(Multiton): def __new__(cls, name, bases, dct): for method, locktype in _proxied_methods: proxy_method = proxied_method_factory(method, locktype) dct[method] = proxy_method - return type.__new__(cls, name, bases, dct) - - def __call__(cls, *args, **kwargs): - key = arg_hasher((cls,) + args + (kwargs,)) - - with _table_lock: - try: - return _table_cache[key] - except KeyError: - instance = type.__call__(cls, *args, **kwargs) - _table_cache[key] = instance - return instance + return super().__new__(cls, name, bases, dct) def _map_create_proxy(cls, factory, args, kwargs): @@ -304,8 +287,9 @@ def __init__(self, factory, *args, **kwargs): **kwargs : dict Keyword arguments passed to factory. __executor_key__ : str, optional - Executor key. Identifies a unique threadpool + Executor key. Identifies a unique pool in which table operations will be performed. + __executor_type__: {"thread", "process"} """ # Save the arguments as keys for pickling and tokenising @@ -325,11 +309,13 @@ def __init__(self, factory, *args, **kwargs): # TableProxy(*args, ex_key=..., **kwargs) kwargs = kwargs.copy() self._ex_key = kwargs.pop("__executor_key__", STANDARD_EXECUTOR) + ex_type = kwargs.pop("__executor_type__", "thread") # Store a reference to the Executor wrapper class # so that the Executor is retained while this TableProxy # still lives - self._ex_wrapper = ex = Executor(key=self._ex_key) + self._ex_wrapper = ex = Executor(key=self._ex_key, typ=ex_type) + self._table_future = table = ex.impl.submit(factory, *args, **kwargs) weakref.finalize(self, _table_future_finaliser, ex, table, diff --git a/daskms/tests/test_patterns.py b/daskms/tests/test_patterns.py index 313b54a0..79c151b5 100644 --- a/daskms/tests/test_patterns.py +++ b/daskms/tests/test_patterns.py @@ -6,7 +6,11 @@ import pytest from daskms.patterns import ( - Multiton, LazyProxy, LazyProxyMultiton) + Multiton, + PersistentLazyProxyMultiton, + PersistentMultiton, + LazyProxy, + LazyProxyMultiton) class DummyResource: @@ -106,6 +110,229 @@ def reader(file_proxy, pool_proxy, other): values.compute(scheduler="processes") +class PersistentA(metaclass=PersistentMultiton): + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + @classmethod + def from_args(cls, args, kwargs): + return cls(*args, **kwargs) + + def __reduce__(self): + return (self.from_args, (self.args, self.kwargs)) + + +def test_persistent_multiton(): + a = PersistentA(1) + assert a is PersistentA(1) + assert pickle.loads(pickle.dumps(a)) is PersistentA(1) + + assert len(PersistentA._PersistentMultiton__cache) == 1 + assert next(iter(PersistentA._PersistentMultiton__cache.values())) is a + + a.__forget_multiton__() + assert len(PersistentA._PersistentMultiton__cache) == 0 + + + +# CASA Table Locking Modes +NOLOCK = 0 +READLOCK = 1 +WRITELOCK = 2 + +# List of CASA Table methods to proxy and the appropriate locking mode +_proxied_methods = [ + # Queries + ("nrows", READLOCK), + ("colnames", READLOCK), + ("getcoldesc", READLOCK), + ("getdminfo", READLOCK), + ("iswritable", READLOCK), + # Modification + ("addrows", WRITELOCK), + ("addcols", WRITELOCK), + ("setmaxcachesize", WRITELOCK), + # Reads + ("getcol", READLOCK), + ("getcolslice", READLOCK), + ("getcolnp", READLOCK), + ("getvarcol", READLOCK), + ("getcell", READLOCK), + ("getcellslice", READLOCK), + ("getkeywords", READLOCK), + ("getcolkeywords", READLOCK), + # Writes + ("putcol", WRITELOCK), + ("putcolslice", WRITELOCK), + ("putcolnp", WRITELOCK), + ("putvarcol", WRITELOCK), + ("putcellslice", WRITELOCK), + ("putkeyword", WRITELOCK), + ("putkeywords", WRITELOCK), + ("putcolkeywords", WRITELOCK)] + + + +def proxied_method_factory(cls, method, locktype): + """ + Proxy pyrap.tables.table.method calls. + + Creates a private implementation function which performs + the call locked according to to ``locktype``. + + The private implementation is accessed by a public ``method`` + which submits a call to the implementation + on a concurrent.futures.ThreadPoolExecutor. + """ + if locktype == NOLOCK: + runner = __nolock_runner + elif locktype == READLOCK: + runner = __read_runner + elif locktype == WRITELOCK: + runner = __write_runner + else: + raise ValueError(f"Invalid locktype {locktype}") + + def public_method(self, *args, **kwargs): + """ + Submits _impl(args, kwargs) to the executor + and returns a Future + """ + return self._ex.submit(runner, self.proxy, method, args, kwargs) + + public_method.__name__ = method + public_method.__doc__ = f"Call table.{method}" + + return public_method + + + +def __nolock_runner(proxy, method, args, kwargs): + try: + return getattr(proxy, method)(*args, **kwargs) + except Exception as e: + print(str(e)) + +def __read_runner(proxy, method, args, kwargs): + proxy.lock(False) + + try: + return getattr(proxy, method)(*args, **kwargs) + except Exception as e: + print(str(e)) + finally: + proxy.unlock() + +def __write_runner(proxy, method, args, kwargs): + proxy.lock(True) + + try: + return getattr(proxy, method)(*args, **kwargs) + except Exception as e: + print(str(e)) + finally: + proxy.unlock() + +class TableProxyMetaClass(Multiton): + def __new__(cls, name, bases, dct): + for method, locktype in _proxied_methods: + dct[method] = proxied_method_factory(cls, method, locktype) + + return super().__new__(cls, name, bases, dct) + +class TableProxy(metaclass=TableProxyMetaClass): + def __init__(self, factory, *args, **kwargs): + import weakref + import concurrent.futures as cf + import multiprocessing + + self.factory = factory + self.args = args + self.kwargs = kwargs + self.proxy = proxy = PersistentLazyProxyMultiton( + self.factory, + *self.args, + **self.kwargs) + + spawn_ctx = multiprocessing.get_context("spawn") + self._ex = executor = cf.ProcessPoolExecutor(5, mp_context=spawn_ctx) + + weakref.finalize(self, self.finalise_proxy, proxy, executor) + + + @staticmethod + def finalise_proxy(proxy, executor): + nprocess = len(executor._processes) + list(executor.map(proxy.__forget_multiton__, [None]*nprocess)) + print(f"Finalising {proxy}") + proxy.__forget_multiton__() + executor.shutdown(wait=True) + + @classmethod + def from_args(cls, factory, args, kwargs): + return cls(factory, *args, **kwargs) + + def __reduce__(self): + return (self.from_args, (self.factory, self.args, self.kwargs)) + + +import os + +class Resource: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + print(f"Creating Resource in {os.getpid()} {args} {kwargs}") + + def execute(self, *args, **kwargs): + print(f"execute in {os.getpid()} {args} {kwargs}") + + def close(self): + print(f"Closing Resource in {os.getpid()} {self.args} {self.kwargs}") + + +def process_fn(proxy, *args, **kwargs): + return proxy.execute(*args, **kwargs) + +def test_persistent_multiton_in_process_pool(): + import concurrent.futures as cf + import multiprocessing + spawn_ctx = multiprocessing.get_context("spawn") + + pool = cf.ProcessPoolExecutor(4, mp_context=spawn_ctx) + proxy = PersistentLazyProxyMultiton((Resource, Resource.close)) + + pool.submit(process_fn, proxy, 1, 2).result() + for r in pool.map(proxy.__forget_multiton__, [None]*len(pool._processes)): + print(r) + + print("Shutting down") + pool.shutdown(wait=True) + + + +def test_ms_in_persistent_multiton(ms): + import pyrap.tables as pt + proxy = TableProxy(pt.table, ms, ack=True) + print(proxy.nrows().result()) + + def ranges(length, chunk): + n = 0 + + while n < length: + yield n, chunk + n += chunk + + + + futures = [proxy.getcol("TIME", startrow=s, nrow=n) + for s, n in ranges(10, 1)] + + import concurrent.futures as cf + for f in cf.as_completed(futures): + print(f.result()) + def test_multiton(): class A(metaclass=Multiton): def __init__(self, *args, **kwargs): diff --git a/daskms/utils.py b/daskms/utils.py index 2862c236..986b437d 100644 --- a/daskms/utils.py +++ b/daskms/utils.py @@ -118,13 +118,15 @@ def assert_liveness(table_proxies, executors, collect=True): Asserts that the given number of TableProxy and Executor objects are alive. """ - from daskms.table_proxy import _table_cache + from daskms.table_proxy import TableProxy from daskms.table_executor import _executor_cache import gc if collect: gc.collect() + _table_cache = TableProxy._Multiton__cache + if table_proxies is not None and len(_table_cache) != table_proxies: lines = ["len(_table_cache)[%d] != %d" % (len(_table_cache), table_proxies)] From e0575ba595f37626fe9c4f53acf3db425cafa447 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 11 Feb 2022 15:52:55 +0200 Subject: [PATCH 4/7] Handle non __slot__ LazyObject attributes --- daskms/patterns.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/daskms/patterns.py b/daskms/patterns.py index c52d7f9c..8a9d482c 100644 --- a/daskms/patterns.py +++ b/daskms/patterns.py @@ -413,8 +413,20 @@ def __getattr__(self, name): raise AttributeError(name) from e def __setattr__(self, name, value): - obj = self if name in dir(self) else self.__lazy_object__ - return object.__setattr__(obj, name, value) + # Defer to self + # if name in self.__lazy_members__: + # return object.__setattr__(self, name, value) + + try: + # name might exist on self, e.g. maybe a metaclass + # added a method + getattr_static(self, name) + except AttributeError: + # Nope, defer to the proxy + return object.__setattr__(self.__lazy_object__, name, value) + else: + # Defer to self + return object.__setattr__(self, name, value) def __delattr__(self, name): if name in self.__lazy_members__: From 6f5d0b71c7484ab04f13528ab0df1e985b240abf Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 11 Feb 2022 15:53:07 +0200 Subject: [PATCH 5/7] Handle non __slot__ LazyObject attributes --- daskms/patterns.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daskms/patterns.py b/daskms/patterns.py index 8a9d482c..554e6596 100644 --- a/daskms/patterns.py +++ b/daskms/patterns.py @@ -414,8 +414,8 @@ def __getattr__(self, name): def __setattr__(self, name, value): # Defer to self - # if name in self.__lazy_members__: - # return object.__setattr__(self, name, value) + if name in self.__lazy_members__: + return object.__setattr__(self, name, value) try: # name might exist on self, e.g. maybe a metaclass From a615e3b7a7fe16938818fe580bc37e5742b88958 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 11 Feb 2022 15:53:31 +0200 Subject: [PATCH 6/7] Boondoggle --- daskms/tests/test_patterns.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/daskms/tests/test_patterns.py b/daskms/tests/test_patterns.py index 79c151b5..4fdf019a 100644 --- a/daskms/tests/test_patterns.py +++ b/daskms/tests/test_patterns.py @@ -255,14 +255,18 @@ def __init__(self, factory, *args, **kwargs): *self.args, **self.kwargs) + + spawn_ctx = multiprocessing.get_context("spawn") self._ex = executor = cf.ProcessPoolExecutor(5, mp_context=spawn_ctx) + self._ex = executor = LazyProxyMultiton(cf.ProcessPoolExecutor, 5, mp_context=spawn_ctx) + #self._ex = executor = cf.ThreadPoolExecutor(1) - weakref.finalize(self, self.finalise_proxy, proxy, executor) + weakref.finalize(self, self.finaliser, proxy, executor) @staticmethod - def finalise_proxy(proxy, executor): + def finaliser(proxy, executor): nprocess = len(executor._processes) list(executor.map(proxy.__forget_multiton__, [None]*nprocess)) print(f"Finalising {proxy}") From 5d86f8f05bcb39253fa0af6f0633f5b73bd79f68 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 11 Feb 2022 15:53:59 +0200 Subject: [PATCH 7/7] Make __reduce__ factory a classmethod --- daskms/table_proxy.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/daskms/table_proxy.py b/daskms/table_proxy.py index 80e11ea7..b7fcff15 100644 --- a/daskms/table_proxy.py +++ b/daskms/table_proxy.py @@ -146,11 +146,6 @@ def __new__(cls, name, bases, dct): return super().__new__(cls, name, bases, dct) -def _map_create_proxy(cls, factory, args, kwargs): - """ Support pickling of kwargs in TableProxy.__reduce__ """ - return cls(factory, *args, **kwargs) - - class MismatchedLocks(Exception): pass @@ -344,10 +339,13 @@ def __init__(self, factory, *args, **kwargs): def executor_key(self): return self._ex_key + @classmethod + def from_args(cls, factory, args, kwargs): + return cls(factory, *args, **kwargs) + def __reduce__(self): """ Defer to _map_create_proxy to support kwarg pickling """ - return (_map_create_proxy, (TableProxy, self._factory, - self._args, self._kwargs)) + return (self.from_args, (self._factory, self._args, self._kwargs)) def __enter__(self): return self