Skip to content

Commit 834086f

Browse files
authored
Celery broker_url transport options (#16787)
* handle url params that celery/kombu accepts only as broker transport options * add note/comment
1 parent ed56263 commit 834086f

File tree

3 files changed

+48
-0
lines changed

3 files changed

+48
-0
lines changed

bin/redis-tls

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,10 @@ unset REDIS_PORT
1616

1717
# Ensure all clients have a 5-second timeout.
1818
# Configure the TLS settings for our Redis connection
19+
# When adding parameters such as socket_timeout,
20+
# you may need to update the broker url parsing in
21+
# warehouse/tasks.py to apply them to celery_transport_options
22+
# See https://docs.celeryq.dev/projects/kombu/en/stable/_modules/kombu/connection.html#Connection
23+
# and https://docs.celeryq.dev/projects/kombu/en/stable/reference/kombu.transport.redis.html#transport-options
24+
# for available/allowed arguments/mappings
1925
REDIS_URL="$REDIS_URL?socket_timeout=5&ssl_cert_reqs=required&ssl_ca_certs=$(python -m certifi)"

tests/unit/test_tasks.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,23 @@ def test_make_celery_app():
593593
"redis://127.0.0.1:6379/10",
594594
{},
595595
),
596+
(
597+
Environment.production,
598+
True,
599+
None,
600+
(
601+
"rediss://user:pass@redis.example.com:6379/10"
602+
"?socket_timeout=5&irreleveant=0"
603+
"&ssl_cert_reqs=required&ssl_ca_certs=/p/a/t/h/cacert.pem"
604+
),
605+
(
606+
"rediss://user:pass@redis.example.com:6379/10"
607+
"?ssl_cert_reqs=required&ssl_ca_certs=/p/a/t/h/cacert.pem"
608+
),
609+
{
610+
"socket_timeout": 5,
611+
},
612+
),
596613
],
597614
)
598615
def test_includeme(

warehouse/tasks.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,31 @@ def includeme(config):
306306
"tcp_keepalive": True,
307307
}
308308

309+
if broker_url.startswith("redis"):
310+
parsed_url = urllib.parse.urlparse( # noqa: WH001, going to urlunparse this
311+
broker_url
312+
)
313+
parsed_query = urllib.parse.parse_qs(parsed_url.query)
314+
315+
celery_transport_options = {
316+
"socket_timeout": int,
317+
}
318+
319+
for key, value in parsed_query.copy().items():
320+
if key.startswith("ssl_"):
321+
continue
322+
else:
323+
if key in celery_transport_options:
324+
broker_transport_options[key] = celery_transport_options[key](
325+
value[0]
326+
)
327+
del parsed_query[key]
328+
329+
parsed_url = parsed_url._replace(
330+
query=urllib.parse.urlencode(parsed_query, doseq=True, safe="/")
331+
)
332+
broker_url = urllib.parse.urlunparse(parsed_url)
333+
309334
config.registry["celery.app"] = celery.Celery(
310335
"warehouse", autofinalize=False, set_as_current=False
311336
)

0 commit comments

Comments
 (0)