Skip to content

Commit 36619a5

Browse files
authored
Add dynamic_startup_nodes parameter to async RedisCluster (#3646)
1 parent 0d28291 commit 36619a5

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

redis/asyncio/cluster.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
134134
| Enable read from replicas in READONLY mode and defines the load balancing
135135
strategy that will be used for cluster node selection.
136136
The data read from replicas is eventually consistent with the data in primary nodes.
137+
:param dynamic_startup_nodes:
138+
| Set the RedisCluster's startup nodes to all the discovered nodes.
139+
If true (default value), the cluster's discovered nodes will be used to
140+
determine the cluster nodes-slots mapping in the next topology refresh.
141+
It will remove the initial passed startup nodes if their endpoints aren't
142+
listed in the CLUSTER SLOTS output.
143+
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
144+
specific IP addresses, it is best to set it to false.
137145
:param reinitialize_steps:
138146
| Specifies the number of MOVED errors that need to occur before reinitializing
139147
the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -250,6 +258,7 @@ def __init__(
250258
require_full_coverage: bool = True,
251259
read_from_replicas: bool = False,
252260
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
261+
dynamic_startup_nodes: bool = True,
253262
reinitialize_steps: int = 5,
254263
cluster_error_retry_attempts: int = 3,
255264
max_connections: int = 2**31,
@@ -388,6 +397,7 @@ def __init__(
388397
startup_nodes,
389398
require_full_coverage,
390399
kwargs,
400+
dynamic_startup_nodes=dynamic_startup_nodes,
391401
address_remap=address_remap,
392402
event_dispatcher=self._event_dispatcher,
393403
)
@@ -1162,6 +1172,7 @@ async def _mock(self, error: RedisError):
11621172

11631173
class NodesManager:
11641174
__slots__ = (
1175+
"_dynamic_startup_nodes",
11651176
"_moved_exception",
11661177
"_event_dispatcher",
11671178
"connection_kwargs",
@@ -1179,6 +1190,7 @@ def __init__(
11791190
startup_nodes: List["ClusterNode"],
11801191
require_full_coverage: bool,
11811192
connection_kwargs: Dict[str, Any],
1193+
dynamic_startup_nodes: bool = True,
11821194
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
11831195
event_dispatcher: Optional[EventDispatcher] = None,
11841196
) -> None:
@@ -1191,6 +1203,8 @@ def __init__(
11911203
self.nodes_cache: Dict[str, "ClusterNode"] = {}
11921204
self.slots_cache: Dict[int, List["ClusterNode"]] = {}
11931205
self.read_load_balancer = LoadBalancer()
1206+
1207+
self._dynamic_startup_nodes: bool = dynamic_startup_nodes
11941208
self._moved_exception: MovedError = None
11951209
if event_dispatcher is None:
11961210
self._event_dispatcher = EventDispatcher()
@@ -1433,8 +1447,10 @@ async def initialize(self) -> None:
14331447
# Set the tmp variables to the real variables
14341448
self.slots_cache = tmp_slots
14351449
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
1436-
# Populate the startup nodes with all discovered nodes
1437-
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
1450+
1451+
if self._dynamic_startup_nodes:
1452+
# Populate the startup nodes with all discovered nodes
1453+
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
14381454

14391455
# Set the default node
14401456
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]

tests/test_asyncio/test_cluster.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2723,6 +2723,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
27232723
assert rc.get_node(host=default_host, port=7001) is not None
27242724
assert rc.get_node(host=default_host, port=7002) is not None
27252725

2726+
@pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
2727+
async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
2728+
rc = await get_mocked_redis_client(
2729+
host="my@DNS.com",
2730+
port=7000,
2731+
cluster_slots=default_cluster_slots,
2732+
dynamic_startup_nodes=dynamic_startup_nodes,
2733+
)
2734+
# Nodes are taken from default_cluster_slots
2735+
discovered_nodes = [
2736+
"127.0.0.1:7000",
2737+
"127.0.0.1:7001",
2738+
"127.0.0.1:7002",
2739+
"127.0.0.1:7003",
2740+
]
2741+
startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
2742+
if dynamic_startup_nodes is True:
2743+
assert sorted(startup_nodes) == sorted(discovered_nodes)
2744+
else:
2745+
assert startup_nodes == ["my@DNS.com:7000"]
2746+
27262747

27272748
class TestClusterPipeline:
27282749
"""Tests for the ClusterPipeline class."""

0 commit comments

Comments
 (0)