From 78a570cd9ed62db748868455cf82f7b5fd7ec24e Mon Sep 17 00:00:00 2001 From: Millefeuille Date: Tue, 5 Mar 2024 13:54:56 +0100 Subject: [PATCH 1/5] Add support of redis clusters --- beaker/cache.py | 4 +- beaker/docs/modules/rediscluster.rst | 10 ++ beaker/ext/redisclusternm.py | 158 +++++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 beaker/docs/modules/rediscluster.rst create mode 100644 beaker/ext/redisclusternm.py diff --git a/beaker/cache.py b/beaker/cache.py index 5a1ad6a4..b58f7f52 100644 --- a/beaker/cache.py +++ b/beaker/cache.py @@ -22,6 +22,7 @@ import beaker.ext.google as google import beaker.ext.mongodb as mongodb import beaker.ext.redisnm as redisnm +import beaker.ext.redisclusternm as redisclusternm from functools import wraps # Initialize the cache region dict @@ -126,7 +127,8 @@ def _init(self): 'ext:sqla': sqla.SqlaNamespaceManager, 'ext:google': google.GoogleNamespaceManager, 'ext:mongodb': mongodb.MongoNamespaceManager, - 'ext:redis': redisnm.RedisNamespaceManager + 'ext:redis': redisnm.RedisNamespaceManager, + 'ext:rediscluster': redisclusternm }) diff --git a/beaker/docs/modules/rediscluster.rst b/beaker/docs/modules/rediscluster.rst new file mode 100644 index 00000000..5546656f --- /dev/null +++ b/beaker/docs/modules/rediscluster.rst @@ -0,0 +1,10 @@ +:mod:`beaker.ext.redisclusternm` -- Redis cluster NameSpace Manager and Synchronizer +============================================================================== + +.. automodule:: beaker.ext.redisclusternm + +Module Contents +--------------- + +.. autoclass:: RedisClusterNamespaceManager +.. autoclass:: RedisClusterSynchronizer diff --git a/beaker/ext/redisclusternm.py b/beaker/ext/redisclusternm.py new file mode 100644 index 00000000..d7b68dfd --- /dev/null +++ b/beaker/ext/redisclusternm.py @@ -0,0 +1,158 @@ +import os +import threading +import time +import pickle + +try: + import redis +except ImportError: + redis = None + +from beaker.container import NamespaceManager +from beaker.synchronization import SynchronizerImpl +from beaker.util import SyncDict, machine_identifier +from beaker.crypto.util import sha1 +from beaker._compat import string_type, PY2 + + +class RedisClusterNamespaceManager(NamespaceManager): + """Provides the :class:`.NamespaceManager` API over Redis cluster. + + Provided ``url`` can be both a redis connection string or + an already existing RedisCluster instance. + + The data will be stored into redis keys, with their name + starting with ``beaker_cache:``. So make sure you provide + a specific database number if you don't want to mix them + with your own data. + """ + MAX_KEY_LENGTH = 1024 + + clients = SyncDict() + + def __init__(self, namespace, url, timeout=None, **kw): + super(RedisClusterNamespaceManager, self).__init__(namespace) + self.lock_dir = None # Redis uses redis itself for locking. + self.timeout = timeout + self.nodes = [] + + if redis is None: + raise RuntimeError('redis is not available') + + if isinstance(url, string_type): + for url in url.split(','): + url_options = redis.connection.parse_url(url) + if 'db' in url_options: + # Argument 'db' is not possible to use in cluster mode + raise redis.cluster.RedisClusterException( + "A ``db`` querystring option can only be 0 in cluster mode" + ) + self.nodes.append(redis.cluster.ClusterNode( + host=url_options.get('host'), + port=url_options.get('port') + )) + self.client = RedisClusterNamespaceManager.clients.get( + url, redis.RedisCluster, startup_nodes=self.nodes + ) + else: + self.client = url + + def _format_key(self, key): + if not isinstance(key, str): + key = key.decode('ascii') + if len(key) > (self.MAX_KEY_LENGTH - len(self.namespace) - len('beaker_cache:') - 1): + if not PY2: + key = key.encode('utf-8') + key = sha1(key).hexdigest() + return 'beaker_cache:%s:%s' % (self.namespace, key) + + def get_creation_lock(self, key): + return RedisClusterSynchronizer(self._format_key(key), self.client) + + def __getitem__(self, key): + entry = self.client.get(self._format_key(key)) + if entry is None: + raise KeyError(key) + return pickle.loads(entry) + + def __contains__(self, key): + return self.client.exists(self._format_key(key)) + + def has_key(self, key): + return key in self + + def set_value(self, key, value, expiretime=None): + value = pickle.dumps(value) + if expiretime is None and self.timeout is not None: + expiretime = self.timeout + if expiretime is not None: + self.client.setex(self._format_key(key), int(expiretime), value) + else: + self.client.set(self._format_key(key), value) + + def __setitem__(self, key, value): + self.set_value(key, value) + + def __delitem__(self, key): + self.client.delete(self._format_key(key)) + + def do_remove(self): + for k in self.keys(): + self.client.delete(k) + + def keys(self): + return self.client.keys('beaker_cache:%s:*' % self.namespace) + + +class RedisClusterSynchronizer(SynchronizerImpl): + """Synchronizer based on redis cluster. + + Provided ``url`` can be both a redis connection string or + an already existing RedisCluster instance. + + This Synchronizer only supports 1 reader or 1 writer at time, not concurrent readers. + """ + # If a cache entry generation function can take a lot, + # but 15 minutes is more than a reasonable time. + LOCK_EXPIRATION = 900 + MACHINE_ID = machine_identifier() + + def __init__(self, identifier, url): + super(RedisClusterSynchronizer, self).__init__() + self.identifier = 'beaker_lock:%s' % identifier + if isinstance(url, string_type): + self.client = RedisClusterNamespaceManager.clients.get(url, redis.RedisCluster.from_url, url) + else: + self.client = url + + def _get_owner_id(self): + return ( + '%s-%s-%s' % (self.MACHINE_ID, os.getpid(), threading.current_thread().ident) + ).encode('ascii') + + def do_release_read_lock(self): + self.do_release_write_lock() + + def do_acquire_read_lock(self, wait): + self.do_acquire_write_lock(wait) + + def do_release_write_lock(self): + identifier = self.identifier + owner_id = self._get_owner_id() + + def execute_release(pipe): + lock_value = pipe.get(identifier) + if lock_value == owner_id: + pipe.delete(identifier) + + self.client.transaction(execute_release, identifier) + + def do_acquire_write_lock(self, wait): + owner_id = self._get_owner_id() + while True: + if self.client.set(self.identifier, owner_id, ex=self.LOCK_EXPIRATION, nx=True): + return True + + if not wait: + return False + time.sleep(0.2) From 80d72c198f0de2830a8000997b40d7cab66a1c89 Mon Sep 17 00:00:00 2001 From: Millefeuille Date: Tue, 5 Mar 2024 14:19:22 +0100 Subject: [PATCH 2/5] Add information in code docstring --- beaker/ext/redisclusternm.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/beaker/ext/redisclusternm.py b/beaker/ext/redisclusternm.py index d7b68dfd..9a15f2bf 100644 --- a/beaker/ext/redisclusternm.py +++ b/beaker/ext/redisclusternm.py @@ -18,9 +18,14 @@ class RedisClusterNamespaceManager(NamespaceManager): """Provides the :class:`.NamespaceManager` API over Redis cluster. - Provided ``url`` can be both a redis connection string or + Provided ``urls`` can be both multiple redis connection strings separated by a comma or an already existing RedisCluster instance. + Unlike a StrictRedis connection string, a RedisCluster one does not support + database indicators, it is zero by default. + + Example: `redis://node-1:7001,redis://node-2:7002` + The data will be stored into redis keys, with their name starting with ``beaker_cache:``. So make sure you provide a specific database number if you don't want to mix them @@ -30,7 +35,7 @@ class RedisClusterNamespaceManager(NamespaceManager): clients = SyncDict() - def __init__(self, namespace, url, timeout=None, **kw): + def __init__(self, namespace, urls, timeout=None, **kw): super(RedisClusterNamespaceManager, self).__init__(namespace) self.lock_dir = None # Redis uses redis itself for locking. self.timeout = timeout @@ -39,11 +44,10 @@ def __init__(self, namespace, url, timeout=None, **kw): if redis is None: raise RuntimeError('redis is not available') - if isinstance(url, string_type): - for url in url.split(','): + if isinstance(urls, string_type): + for url in urls.split(','): url_options = redis.connection.parse_url(url) if 'db' in url_options: - # Argument 'db' is not possible to use in cluster mode raise redis.cluster.RedisClusterException( "A ``db`` querystring option can only be 0 in cluster mode" ) @@ -52,10 +56,10 @@ def __init__(self, namespace, url, timeout=None, **kw): port=url_options.get('port') )) self.client = RedisClusterNamespaceManager.clients.get( - url, redis.RedisCluster, startup_nodes=self.nodes + urls, redis.RedisCluster, startup_nodes=self.nodes ) else: - self.client = url + self.client = urls def _format_key(self, key): if not isinstance(key, str): @@ -107,9 +111,14 @@ def keys(self): class RedisClusterSynchronizer(SynchronizerImpl): """Synchronizer based on redis cluster. - Provided ``url`` can be both a redis connection string or + Provided ``urls`` can be both multiple redis connection strings separated by a comma or an already existing RedisCluster instance. + Unlike a StrictRedis connection string, a RedisCluster one does not support + database indicators, it is zero by default. + + Example: ``redis://node-1:7001,redis://node-2:7002, + This Synchronizer only supports 1 reader or 1 writer at time, not concurrent readers. """ # If a cache entry generation function can take a lot, @@ -117,13 +126,13 @@ class RedisClusterSynchronizer(SynchronizerImpl): LOCK_EXPIRATION = 900 MACHINE_ID = machine_identifier() - def __init__(self, identifier, url): + def __init__(self, identifier, urls): super(RedisClusterSynchronizer, self).__init__() self.identifier = 'beaker_lock:%s' % identifier - if isinstance(url, string_type): - self.client = RedisClusterNamespaceManager.clients.get(url, redis.RedisCluster.from_url, url) + if isinstance(urls, string_type): + self.client = RedisClusterNamespaceManager.clients.get(urls, redis.RedisCluster.from_url, urls) else: - self.client = url + self.client = urls def _get_owner_id(self): return ( From 491852dd70a662dcb0607a013b9986b6339462ff Mon Sep 17 00:00:00 2001 From: Millefeuille Date: Tue, 5 Mar 2024 14:21:04 +0100 Subject: [PATCH 3/5] Fix class name in available backends --- beaker/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beaker/cache.py b/beaker/cache.py index b58f7f52..61b8e4e9 100644 --- a/beaker/cache.py +++ b/beaker/cache.py @@ -128,7 +128,7 @@ def _init(self): 'ext:google': google.GoogleNamespaceManager, 'ext:mongodb': mongodb.MongoNamespaceManager, 'ext:redis': redisnm.RedisNamespaceManager, - 'ext:rediscluster': redisclusternm + 'ext:rediscluster': redisclusternm.RedisClusterNamespaceManager, }) From 0e8fce7e114534e3bb2458e600b3dd9116874c2a Mon Sep 17 00:00:00 2001 From: Millefeuille Date: Tue, 5 Mar 2024 14:22:25 +0100 Subject: [PATCH 4/5] Remove irrelevant docstring info --- beaker/ext/redisclusternm.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/beaker/ext/redisclusternm.py b/beaker/ext/redisclusternm.py index 9a15f2bf..e8055d3f 100644 --- a/beaker/ext/redisclusternm.py +++ b/beaker/ext/redisclusternm.py @@ -27,9 +27,7 @@ class RedisClusterNamespaceManager(NamespaceManager): Example: `redis://node-1:7001,redis://node-2:7002` The data will be stored into redis keys, with their name - starting with ``beaker_cache:``. So make sure you provide - a specific database number if you don't want to mix them - with your own data. + starting with ``beaker_cache:``. """ MAX_KEY_LENGTH = 1024 From caaa619e09d50784e82ad5f7741b6ed65d1338a7 Mon Sep 17 00:00:00 2001 From: Millefeuille Date: Tue, 5 Mar 2024 14:24:05 +0100 Subject: [PATCH 5/5] Fix RedisCluster path --- beaker/ext/redisclusternm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beaker/ext/redisclusternm.py b/beaker/ext/redisclusternm.py index e8055d3f..22ce1d75 100644 --- a/beaker/ext/redisclusternm.py +++ b/beaker/ext/redisclusternm.py @@ -54,7 +54,7 @@ def __init__(self, namespace, urls, timeout=None, **kw): port=url_options.get('port') )) self.client = RedisClusterNamespaceManager.clients.get( - urls, redis.RedisCluster, startup_nodes=self.nodes + urls, redis.cluster.RedisCluster, startup_nodes=self.nodes ) else: self.client = urls @@ -128,7 +128,7 @@ def __init__(self, identifier, urls): super(RedisClusterSynchronizer, self).__init__() self.identifier = 'beaker_lock:%s' % identifier if isinstance(urls, string_type): - self.client = RedisClusterNamespaceManager.clients.get(urls, redis.RedisCluster.from_url, urls) + self.client = RedisClusterNamespaceManager.clients.get(urls, redis.cluster.RedisCluster.from_url, urls) else: self.client = urls