Skip to content

Commit f5827c7

Browse files
authored
Merge branch 'master' into ps_revert_changing_of_default_value_for_asyncio_redis_ssl_context
2 parents 0fd3693 + c0a0492 commit f5827c7

18 files changed

+174
-72
lines changed

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ jobs:
125125
redis-version: [ '${{ needs.redis_version.outputs.CURRENT }}' ]
126126
python-version: [ '3.8', '3.13']
127127
parser-backend: [ 'hiredis' ]
128-
hiredis-version: [ '>=3.0.0', '<3.0.0' ]
128+
hiredis-version: [ '>=3.2.0', '<3.0.0' ]
129129
event-loop: [ 'asyncio' ]
130130
env:
131131
ACTIONS_ALLOW_UNSECURE_COMMANDS: true

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ dependencies = ['async-timeout>=4.0.3; python_full_version<"3.11.3"']
3333

3434
[project.optional-dependencies]
3535
hiredis = [
36-
"hiredis>=3.0.0",
36+
"hiredis>=3.2.0",
3737
]
3838
ocsp = [
3939
"cryptography>=36.0.1",

redis/_parsers/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
from .base import BaseParser, _AsyncRESPBase
1+
from .base import (
2+
AsyncPushNotificationsParser,
3+
BaseParser,
4+
PushNotificationsParser,
5+
_AsyncRESPBase,
6+
)
27
from .commands import AsyncCommandsParser, CommandsParser
38
from .encoders import Encoder
49
from .hiredis import _AsyncHiredisParser, _HiredisParser
@@ -11,10 +16,12 @@
1116
"_AsyncRESPBase",
1217
"_AsyncRESP2Parser",
1318
"_AsyncRESP3Parser",
19+
"AsyncPushNotificationsParser",
1420
"CommandsParser",
1521
"Encoder",
1622
"BaseParser",
1723
"_HiredisParser",
1824
"_RESP2Parser",
1925
"_RESP3Parser",
26+
"PushNotificationsParser",
2027
]

redis/_parsers/base.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import sys
22
from abc import ABC
33
from asyncio import IncompleteReadError, StreamReader, TimeoutError
4-
from typing import List, Optional, Union
4+
from typing import Callable, List, Optional, Protocol, Union
55

66
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
77
from asyncio import timeout as async_timeout
@@ -158,6 +158,58 @@ async def read_response(
158158
raise NotImplementedError()
159159

160160

161+
_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"]
162+
163+
164+
class PushNotificationsParser(Protocol):
165+
"""Protocol defining RESP3-specific parsing functionality"""
166+
167+
pubsub_push_handler_func: Callable
168+
invalidation_push_handler_func: Optional[Callable] = None
169+
170+
def handle_pubsub_push_response(self, response):
171+
"""Handle pubsub push responses"""
172+
raise NotImplementedError()
173+
174+
def handle_push_response(self, response, **kwargs):
175+
if response[0] not in _INVALIDATION_MESSAGE:
176+
return self.pubsub_push_handler_func(response)
177+
if self.invalidation_push_handler_func:
178+
return self.invalidation_push_handler_func(response)
179+
180+
def set_pubsub_push_handler(self, pubsub_push_handler_func):
181+
self.pubsub_push_handler_func = pubsub_push_handler_func
182+
183+
def set_invalidation_push_handler(self, invalidation_push_handler_func):
184+
self.invalidation_push_handler_func = invalidation_push_handler_func
185+
186+
187+
class AsyncPushNotificationsParser(Protocol):
188+
"""Protocol defining async RESP3-specific parsing functionality"""
189+
190+
pubsub_push_handler_func: Callable
191+
invalidation_push_handler_func: Optional[Callable] = None
192+
193+
async def handle_pubsub_push_response(self, response):
194+
"""Handle pubsub push responses asynchronously"""
195+
raise NotImplementedError()
196+
197+
async def handle_push_response(self, response, **kwargs):
198+
"""Handle push responses asynchronously"""
199+
if response[0] not in _INVALIDATION_MESSAGE:
200+
return await self.pubsub_push_handler_func(response)
201+
if self.invalidation_push_handler_func:
202+
return await self.invalidation_push_handler_func(response)
203+
204+
def set_pubsub_push_handler(self, pubsub_push_handler_func):
205+
"""Set the pubsub push handler function"""
206+
self.pubsub_push_handler_func = pubsub_push_handler_func
207+
208+
def set_invalidation_push_handler(self, invalidation_push_handler_func):
209+
"""Set the invalidation push handler function"""
210+
self.invalidation_push_handler_func = invalidation_push_handler_func
211+
212+
161213
class _AsyncRESPBase(AsyncBaseParser):
162214
"""Base class for async resp parsing"""
163215

redis/_parsers/hiredis.py

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import socket
33
import sys
4+
from logging import getLogger
45
from typing import Callable, List, Optional, TypedDict, Union
56

67
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
@@ -11,7 +12,12 @@
1112
from ..exceptions import ConnectionError, InvalidResponse, RedisError
1213
from ..typing import EncodableT
1314
from ..utils import HIREDIS_AVAILABLE
14-
from .base import AsyncBaseParser, BaseParser
15+
from .base import (
16+
AsyncBaseParser,
17+
AsyncPushNotificationsParser,
18+
BaseParser,
19+
PushNotificationsParser,
20+
)
1521
from .socket import (
1622
NONBLOCKING_EXCEPTION_ERROR_NUMBERS,
1723
NONBLOCKING_EXCEPTIONS,
@@ -32,21 +38,29 @@ class _HiredisReaderArgs(TypedDict, total=False):
3238
errors: Optional[str]
3339

3440

35-
class _HiredisParser(BaseParser):
41+
class _HiredisParser(BaseParser, PushNotificationsParser):
3642
"Parser class for connections using Hiredis"
3743

3844
def __init__(self, socket_read_size):
3945
if not HIREDIS_AVAILABLE:
4046
raise RedisError("Hiredis is not installed")
4147
self.socket_read_size = socket_read_size
4248
self._buffer = bytearray(socket_read_size)
49+
self.pubsub_push_handler_func = self.handle_pubsub_push_response
50+
self.invalidation_push_handler_func = None
51+
self._hiredis_PushNotificationType = None
4352

4453
def __del__(self):
4554
try:
4655
self.on_disconnect()
4756
except Exception:
4857
pass
4958

59+
def handle_pubsub_push_response(self, response):
60+
logger = getLogger("push_response")
61+
logger.debug("Push response: " + str(response))
62+
return response
63+
5064
def on_connect(self, connection, **kwargs):
5165
import hiredis
5266

@@ -64,6 +78,12 @@ def on_connect(self, connection, **kwargs):
6478
self._reader = hiredis.Reader(**kwargs)
6579
self._next_response = NOT_ENOUGH_DATA
6680

81+
try:
82+
self._hiredis_PushNotificationType = hiredis.PushNotification
83+
except AttributeError:
84+
# hiredis < 3.2
85+
self._hiredis_PushNotificationType = None
86+
6787
def on_disconnect(self):
6888
self._sock = None
6989
self._reader = None
@@ -109,14 +129,24 @@ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
109129
if custom_timeout:
110130
sock.settimeout(self._socket_timeout)
111131

112-
def read_response(self, disable_decoding=False):
132+
def read_response(self, disable_decoding=False, push_request=False):
113133
if not self._reader:
114134
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
115135

116136
# _next_response might be cached from a can_read() call
117137
if self._next_response is not NOT_ENOUGH_DATA:
118138
response = self._next_response
119139
self._next_response = NOT_ENOUGH_DATA
140+
if self._hiredis_PushNotificationType is not None and isinstance(
141+
response, self._hiredis_PushNotificationType
142+
):
143+
response = self.handle_push_response(response)
144+
if not push_request:
145+
return self.read_response(
146+
disable_decoding=disable_decoding, push_request=push_request
147+
)
148+
else:
149+
return response
120150
return response
121151

122152
if disable_decoding:
@@ -135,6 +165,16 @@ def read_response(self, disable_decoding=False):
135165
# happened
136166
if isinstance(response, ConnectionError):
137167
raise response
168+
elif self._hiredis_PushNotificationType is not None and isinstance(
169+
response, self._hiredis_PushNotificationType
170+
):
171+
response = self.handle_push_response(response)
172+
if not push_request:
173+
return self.read_response(
174+
disable_decoding=disable_decoding, push_request=push_request
175+
)
176+
else:
177+
return response
138178
elif (
139179
isinstance(response, list)
140180
and response
@@ -144,7 +184,7 @@ def read_response(self, disable_decoding=False):
144184
return response
145185

146186

147-
class _AsyncHiredisParser(AsyncBaseParser):
187+
class _AsyncHiredisParser(AsyncBaseParser, AsyncPushNotificationsParser):
148188
"""Async implementation of parser class for connections using Hiredis"""
149189

150190
__slots__ = ("_reader",)
@@ -154,6 +194,14 @@ def __init__(self, socket_read_size: int):
154194
raise RedisError("Hiredis is not available.")
155195
super().__init__(socket_read_size=socket_read_size)
156196
self._reader = None
197+
self.pubsub_push_handler_func = self.handle_pubsub_push_response
198+
self.invalidation_push_handler_func = None
199+
self._hiredis_PushNotificationType = None
200+
201+
async def handle_pubsub_push_response(self, response):
202+
logger = getLogger("push_response")
203+
logger.debug("Push response: " + str(response))
204+
return response
157205

158206
def on_connect(self, connection):
159207
import hiredis
@@ -171,6 +219,14 @@ def on_connect(self, connection):
171219
self._reader = hiredis.Reader(**kwargs)
172220
self._connected = True
173221

222+
try:
223+
self._hiredis_PushNotificationType = getattr(
224+
hiredis, "PushNotification", None
225+
)
226+
except AttributeError:
227+
# hiredis < 3.2
228+
self._hiredis_PushNotificationType = None
229+
174230
def on_disconnect(self):
175231
self._connected = False
176232

@@ -195,7 +251,7 @@ async def read_from_socket(self):
195251
return True
196252

197253
async def read_response(
198-
self, disable_decoding: bool = False
254+
self, disable_decoding: bool = False, push_request: bool = False
199255
) -> Union[EncodableT, List[EncodableT]]:
200256
# If `on_disconnect()` has been called, prohibit any more reads
201257
# even if they could happen because data might be present.
@@ -207,6 +263,7 @@ async def read_response(
207263
response = self._reader.gets(False)
208264
else:
209265
response = self._reader.gets()
266+
210267
while response is NOT_ENOUGH_DATA:
211268
await self.read_from_socket()
212269
if disable_decoding:
@@ -219,6 +276,16 @@ async def read_response(
219276
# happened
220277
if isinstance(response, ConnectionError):
221278
raise response
279+
elif self._hiredis_PushNotificationType is not None and isinstance(
280+
response, self._hiredis_PushNotificationType
281+
):
282+
response = await self.handle_push_response(response)
283+
if not push_request:
284+
return await self.read_response(
285+
disable_decoding=disable_decoding, push_request=push_request
286+
)
287+
else:
288+
return response
222289
elif (
223290
isinstance(response, list)
224291
and response

redis/_parsers/resp3.py

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33

44
from ..exceptions import ConnectionError, InvalidResponse, ResponseError
55
from ..typing import EncodableT
6-
from .base import _AsyncRESPBase, _RESPBase
6+
from .base import (
7+
AsyncPushNotificationsParser,
8+
PushNotificationsParser,
9+
_AsyncRESPBase,
10+
_RESPBase,
11+
)
712
from .socket import SERVER_CLOSED_CONNECTION_ERROR
813

9-
_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"]
1014

11-
12-
class _RESP3Parser(_RESPBase):
15+
class _RESP3Parser(_RESPBase, PushNotificationsParser):
1316
"""RESP3 protocol implementation"""
1417

1518
def __init__(self, socket_read_size):
@@ -113,9 +116,7 @@ def _read_response(self, disable_decoding=False, push_request=False):
113116
)
114117
for _ in range(int(response))
115118
]
116-
response = self.handle_push_response(
117-
response, disable_decoding, push_request
118-
)
119+
response = self.handle_push_response(response)
119120
if not push_request:
120121
return self._read_response(
121122
disable_decoding=disable_decoding, push_request=push_request
@@ -129,20 +130,8 @@ def _read_response(self, disable_decoding=False, push_request=False):
129130
response = self.encoder.decode(response)
130131
return response
131132

132-
def handle_push_response(self, response, disable_decoding, push_request):
133-
if response[0] not in _INVALIDATION_MESSAGE:
134-
return self.pubsub_push_handler_func(response)
135-
if self.invalidation_push_handler_func:
136-
return self.invalidation_push_handler_func(response)
137-
138-
def set_pubsub_push_handler(self, pubsub_push_handler_func):
139-
self.pubsub_push_handler_func = pubsub_push_handler_func
140-
141-
def set_invalidation_push_handler(self, invalidation_push_handler_func):
142-
self.invalidation_push_handler_func = invalidation_push_handler_func
143-
144133

145-
class _AsyncRESP3Parser(_AsyncRESPBase):
134+
class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
146135
def __init__(self, socket_read_size):
147136
super().__init__(socket_read_size)
148137
self.pubsub_push_handler_func = self.handle_pubsub_push_response
@@ -253,9 +242,7 @@ async def _read_response(
253242
)
254243
for _ in range(int(response))
255244
]
256-
response = await self.handle_push_response(
257-
response, disable_decoding, push_request
258-
)
245+
response = await self.handle_push_response(response)
259246
if not push_request:
260247
return await self._read_response(
261248
disable_decoding=disable_decoding, push_request=push_request
@@ -268,15 +255,3 @@ async def _read_response(
268255
if isinstance(response, bytes) and disable_decoding is False:
269256
response = self.encoder.decode(response)
270257
return response
271-
272-
async def handle_push_response(self, response, disable_decoding, push_request):
273-
if response[0] not in _INVALIDATION_MESSAGE:
274-
return await self.pubsub_push_handler_func(response)
275-
if self.invalidation_push_handler_func:
276-
return await self.invalidation_push_handler_func(response)
277-
278-
def set_pubsub_push_handler(self, pubsub_push_handler_func):
279-
self.pubsub_push_handler_func = pubsub_push_handler_func
280-
281-
def set_invalidation_push_handler(self, invalidation_push_handler_func):
282-
self.invalidation_push_handler_func = invalidation_push_handler_func

redis/asyncio/client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
)
7171
from redis.typing import ChannelT, EncodableT, KeyT
7272
from redis.utils import (
73-
HIREDIS_AVAILABLE,
7473
SSL_AVAILABLE,
7574
_set_info_logger,
7675
deprecated_args,
@@ -938,7 +937,7 @@ async def connect(self):
938937
self.connection.register_connect_callback(self.on_connect)
939938
else:
940939
await self.connection.connect()
941-
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
940+
if self.push_handler_func is not None:
942941
self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
943942

944943
self._event_dispatcher.dispatch(

0 commit comments

Comments
 (0)