From 58305de52f316cd28ba7662a4e0583c7b5086e72 Mon Sep 17 00:00:00 2001
From: Millefeuille
Date: Tue, 5 Mar 2024 14:25:16 +0100
Subject: [PATCH 1/4] Add support of redis clusters (#1)

Add Redis cluster support as session management backend
---
 beaker/cache.py                      |   4 +-
 beaker/docs/modules/rediscluster.rst |  10 ++
 beaker/ext/redisclusternm.py         | 165 +++++++++++++++++++++++++++
 3 files changed, 178 insertions(+), 1 deletion(-)
 create mode 100644 beaker/docs/modules/rediscluster.rst
 create mode 100644 beaker/ext/redisclusternm.py

diff --git a/beaker/cache.py b/beaker/cache.py
index 5a1ad6a4..61b8e4e9 100644
--- a/beaker/cache.py
+++ b/beaker/cache.py
@@ -22,6 +22,7 @@
 import beaker.ext.google as google
 import beaker.ext.mongodb as mongodb
 import beaker.ext.redisnm as redisnm
+import beaker.ext.redisclusternm as redisclusternm
 from functools import wraps
 
 # Initialize the cache region dict
@@ -126,7 +127,8 @@ def _init(self):
             'ext:sqla': sqla.SqlaNamespaceManager,
             'ext:google': google.GoogleNamespaceManager,
             'ext:mongodb': mongodb.MongoNamespaceManager,
-            'ext:redis': redisnm.RedisNamespaceManager
+            'ext:redis': redisnm.RedisNamespaceManager,
+            'ext:rediscluster': redisclusternm.RedisClusterNamespaceManager,
         })
 
 
diff --git a/beaker/docs/modules/rediscluster.rst b/beaker/docs/modules/rediscluster.rst
new file mode 100644
index 00000000..5546656f
--- /dev/null
+++ b/beaker/docs/modules/rediscluster.rst
@@ -0,0 +1,10 @@
+:mod:`beaker.ext.redisclusternm` -- Redis cluster NameSpace Manager and Synchronizer
+=====================================================================================
+
+.. automodule:: beaker.ext.redisclusternm
+
+Module Contents
+---------------
+
+.. autoclass:: RedisClusterNamespaceManager
+.. autoclass:: RedisClusterSynchronizer
diff --git a/beaker/ext/redisclusternm.py b/beaker/ext/redisclusternm.py
new file mode 100644
index 00000000..22ce1d75
--- /dev/null
+++ b/beaker/ext/redisclusternm.py
@@ -0,0 +1,165 @@
+import os
+import threading
+import time
+import pickle
+
+try:
+    import redis
+except ImportError:
+    redis = None
+
+from beaker.container import NamespaceManager
+from beaker.synchronization import SynchronizerImpl
+from beaker.util import SyncDict, machine_identifier
+from beaker.crypto.util import sha1
+from beaker._compat import string_type, PY2
+
+
+class RedisClusterNamespaceManager(NamespaceManager):
+    """Provides the :class:`.NamespaceManager` API over Redis cluster.
+
+    Provided ``urls`` can be either multiple redis connection strings separated by a comma or
+    an already existing RedisCluster instance.
+
+    Unlike a StrictRedis connection string, a RedisCluster one does not support
+    database indicators; the database is always 0.
+
+    Example: `redis://node-1:7001,redis://node-2:7002`
+
+    The data will be stored into redis keys, with their name
+    starting with ``beaker_cache:``.
+    """
+    MAX_KEY_LENGTH = 1024
+
+    clients = SyncDict()
+
+    def __init__(self, namespace, urls, timeout=None, **kw):
+        super(RedisClusterNamespaceManager, self).__init__(namespace)
+        self.lock_dir = None  # Redis uses redis itself for locking.
+        self.timeout = timeout
+        self.nodes = []
+
+        if redis is None:
+            raise RuntimeError('redis is not available')
+
+        if isinstance(urls, string_type):
+            for url in urls.split(','):
+                url_options = redis.connection.parse_url(url)
+                if 'db' in url_options:
+                    raise redis.cluster.RedisClusterException(
+                        "A ``db`` querystring option can only be 0 in cluster mode"
+                    )
+                self.nodes.append(redis.cluster.ClusterNode(
+                    host=url_options.get('host'),
+                    port=url_options.get('port')
+                ))
+            self.client = RedisClusterNamespaceManager.clients.get(
+                urls, redis.cluster.RedisCluster, startup_nodes=self.nodes
+            )
+        else:
+            self.client = urls
+
+    def _format_key(self, key):
+        if not isinstance(key, str):
+            key = key.decode('ascii')
+        if len(key) > (self.MAX_KEY_LENGTH - len(self.namespace) - len('beaker_cache:') - 1):
+            if not PY2:
+                key = key.encode('utf-8')
+            key = sha1(key).hexdigest()
+        return 'beaker_cache:%s:%s' % (self.namespace, key)
+
+    def get_creation_lock(self, key):
+        return RedisClusterSynchronizer(self._format_key(key), self.client)
+
+    def __getitem__(self, key):
+        entry = self.client.get(self._format_key(key))
+        if entry is None:
+            raise KeyError(key)
+        return pickle.loads(entry)
+
+    def __contains__(self, key):
+        return self.client.exists(self._format_key(key))
+
+    def has_key(self, key):
+        return key in self
+
+    def set_value(self, key, value, expiretime=None):
+        value = pickle.dumps(value)
+        if expiretime is None and self.timeout is not None:
+            expiretime = self.timeout
+        if expiretime is not None:
+            self.client.setex(self._format_key(key), int(expiretime), value)
+        else:
+            self.client.set(self._format_key(key), value)
+
+    def __setitem__(self, key, value):
+        self.set_value(key, value)
+
+    def __delitem__(self, key):
+        self.client.delete(self._format_key(key))
+
+    def do_remove(self):
+        for k in self.keys():
+            self.client.delete(k)
+
+    def keys(self):
+        return self.client.keys('beaker_cache:%s:*' % self.namespace)
+
+
+class RedisClusterSynchronizer(SynchronizerImpl):
+    """Synchronizer based on redis cluster.
+
+    Provided ``urls`` can be either multiple redis connection strings separated by a comma or
+    an already existing RedisCluster instance.
+
+    Unlike a StrictRedis connection string, a RedisCluster one does not support
+    database indicators; the database is always 0.
+
+    Example: ``redis://node-1:7001,redis://node-2:7002``
+
+    This Synchronizer only supports 1 reader or 1 writer at a time, not concurrent readers.
+    """
+    # A cache entry generation function can take a while,
+    # but 15 minutes is more than a reasonable time.
+    LOCK_EXPIRATION = 900
+    MACHINE_ID = machine_identifier()
+
+    def __init__(self, identifier, urls):
+        super(RedisClusterSynchronizer, self).__init__()
+        self.identifier = 'beaker_lock:%s' % identifier
+        if isinstance(urls, string_type):
+            self.client = RedisClusterNamespaceManager.clients.get(urls, redis.cluster.RedisCluster.from_url, urls)
+        else:
+            self.client = urls
+
+    def _get_owner_id(self):
+        return (
+            '%s-%s-%s' % (self.MACHINE_ID, os.getpid(), threading.current_thread().ident)
+        ).encode('ascii')
+
+    def do_release_read_lock(self):
+        self.do_release_write_lock()
+
+    def do_acquire_read_lock(self, wait):
+        self.do_acquire_write_lock(wait)
+
+    def do_release_write_lock(self):
+        identifier = self.identifier
+        owner_id = self._get_owner_id()
+
+        def execute_release(pipe):
+            lock_value = pipe.get(identifier)
+            if lock_value == owner_id:
+                pipe.delete(identifier)
+
+        self.client.transaction(execute_release, identifier)
+
+    def do_acquire_write_lock(self, wait):
+        owner_id = self._get_owner_id()
+        while True:
+            if self.client.set(self.identifier, owner_id, ex=self.LOCK_EXPIRATION, nx=True):
+                return True
+
+            if not wait:
+                return False
+            time.sleep(0.2)

From 896aec0a8c8d60e5fdf6170f315ace6ed4b80209 Mon Sep 17 00:00:00 2001
From: Millefeuille42
Date: Mon, 18 Mar 2024 10:19:54 +0100
Subject: [PATCH 2/4] feat: added support of additional options

---
 beaker/ext/redisclusternm.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/beaker/ext/redisclusternm.py b/beaker/ext/redisclusternm.py
index 22ce1d75..f9b3dde3 100644
--- a/beaker/ext/redisclusternm.py
+++ b/beaker/ext/redisclusternm.py
@@ -43,18 +43,21 @@ def __init__(self, namespace, urls, timeout=None, **kw):
             raise RuntimeError('redis is not available')
 
         if isinstance(urls, string_type):
+            options = None
             for url in urls.split(','):
                 url_options = redis.connection.parse_url(url)
                 if 'db' in url_options:
-                    raise redis.cluster.RedisClusterException(
+                    raise redis.exceptions.RedisClusterException(
                         "A ``db`` querystring option can only be 0 in cluster mode"
                     )
                 self.nodes.append(redis.cluster.ClusterNode(
-                    host=url_options.get('host'),
-                    port=url_options.get('port')
+                    host=url_options.pop('host'),
+                    port=url_options.pop('port')
                 ))
+                if options is None:
+                    options = url_options
             self.client = RedisClusterNamespaceManager.clients.get(
-                urls, redis.cluster.RedisCluster, startup_nodes=self.nodes
+                urls, redis.cluster.RedisCluster, startup_nodes=self.nodes, **options
             )
         else:
             self.client = urls

From 5f2510381b79949f5b1391b6a0fbcb8faa841ee3 Mon Sep 17 00:00:00 2001
From: Millefeuille42
Date: Mon, 18 Mar 2024 17:57:41 +0100
Subject: [PATCH 3/4] fix: options are passed through kwargs

---
 beaker/ext/redisclusternm.py | 121 ++++++-------------------------
 1 file changed, 20 insertions(+), 101 deletions(-)

diff --git a/beaker/ext/redisclusternm.py b/beaker/ext/redisclusternm.py
index f9b3dde3..0024ae89 100644
--- a/beaker/ext/redisclusternm.py
+++ b/beaker/ext/redisclusternm.py
@@ -3,19 +3,18 @@
 import time
 import pickle
 
+from beaker.container import NamespaceManager
+
 try:
     import redis
 except ImportError:
     redis = None
 
-from beaker.container import NamespaceManager
-from beaker.synchronization import SynchronizerImpl
-from beaker.util import SyncDict, machine_identifier
-from beaker.crypto.util import sha1
-from beaker._compat import string_type, PY2
+from beaker.ext.redisnm import RedisNamespaceManager, RedisSynchronizer
+from beaker._compat import string_type
 
 
-class RedisClusterNamespaceManager(NamespaceManager):
+class RedisClusterNamespaceManager(RedisNamespaceManager):
     """Provides the :class:`.NamespaceManager` API over Redis cluster.
 
     Provided ``urls`` can be either multiple redis connection strings separated by a comma or
     an already existing RedisCluster instance.
@@ -26,24 +25,23 @@ class RedisClusterNamespaceManager(NamespaceManager):
 
     Example: `redis://node-1:7001,redis://node-2:7002`
 
+    Additional options can be passed in kwargs (e.g. `username="redis", password="secure_password"`).
+
     The data will be stored into redis keys, with their name
     starting with ``beaker_cache:``.
     """
-    MAX_KEY_LENGTH = 1024
-
-    clients = SyncDict()
 
-    def __init__(self, namespace, urls, timeout=None, **kw):
-        super(RedisClusterNamespaceManager, self).__init__(namespace)
+    def __init__(self, namespace, urls, timeout=None, **kwargs):
+        super(RedisNamespaceManager, self).__init__(namespace)
         self.lock_dir = None  # Redis uses redis itself for locking.
         self.timeout = timeout
         self.nodes = []
+        self.options = kwargs
 
         if redis is None:
             raise RuntimeError('redis is not available')
 
         if isinstance(urls, string_type):
-            options = None
             for url in urls.split(','):
                 url_options = redis.connection.parse_url(url)
                 if 'db' in url_options:
@@ -51,65 +49,20 @@ def __init__(self, namespace, urls, timeout=None, **kw):
                         "A ``db`` querystring option can only be 0 in cluster mode"
                     )
                 self.nodes.append(redis.cluster.ClusterNode(
-                    host=url_options.pop('host'),
-                    port=url_options.pop('port')
+                    host=url_options.get('host'),
+                    port=url_options.get('port')
                 ))
-                if options is None:
-                    options = url_options
             self.client = RedisClusterNamespaceManager.clients.get(
-                urls, redis.cluster.RedisCluster, startup_nodes=self.nodes, **options
+                urls, redis.cluster.RedisCluster, startup_nodes=self.nodes, **kwargs
             )
         else:
            self.client = urls
 
-    def _format_key(self, key):
-        if not isinstance(key, str):
-            key = key.decode('ascii')
-        if len(key) > (self.MAX_KEY_LENGTH - len(self.namespace) - len('beaker_cache:') - 1):
-            if not PY2:
-                key = key.encode('utf-8')
-            key = sha1(key).hexdigest()
-        return 'beaker_cache:%s:%s' % (self.namespace, key)
-
     def get_creation_lock(self, key):
-        return RedisClusterSynchronizer(self._format_key(key), self.client)
-
-    def __getitem__(self, key):
-        entry = self.client.get(self._format_key(key))
-        if entry is None:
-            raise KeyError(key)
-        return pickle.loads(entry)
-
-    def __contains__(self, key):
-        return self.client.exists(self._format_key(key))
-
-    def has_key(self, key):
-        return key in self
-
-    def set_value(self, key, value, expiretime=None):
-        value = pickle.dumps(value)
-        if expiretime is None and self.timeout is not None:
-            expiretime = self.timeout
-        if expiretime is not None:
-            self.client.setex(self._format_key(key), int(expiretime), value)
-        else:
-            self.client.set(self._format_key(key), value)
-
-    def __setitem__(self, key, value):
-        self.set_value(key, value)
-
-    def __delitem__(self, key):
-        self.client.delete(self._format_key(key))
-
-    def do_remove(self):
-        for k in self.keys():
-            self.client.delete(k)
-
-    def keys(self):
-        return self.client.keys('beaker_cache:%s:*' % self.namespace)
+        return RedisClusterSynchronizer(self._format_key(key), self.client, self.nodes, **self.options)
 
 
-class RedisClusterSynchronizer(SynchronizerImpl):
+class RedisClusterSynchronizer(RedisSynchronizer):
     """Synchronizer based on redis cluster.
 
     Provided ``urls`` can be either multiple redis connection strings separated by a comma or
     an already existing RedisCluster instance.
@@ -122,47 +75,13 @@ class RedisClusterSynchronizer(SynchronizerImpl):
 
     This Synchronizer only supports 1 reader or 1 writer at a time, not concurrent readers.
""" - # If a cache entry generation function can take a lot, - # but 15 minutes is more than a reasonable time. - LOCK_EXPIRATION = 900 - MACHINE_ID = machine_identifier() - def __init__(self, identifier, urls): - super(RedisClusterSynchronizer, self).__init__() + def __init__(self, identifier, urls, nodes=None, **kwargs): + super(RedisSynchronizer, self).__init__() self.identifier = 'beaker_lock:%s' % identifier if isinstance(urls, string_type): - self.client = RedisClusterNamespaceManager.clients.get(urls, redis.cluster.RedisCluster.from_url, urls) + self.client = RedisClusterNamespaceManager.clients.get( + urls, redis.cluster.RedisCluster, startup_nodes=nodes, **kwargs + ) else: self.client = urls - - def _get_owner_id(self): - return ( - '%s-%s-%s' % (self.MACHINE_ID, os.getpid(), threading.current_thread().ident) - ).encode('ascii') - - def do_release_read_lock(self): - self.do_release_write_lock() - - def do_acquire_read_lock(self, wait): - self.do_acquire_write_lock(wait) - - def do_release_write_lock(self): - identifier = self.identifier - owner_id = self._get_owner_id() - - def execute_release(pipe): - lock_value = pipe.get(identifier) - if lock_value == owner_id: - pipe.delete(identifier) - - self.client.transaction(execute_release, identifier) - - def do_acquire_write_lock(self, wait): - owner_id = self._get_owner_id() - while True: - if self.client.set(self.identifier, owner_id, ex=self.LOCK_EXPIRATION, nx=True): - return True - - if not wait: - return False - time.sleep(0.2) From 7822729211385d11c6cb13383c3b3710b7bf495a Mon Sep 17 00:00:00 2001 From: Millefeuille42 Date: Wed, 20 Mar 2024 16:17:52 +0100 Subject: [PATCH 4/4] feat: added testsuite --- tests/test_managers/test_ext_rediscluster.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 tests/test_managers/test_ext_rediscluster.py diff --git a/tests/test_managers/test_ext_rediscluster.py b/tests/test_managers/test_ext_rediscluster.py new file mode 100644 index 00000000..5c19d549 --- /dev/null +++ b/tests/test_managers/test_ext_rediscluster.py @@ -0,0 +1,16 @@ +from beaker.cache import Cache +from . import base + + +class TestRedis(base.CacheManagerBaseTests): + CACHE_ARGS = { + 'type': 'ext:rediscluster', + 'url': 'redis://localhost:6379' + } + + def test_client_reuse(self): + cache1 = Cache('test1', **self.CACHE_ARGS) + cli1 = cache1.namespace.client + cache2 = Cache('test2', **self.CACHE_ARGS) + cli2 = cache2.namespace.client + self.assertTrue(cli1 is cli2) \ No newline at end of file