diff --git a/nydus/__init__.py b/nydus/__init__.py index 3c6deb4..3eb26d4 100644 --- a/nydus/__init__.py +++ b/nydus/__init__.py @@ -11,3 +11,6 @@ .get_distribution('nydus').version except Exception, e: VERSION = 'unknown' + +#Just make sure we don't clash with the source project +VERSION = '10.0.1' \ No newline at end of file diff --git a/nydus/db/backends/base.py b/nydus/db/backends/base.py index 34760ab..79dc858 100644 --- a/nydus/db/backends/base.py +++ b/nydus/db/backends/base.py @@ -44,6 +44,7 @@ class BaseConnection(object): def __init__(self, num, **options): self._connection = None self.num = num + self.options = options @property def identifier(self): diff --git a/nydus/db/backends/redis.py b/nydus/db/backends/redis.py index 98237c1..d0f8fef 100644 --- a/nydus/db/backends/redis.py +++ b/nydus/db/backends/redis.py @@ -15,6 +15,8 @@ class RedisPipeline(BasePipeline): + silent_exceptions = frozenset([RedisError]) + def __init__(self, connection): self.pending = [] self.connection = connection @@ -25,7 +27,17 @@ def add(self, command): getattr(self.pipe, command._attr)(*command._args, **command._kwargs) def execute(self): - return self.pipe.execute() + fail_silently = self.connection.fail_silently + try: + results = self.pipe.execute() + except tuple(self.silent_exceptions), e: + if fail_silently: + results = [] + else: + raise + + return results + class Redis(BaseConnection): @@ -39,6 +51,7 @@ def __init__(self, host='localhost', port=6379, db=0, timeout=None, password=Non self.db = db self.timeout = timeout self.__password = password + self.fail_silently = options.get('fail_silently', False) super(Redis, self).__init__(**options) @property diff --git a/nydus/db/base.py b/nydus/db/base.py index c8fb857..a37ef1f 100644 --- a/nydus/db/base.py +++ b/nydus/db/base.py @@ -9,7 +9,8 @@ from collections import defaultdict from nydus.db.routers import BaseRouter from nydus.utils import import_string, ThreadPool - +import logging +logger = logging.getLogger(__name__) def create_cluster(settings): """ @@ -79,19 +80,37 @@ def __iter__(self): def _execute(self, attr, args, kwargs): connections = self._connections_for(attr, *args, **kwargs) - results = [] for conn in connections: + fail_silently = conn.fail_silently + for retry in xrange(self.max_connection_retries): try: results.append(getattr(conn, attr)(*args, **kwargs)) except tuple(conn.retryable_exceptions), e: + #retry, raise an error or fail silently + error = None if not self.router.retryable: - raise e + error = e elif retry == self.max_connection_retries - 1: - raise self.MaxRetriesExceededError(e) + error = self.MaxRetriesExceededError(e) else: conn = self._connections_for(attr, retry_for=conn.num, *args, **kwargs)[0] + + if error and fail_silently: + #fail silently by returning None, usefull for cache like usage of redis + if self.router.retryable: + logger.error('failing silently after %s retries for conn %s with command %s', self.max_connection_retries, conn, attr) + else: + logger.error('failing silently for conn %s with command %s', conn, attr) + results = [None] + break + else: + raise error + + #going for another retry + logger.warn('retrying connection %s with command %s', conn, attr) + else: break diff --git a/nydus/db/routers/__init__.py b/nydus/db/routers/__init__.py index a33f475..67ee82d 100644 --- a/nydus/db/routers/__init__.py +++ b/nydus/db/routers/__init__.py @@ -8,3 +8,4 @@ from .base import BaseRouter, RoundRobinRouter +from .prefix_partition import PrefixPartitionRouter \ No newline at end of file diff --git a/nydus/db/routers/prefix_partition.py b/nydus/db/routers/prefix_partition.py new file mode 100644 index 0000000..6a2edb9 --- /dev/null +++ b/nydus/db/routers/prefix_partition.py @@ -0,0 +1,54 @@ +from nydus.db.routers import BaseRouter + +class PrefixPartitionRouter(BaseRouter): + ''' + Routes based on the configured prefixes + + Example config: + + 'redis': { + 'engine': 'nydus.db.backends.redis.Redis', + 'router': 'nydus.db.routers.redis.PrefixPartitionRouter', + 'hosts': { + 'default': {'db': 0, 'host': 'default.redis.goteam.be', 'port': 6379}, + 'user:loves:': {'db': 1, 'host': 'default.redis.goteam.be', 'port': 6379}, + 'loves:': {'db': 2, 'host': 'default.redis.goteam.be', 'port': 6379}, + 'hash:entity:': {'db': 0, 'host': 'entities.redis.goteam.be', 'port': 6379}, + } + } + + We route to one and only one redis. + Use a seperate config if you want hashing based partitioning. + ''' + + def _pre_routing(self, cluster, attr, key, *args, **kwargs): + """ + Requesting a pipeline without a key to partition on is just plain wrong. + We raise a valueError if you try + """ + if not key and attr == 'pipeline': + raise ValueError('Pipelines requires a key for proper routing') + return key + + def _route(self, cluster, attr, key, *args, **kwargs): + """ + Perform routing and return db_nums + """ + if 'default' not in cluster.hosts: + error_message = 'The prefix router requires a default host' + raise ValueError(error_message) + hosts = None + if key: + for host in cluster.hosts: + if key.startswith(str(host)): + hosts = [host] + if not hosts: + hosts = ['default'] + + #sanity check, dont see how this can happen + if not hosts: + error_message = 'The prefix partition router couldnt find a host for command %s and key %s' % (attr, key) + raise ValueError(error_message) + + return hosts + diff --git a/nydus/db/routers/redis.py b/nydus/db/routers/redis.py index 6b44330..41a2855 100644 --- a/nydus/db/routers/redis.py +++ b/nydus/db/routers/redis.py @@ -6,8 +6,8 @@ :license: Apache License 2.0, see LICENSE for more details. """ -from nydus.db.routers import RoundRobinRouter +from nydus.db.routers import RoundRobinRouter, PrefixPartitionRouter from nydus.db.routers.keyvalue import ConsistentHashingRouter, PartitionRouter -__all__ = ('ConsistentHashingRouter', 'PartitionRouter', 'RoundRobinRouter') +__all__ = ('PrefixPartitionRouter', 'ConsistentHashingRouter', 'PartitionRouter', 'RoundRobinRouter') diff --git a/tests/nydus/db/connections/tests.py b/tests/nydus/db/connections/tests.py index 311bcb9..8bb3266 100644 --- a/tests/nydus/db/connections/tests.py +++ b/tests/nydus/db/connections/tests.py @@ -32,6 +32,54 @@ def get_dbs(self, cluster, attr, key=None, *args, **kwargs): return [0] + +class BrokenRedisTest(BaseTest): + def setUp(self): + from nydus.db import create_cluster + engine = 'nydus.db.backends.redis.Redis' + router = 'nydus.db.routers.redis.PrefixPartitionRouter' + nydus_config = dict(engine=engine, router=router, hosts={ + 'default': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False}, + 'simple_cache': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': True}, + 'app_critical': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False}, + }) + redis = create_cluster(nydus_config) + self.redis = redis + + def test_broken_redis(self): + #test silent failures + key = 'simple_cache:test' + set_result = self.redis.set(key, '1') + assert not set_result + result = self.redis.get(key) + assert not result + + #assert by default we fail loudly + from redis.exceptions import ConnectionError + try: + key = 'app_critical:test' + set_result = self.redis.set(key, '1') + result = self.redis.get(key) + except ConnectionError, e: + pass + else: + raise Exception, 'we were hoping for a connection error' + + def test_map(self): + keys = ['simple_cache:test', 'simple_cache:test_two', 'app_critical:test'] + with self.redis.map() as conn: + results = [conn.get(k) for k in keys] + + for result, key in zip(results, keys): + result_object = result._wrapped + if 'app_critical' in key: + assert 'Error' in result_object + else: + assert result_object is None, 'we should get None when failing' + + + + class ClusterTest(BaseTest): def test_create_cluster(self): c = create_cluster({ diff --git a/tests/nydus/db/routers/tests.py b/tests/nydus/db/routers/tests.py index 0a822a6..52ab8a1 100644 --- a/tests/nydus/db/routers/tests.py +++ b/tests/nydus/db/routers/tests.py @@ -14,6 +14,63 @@ from nydus.db.routers.keyvalue import ConsistentHashingRouter, PartitionRouter +class PrefixPartitionTest(BaseTest): + def setUp(self): + from nydus.db import create_cluster + engine = 'nydus.db.backends.redis.Redis' + router = 'nydus.db.routers.redis.PrefixPartitionRouter' + nydus_config = dict(engine=engine, router=router, hosts={ + 'default': {'db': 0, 'host': 'localhost', 'port': 6379}, + 'user:loves:': {'db': 1, 'host': 'localhost', 'port': 6379} + }) + redis = create_cluster(nydus_config) + self.redis = redis + + def test_partitions(self): + ''' + Verify if we write ton one and only one redis database + ''' + import mock + + keys = [ + ('user:loves:test', 1), + ('default_test',0), + ('hash:entity:test', 0) + ] + + for key, redis_db in keys: + with mock.patch('redis.client.StrictRedis.execute_command') as fake_set: + result = self.redis.set(key, '1') + args, kwargs = fake_set.call_args + instance, cmd, key, key_value = args + connection_kwargs = instance.connection_pool.connection_kwargs + db = connection_kwargs['db'] + self.assertEqual(db, redis_db) + + def test_missing_default(self): + from nydus.db import create_cluster + from functools import partial + + engine = 'nydus.db.backends.redis.Redis' + router = 'nydus.db.routers.redis.PrefixPartitionRouter' + nydus_config = dict(engine=engine, router=router, hosts={ + 'base': {'db': 0, 'host': 'localhost', 'port': 6379}, + 'user:loves:': {'db': 1, 'host': 'localhost', 'port': 6379} + }) + redis = create_cluster(nydus_config) + + redis_call = partial(redis.get, 'thiswillbreak') + self.assertRaises(ValueError, redis_call) + + def test_pipeline(self): + redis = self.redis + #we prefer map above direct pipeline usage, but if you really need it: + redis.pipeline('default:test') + + #this should fail as we require a key + self.assertRaises(ValueError, redis.pipeline) + + class DummyConnection(BaseConnection): def __init__(self, i): self.host = 'dummyhost'