@@ -3,7 +3,7 @@
 from schema import IssuesParams, ProvisionParams, SpecificationsParams
 from pagination_model import PaginationParams, PaginatedResult
 from config import config
-
+import json
 
 logger = get_logger(__name__)
 
@@ -56,19 +56,22 @@ def search_issues(params: IssuesParams):
 
 def search_provision_summary(params: ProvisionParams):
     s3_uri = f"s3://{config.collection_bucket}/{config.performance_base_path}/*.parquet"  # noqa
-    pagination = f"LIMIT {params.limit} OFFSET {params.offset}"
 
     where_clause = ""
+    query_params = []
+
     if params.dataset:
-        where_clause += _add_condition(where_clause, f"dataset = '{params.dataset}'")
+        where_clause += _add_condition(where_clause, "dataset = ?")
+        query_params.append(params.dataset)
+
     if params.organisation:
-        where_clause += _add_condition(
-            where_clause, f"organisation = '{params.organisation}'"
-        )
+        where_clause += _add_condition(where_clause, "organisation = ?")
+        query_params.append(params.organisation)
 
     sql_count = f"SELECT COUNT(*) FROM '{s3_uri}' {where_clause}"
+    sql_results = f"SELECT * FROM '{s3_uri}' {where_clause} LIMIT ? OFFSET ?"
+
     logger.debug(sql_count)
-    sql_results = f"SELECT * FROM '{s3_uri}' {where_clause} {pagination}"
     logger.debug(sql_results)
 
     with duckdb.connect() as conn:
@@ -80,33 +83,53 @@ def search_provision_summary(params: ProvisionParams):
                 ).fetchall()
             )
             logger.debug(conn.execute("FROM duckdb_secrets();").fetchall())
-            count = conn.execute(sql_count).fetchone()[
-                0
-            ]  # Count is first item in Tuple
-            results = conn.execute(sql_results).arrow().to_pylist()
+
+            # Execute parameterized queries
+            count = conn.execute(sql_count, query_params).fetchone()[0]
+            results = (
+                conn.execute(sql_results, query_params + [params.limit, params.offset])
+                .arrow()
+                .to_pylist()
+            )
+
             return PaginatedResult(
                 params=PaginationParams(offset=params.offset, limit=params.limit),
                 total_results_available=count,
                 data=results,
             )
         except Exception as e:
-            logger.exception(
-                "Failure executing DuckDB queries",
-            )
+            logger.exception("Failure executing DuckDB queries")
             raise e
 
 
 def get_specification(params: SpecificationsParams):
     s3_uri = f"s3://{config.collection_bucket}/{config.specification_base_path}/*.parquet"  # noqa
-    pagination = f"LIMIT {params.limit} OFFSET {params.offset}"
 
     where_clause = ""
+    query_params = {}
+
     if params.dataset:
-        where_clause += _add_condition(where_clause, f"dataset = '{params.dataset}'")
+        where_clause += _add_condition(
+            where_clause,
+            "TRIM(BOTH '\"' FROM json_extract(json(value), '$.dataset')) = ?",
+        )
+        query_params["dataset"] = params.dataset
+
+    sql_count = f"""
+        SELECT COUNT(*) FROM (
+            SELECT unnest(CAST(json AS VARCHAR[])) AS value
+            FROM '{s3_uri}') AS parsed_json {where_clause}
+        LIMIT ? OFFSET ?
+    """
+
+    sql_results = f"""
+        SELECT value AS json FROM (
+            SELECT unnest(CAST(json AS VARCHAR[])) AS value
+            FROM '{s3_uri}') AS parsed_json {where_clause}
+        LIMIT ? OFFSET ?
+    """
 
-    sql_count = f"SELECT COUNT(*) FROM '{s3_uri}' {where_clause}"
     logger.debug(sql_count)
-    sql_results = f"SELECT * FROM '{s3_uri}' {where_clause} {pagination}"
     logger.debug(sql_results)
 
     with duckdb.connect() as conn:
@@ -118,19 +141,36 @@ def get_specification(params: SpecificationsParams):
                 ).fetchall()
             )
             logger.debug(conn.execute("FROM duckdb_secrets();").fetchall())
-            count = conn.execute(sql_count).fetchone()[
-                0
-            ]  # Count is first item in Tuple
-            results = conn.execute(sql_results).arrow().to_pylist()
+
+            # Execute queries with parameters
+            count = conn.execute(
+                sql_count, [*query_params.values(), params.limit, params.offset]
+            ).fetchone()[0]
+            results = (
+                conn.execute(
+                    sql_results, [*query_params.values(), params.limit, params.offset]
+                )
+                .arrow()
+                .to_pylist()
+            )
+
+            # Convert JSON strings to actual JSON objects
+            json_results = []
+            for item in results:
+                if "json" in item and isinstance(item["json"], str):
+                    try:
+                        parsed_json = json.loads(item["json"])
+                        json_results.append(parsed_json)
+                    except json.JSONDecodeError:
+                        logger.warning(f"Invalid JSON format in row: {item['json']}")
+
             return PaginatedResult(
                 params=PaginationParams(offset=params.offset, limit=params.limit),
                 total_results_available=count,
-                data=results,
+                data=json_results,
             )
         except Exception as e:
-            logger.exception(
-                "Failure executing DuckDB queries",
-            )
+            logger.exception("Failure executing DuckDB queries")
             raise e
 
 
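For reference, a minimal standalone sketch (not part of this change) of the DuckDB prepared-statement pattern the new queries rely on: "?" placeholders in the SQL text plus a positional parameter list passed to conn.execute(), so values such as dataset, limit and offset are bound at execution time instead of being interpolated into the string. The table name and values below are made up for illustration.

import duckdb

with duckdb.connect() as conn:
    # Throwaway in-memory table standing in for the parquet-backed data
    conn.execute("CREATE TABLE provision (dataset VARCHAR, organisation VARCHAR)")
    conn.executemany(
        "INSERT INTO provision VALUES (?, ?)",
        [("tree", "org-a"), ("hedge", "org-b")],
    )
    # Placeholders are bound positionally from the parameter list at execution time
    rows = conn.execute(
        "SELECT * FROM provision WHERE dataset = ? LIMIT ? OFFSET ?",
        ["tree", 10, 0],
    ).fetchall()
    print(rows)  # [('tree', 'org-a')]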