Skip to content

Commit

Permalink
Improve async client retry mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
TheEdgeOfRage committed Jan 23, 2024
1 parent 2e20cae commit 78b2e34
Showing 1 changed file with 48 additions and 12 deletions.
60 changes: 48 additions & 12 deletions dune_client/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

import asyncio
from io import BytesIO
from time import time
from typing import Any, Callable, Optional, Union

from aiohttp import (
ClientResponseError,
ClientSession,
ClientResponse,
ContentTypeError,
Expand All @@ -32,6 +32,34 @@
from dune_client.query import QueryBase, parse_query_object_or_id


class RetryableError(Exception):
"""
Internal exception used to signal that the request should be retried
"""

def __init__(
self,
base_error: ClientResponseError
) -> None:
self.base_error = base_error


class MaxRetryError(Exception):
"""
This exception is raised when the maximum number of retries is exceeded,
e.g. due to rate limiting or internal server errors
"""

def __init__(
self, url: str, reason: Exception | None = None
) -> None:
self.reason = reason

message = f"Max retries exceeded with url: {url} (Caused by {reason!r})"

super().__init__(message)


# pylint: disable=duplicate-code
class AsyncDuneClient(BaseDuneClient):
"""
Expand Down Expand Up @@ -78,9 +106,13 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.disconnect()

async def _handle_response(self, response: ClientResponse) -> Any:
if response.status == 429:
raise RateLimitedError

if response.status in {429, 500, 502, 503, 504}:
try:
response.raise_for_status()
except ClientResponseError as err:
raise RetryableError(
base_error=err,
) from err
try:
# Some responses can be decoded and converted to DuneErrors
response_json = await response.json()
Expand All @@ -94,17 +126,21 @@ async def _handle_response(self, response: ClientResponse) -> Any:
def _route_url(self, route: str) -> str:
return f"{self.api_version}{route}"

async def _handle_ratelimit(self, call: Callable[..., Any]) -> Any:
async def _handle_ratelimit(self, call: Callable[..., Any], url: str) -> Any:
"""Generic wrapper around request callables. If the request fails due to rate limiting,
it will retry it up to five times, sleeping i * 5s in between"""
or server side errors, it will retry it up to five times, sleeping i * 5s in between"""
backoff_factor = 0.5
error: Optional[ClientResponseError] = None
for i in range(5):
try:
return await call()
except RateLimitedError:
self.logger.warning(f"Rate limited. Retrying in {i * 5} seconds.")
time.sleep(i * 5)
except RetryableError as e:
self.logger.warning(
f"Rate limited or internal error. Retrying in {i * 5} seconds...")
error = e.base_error
await asyncio.sleep(i**2 * backoff_factor)

raise RateLimitedError
raise MaxRetryError(url, error)

async def _get(
self,
Expand All @@ -127,7 +163,7 @@ async def _get() -> Any:
return response
return await self._handle_response(response)

return await self._handle_ratelimit(_get)
return await self._handle_ratelimit(_get, route)

async def _post(self, route: str, params: Any) -> Any:
url = self._route_url(route)
Expand All @@ -143,7 +179,7 @@ async def _post() -> Any:
)
return await self._handle_response(response)

return await self._handle_ratelimit(_post)
return await self._handle_ratelimit(_post, route)

async def execute(
self, query: QueryBase, performance: Optional[str] = None
Expand Down

0 comments on commit 78b2e34

Please sign in to comment.