Skip to content

Fixing sentinel command response #3191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import random
import weakref
from functools import reduce
from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type

from redis.asyncio.client import Redis
Expand Down Expand Up @@ -223,19 +224,31 @@ async def execute_command(self, *args, **kwargs):
once - If set to True, then execute the resulting command on a single
node at random, rather than across the entire sentinel cluster.
"""
once = bool(kwargs.get("once", False))
if "once" in kwargs.keys():
kwargs.pop("once")
once = bool(kwargs.pop("once", False))

# Check if command is supposed to return the original
# responses instead of boolean value.
return_responses = bool(kwargs.pop("return_responses", False))

if once:
await random.choice(self.sentinels).execute_command(*args, **kwargs)
else:
tasks = [
asyncio.Task(sentinel.execute_command(*args, **kwargs))
for sentinel in self.sentinels
]
await asyncio.gather(*tasks)
return True
response = await random.choice(self.sentinels).execute_command(
*args, **kwargs
)
if return_responses:
return [response]
else:
return True if response else False

tasks = [
asyncio.Task(sentinel.execute_command(*args, **kwargs))
for sentinel in self.sentinels
]
responses = await asyncio.gather(*tasks)

if return_responses:
return responses

return bool(reduce(lambda x, y: x and y, responses))

def __repr__(self):
sentinel_addresses = []
Expand Down
31 changes: 26 additions & 5 deletions redis/commands/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,27 @@ def sentinel(self, *args):

def sentinel_get_master_addr_by_name(self, service_name):
"""Returns a (host, port) pair for the given ``service_name``"""
return self.execute_command("SENTINEL GET-MASTER-ADDR-BY-NAME", service_name)
return self.execute_command(
"SENTINEL GET-MASTER-ADDR-BY-NAME",
service_name,
once=True,
return_responses=True,
)

def sentinel_master(self, service_name):
"""Returns a dictionary containing the specified masters state."""
return self.execute_command("SENTINEL MASTER", service_name)
return self.execute_command(
"SENTINEL MASTER", service_name, return_responses=True
)

def sentinel_masters(self):
"""Returns a list of dictionaries containing each master's state."""
"""
Returns a list of dictionaries containing each master's state.

Important: This function is called by the Sentinel implementation and is
called directly on the Redis standalone client for sentinels,
so it doesn't support the "once" and "return_responses" options.
"""
return self.execute_command("SENTINEL MASTERS")

def sentinel_monitor(self, name, ip, port, quorum):
Expand All @@ -33,14 +46,22 @@ def sentinel_remove(self, name):

def sentinel_sentinels(self, service_name):
"""Returns a list of sentinels for ``service_name``"""
return self.execute_command("SENTINEL SENTINELS", service_name)
return self.execute_command(
"SENTINEL SENTINELS", service_name, return_responses=True
)

def sentinel_set(self, name, option, value):
"""Set Sentinel monitoring parameters for a given master"""
return self.execute_command("SENTINEL SET", name, option, value)

def sentinel_slaves(self, service_name):
"""Returns a list of slaves for ``service_name``"""
"""
Returns a list of slaves for ``service_name``

Important: This function is called by the Sentinel implementation and is
called directly on the Redis standalone client for sentinels,
so it doesn't support the "once" and "return_responses" options.
"""
return self.execute_command("SENTINEL SLAVES", service_name)

def sentinel_reset(self, pattern):
Expand Down
28 changes: 20 additions & 8 deletions redis/sentinel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import random
import weakref
from functools import reduce
from typing import Optional

from redis.client import Redis
Expand Down Expand Up @@ -254,16 +255,27 @@ def execute_command(self, *args, **kwargs):
once - If set to True, then execute the resulting command on a single
node at random, rather than across the entire sentinel cluster.
"""
once = bool(kwargs.get("once", False))
if "once" in kwargs.keys():
kwargs.pop("once")
once = bool(kwargs.pop("once", False))

# Check if command is supposed to return the original
# responses instead of boolean value.
return_responses = bool(kwargs.pop("return_responses", False))

if once:
random.choice(self.sentinels).execute_command(*args, **kwargs)
else:
for sentinel in self.sentinels:
sentinel.execute_command(*args, **kwargs)
return True
response = random.choice(self.sentinels).execute_command(*args, **kwargs)
if return_responses:
return [response]
else:
return True if response else False

responses = []
for sentinel in self.sentinels:
responses.append(sentinel.execute_command(*args, **kwargs))

if return_responses:
return responses

return bool(reduce(lambda x, y: x and y, responses))

def __repr__(self):
sentinel_addresses = []
Expand Down
76 changes: 71 additions & 5 deletions tests/test_asyncio/test_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,35 @@ def sentinel(request, cluster):
return Sentinel([("foo", 26379), ("bar", 26379)])


@pytest.fixture()
async def deployed_sentinel(request):
sentinel_ips = request.config.getoption("--sentinels")
sentinel_endpoints = [
(ip.strip(), int(port.strip()))
for ip, port in (endpoint.split(":") for endpoint in sentinel_ips.split(","))
]
kwargs = {}
decode_responses = True

sentinel_kwargs = {"decode_responses": decode_responses}
force_master_ip = "localhost"

protocol = request.config.getoption("--protocol", 2)

sentinel = Sentinel(
sentinel_endpoints,
force_master_ip=force_master_ip,
sentinel_kwargs=sentinel_kwargs,
socket_timeout=0.1,
protocol=protocol,
decode_responses=decode_responses,
**kwargs,
)
yield sentinel
for s in sentinel.sentinels:
await s.close()


@pytest.mark.onlynoncluster
async def test_discover_master(sentinel, master_ip):
address = await sentinel.discover_master("mymaster")
Expand Down Expand Up @@ -226,19 +255,22 @@ async def test_slave_round_robin(cluster, sentinel, master_ip):


@pytest.mark.onlynoncluster
async def test_ckquorum(cluster, sentinel):
assert await sentinel.sentinel_ckquorum("mymaster")
async def test_ckquorum(sentinel):
resp = await sentinel.sentinel_ckquorum("mymaster")
assert resp is True


@pytest.mark.onlynoncluster
async def test_flushconfig(cluster, sentinel):
assert await sentinel.sentinel_flushconfig()
async def test_flushconfig(sentinel):
resp = await sentinel.sentinel_flushconfig()
assert resp is True


@pytest.mark.onlynoncluster
async def test_reset(cluster, sentinel):
cluster.master["is_odown"] = True
assert await sentinel.sentinel_reset("mymaster")
resp = await sentinel.sentinel_reset("mymaster")
assert resp is True


@pytest.mark.onlynoncluster
Expand Down Expand Up @@ -284,3 +316,37 @@ async def test_repr_correctly_represents_connection_object(sentinel):
str(connection)
== "<redis.asyncio.sentinel.SentinelManagedConnection,host=127.0.0.1,port=6379)>" # noqa: E501
)


# Tests against real sentinel instances
@pytest.mark.onlynoncluster
async def test_get_sentinels(deployed_sentinel):
resps = await deployed_sentinel.sentinel_sentinels("redis-py-test")

# validate that the original command response is returned
assert isinstance(resps, list)

# validate that the command has been executed against all sentinels
# each response from each sentinel is returned
assert len(resps) > 1


@pytest.mark.onlynoncluster
async def test_get_master_addr_by_name(deployed_sentinel):
resps = await deployed_sentinel.sentinel_get_master_addr_by_name("redis-py-test")

# validate that the original command response is returned
assert isinstance(resps, list)

# validate that the command has been executed just once
# when executed once, only one response element is returned
assert len(resps) == 1

assert isinstance(resps[0], tuple)


@pytest.mark.onlynoncluster
async def test_redis_master_usage(deployed_sentinel):
r = await deployed_sentinel.master_for("redis-py-test", db=0)
await r.set("foo", "bar")
assert (await r.get("foo")) == "bar"
78 changes: 72 additions & 6 deletions tests/test_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,35 @@ def sentinel(request, cluster):
return Sentinel([("foo", 26379), ("bar", 26379)])


@pytest.fixture()
def deployed_sentinel(request):
sentinel_ips = request.config.getoption("--sentinels")
sentinel_endpoints = [
(ip.strip(), int(port.strip()))
for ip, port in (endpoint.split(":") for endpoint in sentinel_ips.split(","))
]
kwargs = {}
decode_responses = True

sentinel_kwargs = {"decode_responses": decode_responses}
force_master_ip = "localhost"

protocol = request.config.getoption("--protocol", 2)

sentinel = Sentinel(
sentinel_endpoints,
force_master_ip=force_master_ip,
sentinel_kwargs=sentinel_kwargs,
socket_timeout=0.1,
protocol=protocol,
decode_responses=decode_responses,
**kwargs,
)
yield sentinel
for s in sentinel.sentinels:
s.close()


@pytest.mark.onlynoncluster
def test_discover_master(sentinel, master_ip):
address = sentinel.discover_master("mymaster")
Expand Down Expand Up @@ -184,7 +213,7 @@ def test_discover_slaves(cluster, sentinel):


@pytest.mark.onlynoncluster
def test_master_for(cluster, sentinel, master_ip):
def test_master_for(sentinel, master_ip):
master = sentinel.master_for("mymaster", db=9)
assert master.ping()
assert master.connection_pool.master_address == (master_ip, 6379)
Expand Down Expand Up @@ -228,19 +257,22 @@ def test_slave_round_robin(cluster, sentinel, master_ip):


@pytest.mark.onlynoncluster
def test_ckquorum(cluster, sentinel):
assert sentinel.sentinel_ckquorum("mymaster")
def test_ckquorum(sentinel):
resp = sentinel.sentinel_ckquorum("mymaster")
assert resp is True


@pytest.mark.onlynoncluster
def test_flushconfig(cluster, sentinel):
assert sentinel.sentinel_flushconfig()
def test_flushconfig(sentinel):
resp = sentinel.sentinel_flushconfig()
assert resp is True


@pytest.mark.onlynoncluster
def test_reset(cluster, sentinel):
cluster.master["is_odown"] = True
assert sentinel.sentinel_reset("mymaster")
resp = sentinel.sentinel_reset("mymaster")
assert resp is True


@pytest.mark.onlynoncluster
Expand All @@ -266,3 +298,37 @@ def mock_disconnect():

assert calls == 1
pool.disconnect()


# Tests against real sentinel instances
@pytest.mark.onlynoncluster
def test_get_sentinels(deployed_sentinel):
resps = deployed_sentinel.sentinel_sentinels("redis-py-test")

# validate that the original command response is returned
assert isinstance(resps, list)

# validate that the command has been executed against all sentinels
# each response from each sentinel is returned
assert len(resps) > 1


@pytest.mark.onlynoncluster
def test_get_master_addr_by_name(deployed_sentinel):
resps = deployed_sentinel.sentinel_get_master_addr_by_name("redis-py-test")

# validate that the original command response is returned
assert isinstance(resps, list)

# validate that the command has been executed just once
# when executed once, only one response element is returned
assert len(resps) == 1

assert isinstance(resps[0], tuple)


@pytest.mark.onlynoncluster
def test_redis_master_usage(deployed_sentinel):
r = deployed_sentinel.master_for("redis-py-test", db=0)
r.set("foo", "bar")
assert r.get("foo") == "bar"