Skip to content

Commit 72f90c2

Browse files
dvora-hpedro.frazaoGauthier Imbertchayimszumka
authored
Cherry-pick for 4.3.5 (#2468)
Co-authored-by: pedro.frazao <perl.pf@netcf.org> Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> Co-authored-by: Gauthier Imbert <gauthier@PC17> Co-authored-by: Chayim <chayim@users.noreply.github.com> Co-authored-by: szumka <106675199+szumka@users.noreply.github.com> Co-authored-by: Mehdi ABAAKOUK <sileht@sileht.net> Co-authored-by: Tim Gates <tim.gates@iress.com> Co-authored-by: Utkarsh Gupta <utkarshgupta137@gmail.com> Co-authored-by: Nial Daly <34862917+nialdaly@users.noreply.github.com> Co-authored-by: pedrofrazao <603718+pedrofrazao@users.noreply.github.com> Co-authored-by: Антон Безденежных <gamer392@yandex.ru> Co-authored-by: Iglesys <g.imbert34@gmail.com> Co-authored-by: Kristján Valur Jónsson <sweskman@gmail.com> Co-authored-by: DvirDukhan <dvir@redis.com> Co-authored-by: Alibi Shalgymbay <a.shalgymbay@mycar.kz> Co-authored-by: dvora-h <dvora.heller@redis.com> Co-authored-by: Alibi <aliby.bbb@gmail.com> Co-authored-by: Aarni Koskela <akx@iki.fi>
1 parent e6cd4fd commit 72f90c2

35 files changed

+2461
-921
lines changed

CHANGES

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
* Added dynaminc_startup_nodes configuration to RedisCluster
1515
* Fix reusing the old nodes' connections when cluster topology refresh is being done
1616
* Fix RedisCluster to immediately raise AuthenticationError without a retry
17+
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
18+
* Remove compatibility code for old versions of Hiredis, drop Packaging dependency
19+
* The `deprecated` library is no longer a dependency
20+
1721
* 4.1.3 (Feb 8, 2022)
1822
* Fix flushdb and flushall (#1926)
1923
* Add redis5 and redis4 dockers (#1871)

redis/asyncio/cluster.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,6 @@ class ClusterNode:
755755
"""
756756

757757
__slots__ = (
758-
"_command_stack",
759758
"_connections",
760759
"_free",
761760
"connection_class",
@@ -796,7 +795,6 @@ def __init__(
796795

797796
self._connections: List[Connection] = []
798797
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
799-
self._command_stack: List["PipelineCommand"] = []
800798

801799
def __repr__(self) -> str:
802800
return (
@@ -887,18 +885,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
887885
# Release connection
888886
self._free.append(connection)
889887

890-
async def execute_pipeline(self) -> bool:
888+
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
891889
# Acquire connection
892890
connection = self.acquire_connection()
893891

894892
# Execute command
895893
await connection.send_packed_command(
896-
connection.pack_commands(cmd.args for cmd in self._command_stack), False
894+
connection.pack_commands(cmd.args for cmd in commands), False
897895
)
898896

899897
# Read responses
900898
ret = False
901-
for cmd in self._command_stack:
899+
for cmd in commands:
902900
try:
903901
cmd.result = await self.parse_response(
904902
connection, cmd.args[0], **cmd.kwargs
@@ -1365,12 +1363,14 @@ async def _execute(
13651363

13661364
node = target_nodes[0]
13671365
if node.name not in nodes:
1368-
nodes[node.name] = node
1369-
node._command_stack = []
1370-
node._command_stack.append(cmd)
1366+
nodes[node.name] = (node, [])
1367+
nodes[node.name][1].append(cmd)
13711368

13721369
errors = await asyncio.gather(
1373-
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
1370+
*(
1371+
asyncio.ensure_future(node[0].execute_pipeline(node[1]))
1372+
for node in nodes.values()
1373+
)
13741374
)
13751375

13761376
if any(errors):

redis/asyncio/connection.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,7 @@ def __del__(self):
685685

686686
@property
687687
def is_connected(self):
688-
return self._reader and self._writer
688+
return self._reader is not None and self._writer is not None
689689

690690
def register_connect_callback(self, callback):
691691
self._connect_callbacks.append(weakref.WeakMethod(callback))
@@ -767,7 +767,16 @@ async def _connect(self):
767767
def _error_message(self, exception):
768768
# args for socket.error can either be (errno, "message")
769769
# or just "message"
770-
if len(exception.args) == 1:
770+
if not exception.args:
771+
# asyncio has a bug where on Connection reset by peer, the
772+
# exception is not instanciated, so args is empty. This is the
773+
# workaround.
774+
# See: https://github.com/redis/redis-py/issues/2237
775+
# See: https://github.com/python/cpython/issues/94061
776+
return (
777+
f"Error connecting to {self.host}:{self.port}. Connection reset by peer"
778+
)
779+
elif len(exception.args) == 1:
771780
return f"Error connecting to {self.host}:{self.port}. {exception.args[0]}."
772781
else:
773782
return (

redis/commands/bf/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,16 @@ def __init__(self, client, **kwargs):
165165
# TDIGEST_RESET: bool_ok,
166166
# TDIGEST_ADD: spaceHolder,
167167
# TDIGEST_MERGE: spaceHolder,
168-
TDIGEST_CDF: float,
169-
TDIGEST_QUANTILE: float,
168+
TDIGEST_CDF: parse_to_list,
169+
TDIGEST_QUANTILE: parse_to_list,
170170
TDIGEST_MIN: float,
171171
TDIGEST_MAX: float,
172+
TDIGEST_TRIMMED_MEAN: float,
172173
TDIGEST_INFO: TDigestInfo,
174+
TDIGEST_RANK: parse_to_list,
175+
TDIGEST_REVRANK: parse_to_list,
176+
TDIGEST_BYRANK: parse_to_list,
177+
TDIGEST_BYREVRANK: parse_to_list,
173178
}
174179

175180
self.client = client

redis/commands/bf/commands.py

Lines changed: 81 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from redis.client import NEVER_DECODE
22
from redis.exceptions import ModuleError
3-
from redis.utils import HIREDIS_AVAILABLE
3+
from redis.utils import HIREDIS_AVAILABLE, deprecated_function
44

55
BF_RESERVE = "BF.RESERVE"
66
BF_ADD = "BF.ADD"
@@ -49,6 +49,11 @@
4949
TDIGEST_MIN = "TDIGEST.MIN"
5050
TDIGEST_MAX = "TDIGEST.MAX"
5151
TDIGEST_INFO = "TDIGEST.INFO"
52+
TDIGEST_TRIMMED_MEAN = "TDIGEST.TRIMMED_MEAN"
53+
TDIGEST_RANK = "TDIGEST.RANK"
54+
TDIGEST_REVRANK = "TDIGEST.REVRANK"
55+
TDIGEST_BYRANK = "TDIGEST.BYRANK"
56+
TDIGEST_BYREVRANK = "TDIGEST.BYREVRANK"
5257

5358

5459
class BFCommands:
@@ -67,6 +72,8 @@ def create(self, key, errorRate, capacity, expansion=None, noScale=None):
6772
self.append_no_scale(params, noScale)
6873
return self.execute_command(BF_RESERVE, *params)
6974

75+
reserve = create
76+
7077
def add(self, key, item):
7178
"""
7279
Add to a Bloom Filter `key` an `item`.
@@ -176,6 +183,8 @@ def create(
176183
self.append_max_iterations(params, max_iterations)
177184
return self.execute_command(CF_RESERVE, *params)
178185

186+
reserve = create
187+
179188
def add(self, key, item):
180189
"""
181190
Add an `item` to a Cuckoo Filter `key`.
@@ -316,6 +325,7 @@ def query(self, key, *items):
316325
""" # noqa
317326
return self.execute_command(TOPK_QUERY, key, *items)
318327

328+
@deprecated_function(version="4.4.0", reason="deprecated since redisbloom 2.4.0")
319329
def count(self, key, *items):
320330
"""
321331
Return count for one `item` or more from `key`.
@@ -344,12 +354,12 @@ def info(self, key):
344354

345355

346356
class TDigestCommands:
347-
def create(self, key, compression):
357+
def create(self, key, compression=100):
348358
"""
349359
Allocate the memory and initialize the t-digest.
350360
For more information see `TDIGEST.CREATE <https://redis.io/commands/tdigest.create>`_.
351361
""" # noqa
352-
return self.execute_command(TDIGEST_CREATE, key, compression)
362+
return self.execute_command(TDIGEST_CREATE, key, "COMPRESSION", compression)
353363

354364
def reset(self, key):
355365
"""
@@ -358,26 +368,30 @@ def reset(self, key):
358368
""" # noqa
359369
return self.execute_command(TDIGEST_RESET, key)
360370

361-
def add(self, key, values, weights):
371+
def add(self, key, values):
362372
"""
363-
Add one or more samples (value with weight) to a sketch `key`.
364-
Both `values` and `weights` are lists.
365-
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
373+
Adds one or more observations to a t-digest sketch `key`.
366374
367-
Example:
368-
369-
>>> tdigestadd('A', [1500.0], [1.0])
375+
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
370376
""" # noqa
371-
params = [key]
372-
self.append_values_and_weights(params, values, weights)
373-
return self.execute_command(TDIGEST_ADD, *params)
377+
return self.execute_command(TDIGEST_ADD, key, *values)
374378

375-
def merge(self, toKey, fromKey):
379+
def merge(self, destination_key, num_keys, *keys, compression=None, override=False):
376380
"""
377-
Merge all of the values from 'fromKey' to 'toKey' sketch.
381+
Merges all of the values from `keys` to 'destination-key' sketch.
382+
It is mandatory to provide the `num_keys` before passing the input keys and
383+
the other (optional) arguments.
384+
If `destination_key` already exists its values are merged with the input keys.
385+
If you wish to override the destination key contents use the `OVERRIDE` parameter.
386+
378387
For more information see `TDIGEST.MERGE <https://redis.io/commands/tdigest.merge>`_.
379388
""" # noqa
380-
return self.execute_command(TDIGEST_MERGE, toKey, fromKey)
389+
params = [destination_key, num_keys, *keys]
390+
if compression is not None:
391+
params.extend(["COMPRESSION", compression])
392+
if override:
393+
params.append("OVERRIDE")
394+
return self.execute_command(TDIGEST_MERGE, *params)
381395

382396
def min(self, key):
383397
"""
@@ -393,20 +407,21 @@ def max(self, key):
393407
""" # noqa
394408
return self.execute_command(TDIGEST_MAX, key)
395409

396-
def quantile(self, key, quantile):
410+
def quantile(self, key, quantile, *quantiles):
397411
"""
398-
Return double value estimate of the cutoff such that a specified fraction of the data
399-
added to this TDigest would be less than or equal to the cutoff.
412+
Returns estimates of one or more cutoffs such that a specified fraction of the
413+
observations added to this t-digest would be less than or equal to each of the
414+
specified cutoffs. (Multiple quantiles can be returned with one call)
400415
For more information see `TDIGEST.QUANTILE <https://redis.io/commands/tdigest.quantile>`_.
401416
""" # noqa
402-
return self.execute_command(TDIGEST_QUANTILE, key, quantile)
417+
return self.execute_command(TDIGEST_QUANTILE, key, quantile, *quantiles)
403418

404-
def cdf(self, key, value):
419+
def cdf(self, key, value, *values):
405420
"""
406421
Return double fraction of all points added which are <= value.
407422
For more information see `TDIGEST.CDF <https://redis.io/commands/tdigest.cdf>`_.
408423
""" # noqa
409-
return self.execute_command(TDIGEST_CDF, key, value)
424+
return self.execute_command(TDIGEST_CDF, key, value, *values)
410425

411426
def info(self, key):
412427
"""
@@ -416,6 +431,50 @@ def info(self, key):
416431
""" # noqa
417432
return self.execute_command(TDIGEST_INFO, key)
418433

434+
def trimmed_mean(self, key, low_cut_quantile, high_cut_quantile):
435+
"""
436+
Return mean value from the sketch, excluding observation values outside
437+
the low and high cutoff quantiles.
438+
For more information see `TDIGEST.TRIMMED_MEAN <https://redis.io/commands/tdigest.trimmed_mean>`_.
439+
""" # noqa
440+
return self.execute_command(
441+
TDIGEST_TRIMMED_MEAN, key, low_cut_quantile, high_cut_quantile
442+
)
443+
444+
def rank(self, key, value, *values):
445+
"""
446+
Retrieve the estimated rank of value (the number of observations in the sketch
447+
that are smaller than value + half the number of observations that are equal to value).
448+
449+
For more information see `TDIGEST.RANK <https://redis.io/commands/tdigest.rank>`_.
450+
""" # noqa
451+
return self.execute_command(TDIGEST_RANK, key, value, *values)
452+
453+
def revrank(self, key, value, *values):
454+
"""
455+
Retrieve the estimated rank of value (the number of observations in the sketch
456+
that are larger than value + half the number of observations that are equal to value).
457+
458+
For more information see `TDIGEST.REVRANK <https://redis.io/commands/tdigest.revrank>`_.
459+
""" # noqa
460+
return self.execute_command(TDIGEST_REVRANK, key, value, *values)
461+
462+
def byrank(self, key, rank, *ranks):
463+
"""
464+
Retrieve an estimation of the value with the given rank.
465+
466+
For more information see `TDIGEST.BY_RANK <https://redis.io/commands/tdigest.by_rank>`_.
467+
""" # noqa
468+
return self.execute_command(TDIGEST_BYRANK, key, rank, *ranks)
469+
470+
def byrevrank(self, key, rank, *ranks):
471+
"""
472+
Retrieve an estimation of the value with the given reverse rank.
473+
474+
For more information see `TDIGEST.BY_REVRANK <https://redis.io/commands/tdigest.by_revrank>`_.
475+
""" # noqa
476+
return self.execute_command(TDIGEST_BYREVRANK, key, rank, *ranks)
477+
419478

420479
class CMSCommands:
421480
"""Count-Min Sketch Commands"""

redis/commands/bf/info.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,20 @@ def __init__(self, args):
6868
class TDigestInfo(object):
6969
compression = None
7070
capacity = None
71-
mergedNodes = None
72-
unmergedNodes = None
73-
mergedWeight = None
74-
unmergedWeight = None
75-
totalCompressions = None
71+
merged_nodes = None
72+
unmerged_nodes = None
73+
merged_weight = None
74+
unmerged_weight = None
75+
total_compressions = None
76+
memory_usage = None
7677

7778
def __init__(self, args):
7879
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
7980
self.compression = response["Compression"]
8081
self.capacity = response["Capacity"]
81-
self.mergedNodes = response["Merged nodes"]
82-
self.unmergedNodes = response["Unmerged nodes"]
83-
self.mergedWeight = response["Merged weight"]
84-
self.unmergedWeight = response["Unmerged weight"]
85-
self.totalCompressions = response["Total compressions"]
82+
self.merged_nodes = response["Merged nodes"]
83+
self.unmerged_nodes = response["Unmerged nodes"]
84+
self.merged_weight = response["Merged weight"]
85+
self.unmerged_weight = response["Unmerged weight"]
86+
self.total_compressions = response["Total compressions"]
87+
self.memory_usage = response["Memory usage"]

redis/commands/bf/utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
def parse_tdigest_quantile(response):
2+
"""Parse TDIGEST.QUANTILE response."""
3+
return [float(x) for x in response]

redis/commands/cluster.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,25 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
316316
# Sum up the reply from each command
317317
return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))
318318

319+
async def _execute_pipeline_by_slot(
320+
self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
321+
) -> List[Any]:
322+
if self._initialize:
323+
await self.initialize()
324+
read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
325+
pipe = self.pipeline()
326+
[
327+
pipe.execute_command(
328+
command,
329+
*slot_args,
330+
target_nodes=[
331+
self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
332+
],
333+
)
334+
for slot, slot_args in slots_to_args.items()
335+
]
336+
return await pipe.execute()
337+
319338

320339
class ClusterManagementCommands(ManagementCommands):
321340
"""

0 commit comments

Comments
 (0)