Skip to content

Commit

Permalink
Merge pull request #482 from MrRoudyk/feat/481-optimize-loan-states-loading
Browse files Browse the repository at this point in the history

feat: implement batch data fetching and error handling in DataConnector
  • Loading branch information
djeck1432 authored Feb 28, 2025
2 parents 0a1a245 + 8f8cf64 commit 1ca7763
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 23 deletions.
131 changes: 124 additions & 7 deletions apps/dashboard_app/data_conector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
This module connects to a PostgreSQL database and fetches data.
"""

import logging
import os
from typing import List, Optional

import pandas as pd
import sqlalchemy
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)


class DataConnector:
"""
Expand Down Expand Up @@ -46,7 +50,7 @@ class DataConnector:

def __init__(self):
"""
Initialize the DataHandler with database connection details.
Initialize the DataConnector with database connection details.
"""
self._check_env_variables()
self.db_url = (
Expand All @@ -56,16 +60,129 @@ def __init__(self):
)
self.engine = sqlalchemy.create_engine(self.db_url)

def fetch_data(self, query: str) -> pd.DataFrame:
def fetch_data(
    self,
    query: str,
    protocol: Optional[str] = None,
    batch_size: int = 1000,
    start_block: Optional[int] = None,
    end_block: Optional[int] = None,
) -> pd.DataFrame:
    """
    Fetch data from the database using a SQL query with block-based pagination.

    :param query: SQL query to execute.
    :param protocol: Protocol identifier for determining block range (optional).
    :param batch_size: Number of blocks to process in each batch.
    :param start_block: Starting block number for filtering (optional).
    :param end_block: Ending block number for filtering (optional).
    :return: DataFrame containing the query results.
    :raises DatabaseConnectionError: If executing any query fails.
    """
    # Derive the block range from the protocol when it was not given explicitly.
    if protocol:
        if start_block is None:
            start_block = self.fetch_protocol_first_block_number(protocol)
        if end_block is None:
            end_block = self.fetch_protocol_last_block_number(protocol)

    # Without a usable block range, fall back to a single unpaginated query.
    if start_block is None or end_block is None:
        logger.warning(
            "No block range provided and no protocol specified to determine range"
        )
        try:
            with self.engine.connect() as connection:
                return pd.read_sql(query, connection)
        except sqlalchemy.exc.SQLAlchemyError as e:
            logger.error(f"Database error: {e}")
            raise DatabaseConnectionError(f"Failed to execute query: {str(e)}")

    clean_query = query.strip()
    if clean_query.endswith(";"):
        clean_query = clean_query[:-1]

    # If the caller already filters by block, run the query as-is.
    if "block >= " in clean_query or "block <= " in clean_query:
        logger.warning("Query already contains block filters, using as-is")
        with self.engine.connect() as connection:
            return pd.read_sql(query, connection)

    # Append the block filter with bind parameters in BOTH branches. The
    # previous WHERE-branch interpolated raw values into the SQL string while
    # still passing a params dict with no placeholders — inconsistent and
    # injection-prone.
    keyword = "AND" if "WHERE" in clean_query.upper() else "WHERE"
    block_query = sqlalchemy.text(
        f"{clean_query} {keyword} block >= :start_block AND block <= :end_block"
    )

    all_data = []
    for current_start in range(start_block, end_block + 1, batch_size):
        current_end = min(current_start + batch_size - 1, end_block)
        try:
            with self.engine.connect() as connection:
                batch = pd.read_sql(
                    block_query,
                    connection,
                    params={"start_block": current_start, "end_block": current_end},
                )

            if not batch.empty:
                all_data.append(batch)
            else:
                logger.info(
                    f"No records found in block range {current_start}-{current_end}"
                )

        except sqlalchemy.exc.SQLAlchemyError as e:
            logger.error(f"Database error: {e}")
            raise DatabaseConnectionError(f"Failed to execute query: {str(e)}")

    # No batch produced rows — return an empty frame rather than concat([]).
    if not all_data:
        return pd.DataFrame()

    return pd.concat(all_data, ignore_index=True)

def fetch_protocol_first_block_number(self, protocol: str) -> int:
    """
    Fetch the first block number for a specific protocol.

    :param protocol: Protocol identifier (e.g., 'zkLend').
    :return: First block number, or 0 when the protocol has no rows.
    """
    query = """
    SELECT MIN(block) as first_block
    FROM loan_state
    WHERE protocol_id = :protocol;
    """
    with self.engine.connect() as connection:
        result = pd.read_sql(
            sqlalchemy.text(query), connection, params={"protocol": protocol}
        )
    # MIN() yields NULL (NaN after read_sql) when no rows match the protocol.
    if result.empty:
        return 0
    first_block = result["first_block"].iloc[0]
    return 0 if pd.isna(first_block) else int(first_block)

def fetch_protocol_last_block_number(self, protocol: str) -> int:
    """
    Fetch the last block number for a specific protocol.

    :param protocol: Protocol identifier (e.g., 'zkLend').
    :return: Last block number as an integer, or 0 when the protocol has no rows.
    """
    query = """
    SELECT MAX(block) as last_block
    FROM loan_state
    WHERE protocol_id = :protocol;
    """
    # The stale pre-merge lines (an unparameterized read_sql plus an early
    # `return df`) made the bound-parameter path below unreachable; removed.
    with self.engine.connect() as connection:
        result = pd.read_sql(
            sqlalchemy.text(query), connection, params={"protocol": protocol}
        )
    # MAX() yields NULL (NaN after read_sql) when no rows match the protocol.
    if not result.empty and not pd.isna(result["last_block"].iloc[0]):
        return int(result["last_block"].iloc[0])
    return 0

def _check_env_variables(self) -> None:
"""
Expand All @@ -88,11 +205,11 @@ def fetch_data_from_csv(self, file_path: str) -> pd.DataFrame:
df = pd.read_csv(file_path)
return df
except Exception as e:
print(f"Error reading CSV file: {e}")
logger.error(f"Error reading CSV file: {e}")
return None


if __name__ == "__main__":
    # Smoke test: run the batched zkLend query and print the result.
    # (The stale pre-merge call passed the table name "loan_state" as the
    # SQL query, which would fail; only the SQL-constant call is kept.)
    connector = DataConnector()
    df = connector.fetch_data(DataConnector.ZKLEND_SQL_QUERY, "zkLend")
    print(df)
65 changes: 51 additions & 14 deletions apps/dashboard_app/helpers/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,68 @@ def _init_zklend_state(self) -> ZkLendState:
"""
logger.info("Initializing ZkLend state.")
zklend_state = ZkLendState()
self._fetch_and_process_zklend_data(zklend_state)
self._set_zklend_interest_rates(zklend_state)
return zklend_state

def _fetch_and_process_zklend_data(self, zklend_state):
    """
    Load zkLend loan states in block batches and apply them to the state.

    Fetches the protocol's block range, then pulls loan-state rows in
    BATCH_SIZE-block windows and copies each row's collateral/debt values
    onto the corresponding loan entity.

    :param zklend_state: ZkLendState instance populated in place.
    :raises Exception: Re-raises any error encountered while fetching/processing.
    """
    PROTOCOL_ZKLEND = "zkLend"
    BATCH_SIZE = 1000
    start = monotonic()
    # NOTE: the stale pre-refactor unbatched fetch_data call (whose result
    # was never used) has been removed; batching below replaces it.

    try:
        first_block = self.data_connector.fetch_protocol_first_block_number(
            PROTOCOL_ZKLEND
        )
        last_block = self.data_connector.fetch_protocol_last_block_number(
            PROTOCOL_ZKLEND
        )

        current_block = first_block
        while current_block <= last_block:
            end_block = min(current_block + BATCH_SIZE - 1, last_block)
            batch = self.data_connector.fetch_data(
                self.data_connector.ZKLEND_SQL_QUERY,
                protocol=PROTOCOL_ZKLEND,
                batch_size=BATCH_SIZE,
                start_block=current_block,
                end_block=end_block,
            )

            if not batch.empty:
                for loan_state in batch.to_dict(orient="records"):
                    user_loan_state = zklend_state.loan_entities[loan_state["user"]]
                    user_loan_state.collateral_enabled.values = loan_state[
                        "collateral_enabled"
                    ]
                    user_loan_state.collateral.values = loan_state["collateral"]
                    user_loan_state.debt.values = loan_state["debt"]
                logger.info(
                    f"Processed {len(batch)} records for blocks {current_block} to {end_block}"
                )
            current_block = end_block + 1

        logger.info(f"Processed total of {last_block - first_block + 1} blocks")

    except Exception as e:
        logger.error(f"Error processing ZkLend data: {e}")
        raise

    zklend_state.last_block_number = last_block
    logger.info("Initialized ZkLend state in %.2fs", monotonic() - start)

def _set_zklend_interest_rates(self, zklend_state):
    """
    Fetch zkLend interest-rate data and copy it onto the state's models.

    (Stale lines from the pre-refactor monolithic method — the per-loan
    processing loop, last_block_number assignment, timing log, and a
    `return zklend_state` — were interleaved here and have been removed.)

    :param zklend_state: ZkLendState whose interest_rate_models are set in place.
    """
    zklend_interest_rate_data = self.data_connector.fetch_data(
        self.data_connector.ZKLEND_INTEREST_RATE_SQL_QUERY
    )
    # Interest-rate query returns a single row; take the first value of each column.
    zklend_state.interest_rate_models.collateral = zklend_interest_rate_data[
        "collateral"
    ].iloc[0]
    zklend_state.interest_rate_models.debt = zklend_interest_rate_data["debt"].iloc[
        0
    ]

def _set_prices(self) -> None:
"""
Expand Down
12 changes: 10 additions & 2 deletions apps/dashboard_app/tests/test_dashboard_data_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ def mock_data_connector():
with patch("dashboard_app.helpers.load_data.DataConnector") as MockConnector:
connector = MockConnector

connector.fetch_protocol_first_block_number.return_value = 1
connector.fetch_protocol_last_block_number.return_value = 6

# Mocking fetch_data calls with dummy data
def fetch_data_side_effect(query):
def fetch_data_side_effect(query, **kwargs):
if query == connector.ZKLEND_SQL_QUERY:
return ZKLEND_DATA
elif query == connector.ZKLEND_INTEREST_RATE_SQL_QUERY:
Expand All @@ -43,6 +46,7 @@ def fetch_data_side_effect(query):
raise ValueError(f"Unexpected query: {query}")

connector.fetch_data.side_effect = fetch_data_side_effect

yield connector


Expand Down Expand Up @@ -81,7 +85,11 @@ def test_init_dashboard_data_handler(handler):
"""Test to ensure all attributes were set during DashboardDataHandler init."""
assert handler.zklend_state is not None
assert handler.zklend_state.get_protocol_name == "zkLend"
assert handler.zklend_state.last_block_number == ZKLEND_DATA["block"].max()
assert handler.zklend_state.last_block_number is not None
assert (
handler.zklend_state.last_block_number
== handler.data_connector.fetch_protocol_last_block_number("zkLend")
)
assert (
handler.zklend_state.interest_rate_models.collateral
== ZKLEND_INTEREST_RATE["collateral"].iloc[0]
Expand Down

0 comments on commit 1ca7763

Please sign in to comment.