@@ -156,18 +156,10 @@ impl AsyncConnection for AsyncPgConnection {
156
156
T : AsQuery + ' query ,
157
157
T :: Query : QueryFragment < Self :: Backend > + QueryId + ' query ,
158
158
{
159
- let connection_future = self . connection_future . as_ref ( ) . map ( |rx| rx. resubscribe ( ) ) ;
160
159
let query = source. as_query ( ) ;
161
- let load_future = self . with_prepared_statement ( query, |conn, stmt, binds| async move {
162
- let res = conn. query_raw ( & stmt, binds) . await . map_err ( ErrorHelper ) ?;
163
-
164
- Ok ( res
165
- . map_err ( |e| diesel:: result:: Error :: from ( ErrorHelper ( e) ) )
166
- . map_ok ( PgRow :: new)
167
- . boxed ( ) )
168
- } ) ;
160
+ let load_future = self . with_prepared_statement ( query, load_prepared) ;
169
161
170
- drive_future ( connection_future , load_future ) . boxed ( )
162
+ self . run_with_connection_future ( load_future )
171
163
}
172
164
173
165
fn execute_returning_count < ' conn , ' query , T > (
@@ -177,19 +169,8 @@ impl AsyncConnection for AsyncPgConnection {
177
169
where
178
170
T : QueryFragment < Self :: Backend > + QueryId + ' query ,
179
171
{
180
- let connection_future = self . connection_future . as_ref ( ) . map ( |rx| rx. resubscribe ( ) ) ;
181
- let execute = self . with_prepared_statement ( source, |conn, stmt, binds| async move {
182
- let binds = binds
183
- . iter ( )
184
- . map ( |b| b as & ( dyn ToSql + Sync ) )
185
- . collect :: < Vec < _ > > ( ) ;
186
-
187
- let res = tokio_postgres:: Client :: execute ( & conn, & stmt, & binds as & [ _ ] )
188
- . await
189
- . map_err ( ErrorHelper ) ?;
190
- Ok ( res as usize )
191
- } ) ;
192
- drive_future ( connection_future, execute) . boxed ( )
172
+ let execute = self . with_prepared_statement ( source, execute_prepared) ;
173
+ self . run_with_connection_future ( execute)
193
174
}
194
175
195
176
fn transaction_state ( & mut self ) -> & mut AnsiTransactionManager {
@@ -212,6 +193,35 @@ impl Drop for AsyncPgConnection {
212
193
}
213
194
}
214
195
196
+ async fn load_prepared (
197
+ conn : Arc < tokio_postgres:: Client > ,
198
+ stmt : Statement ,
199
+ binds : Vec < ToSqlHelper > ,
200
+ ) -> QueryResult < BoxStream < ' static , QueryResult < PgRow > > > {
201
+ let res = conn. query_raw ( & stmt, binds) . await . map_err ( ErrorHelper ) ?;
202
+
203
+ Ok ( res
204
+ . map_err ( |e| diesel:: result:: Error :: from ( ErrorHelper ( e) ) )
205
+ . map_ok ( PgRow :: new)
206
+ . boxed ( ) )
207
+ }
208
+
209
+ async fn execute_prepared (
210
+ conn : Arc < tokio_postgres:: Client > ,
211
+ stmt : Statement ,
212
+ binds : Vec < ToSqlHelper > ,
213
+ ) -> QueryResult < usize > {
214
+ let binds = binds
215
+ . iter ( )
216
+ . map ( |b| b as & ( dyn ToSql + Sync ) )
217
+ . collect :: < Vec < _ > > ( ) ;
218
+
219
+ let res = tokio_postgres:: Client :: execute ( & conn, & stmt, & binds as & [ _ ] )
220
+ . await
221
+ . map_err ( ErrorHelper ) ?;
222
+ Ok ( res as usize )
223
+ }
224
+
215
225
#[ inline( always) ]
216
226
fn update_transaction_manager_status < T > (
217
227
query_result : QueryResult < T > ,
@@ -335,6 +345,14 @@ impl AsyncPgConnection {
335
345
Ok ( ( ) )
336
346
}
337
347
348
+ fn run_with_connection_future < ' query , R > (
349
+ & self ,
350
+ future : impl Future < Output = QueryResult < R > > + Send + ' query ,
351
+ ) -> BoxFuture < ' query , QueryResult < R > > {
352
+ let connection_future = self . connection_future . as_ref ( ) . map ( |rx| rx. resubscribe ( ) ) ;
353
+ drive_future ( connection_future, future) . boxed ( )
354
+ }
355
+
338
356
fn with_prepared_statement < ' a , T , F , R > (
339
357
& mut self ,
340
358
query : T ,
@@ -354,9 +372,7 @@ impl AsyncPgConnection {
354
372
// so there is no need to even access the query in the async block below
355
373
let is_safe_to_cache_prepared = query. is_safe_to_cache_prepared ( & diesel:: pg:: Pg ) ;
356
374
let mut query_builder = PgQueryBuilder :: default ( ) ;
357
- let sql = query
358
- . to_sql ( & mut query_builder, & Pg )
359
- . map ( |_| query_builder. finish ( ) ) ;
375
+ let to_sql_sql = query. to_sql ( & mut query_builder, & Pg ) ;
360
376
361
377
let mut bind_collector = RawBytesBindCollector :: < diesel:: pg:: Pg > :: new ( ) ;
362
378
let query_id = T :: query_id ( ) ;
@@ -371,13 +387,39 @@ impl AsyncPgConnection {
371
387
let collect_bind_result =
372
388
query. collect_binds ( & mut bind_collector, & mut metadata_lookup, & Pg ) ;
373
389
390
+ // The code that doesn't need the `T` generic parameter is in a separate function to reduce LLVM IR lines
391
+ self . with_prepared_statement_after_sql_built (
392
+ is_safe_to_cache_prepared,
393
+ query_builder,
394
+ to_sql_result,
395
+ bind_collector,
396
+ query_id,
397
+ metadata_lookup,
398
+ collect_bind_result,
399
+ )
400
+ }
401
+
402
+ fn with_prepared_statement_after_sql_built < ' a , F , R > (
403
+ & mut self ,
404
+ is_safe_to_cache_prepared : QueryResult < bool > ,
405
+ query_builder : PgQueryBuilder ,
406
+ to_sql_result : QueryResult < ( ) > ,
407
+ bind_collector : RawBytesBindCollector < Pg > ,
408
+ query_id : Option < std:: any:: TypeId > ,
409
+ metadata_lookup : PgAsyncMetadataLookup ,
410
+ collect_bind_result : QueryResult < ( ) > ,
411
+ ) -> BoxFuture < ' a , QueryResult < R > >
412
+ where
413
+ F : Future < Output = QueryResult < R > > + Send ,
414
+ R : Send ,
415
+ {
374
416
let raw_connection = self . conn . clone ( ) ;
375
417
let stmt_cache = self . stmt_cache . clone ( ) ;
376
418
let metadata_cache = self . metadata_cache . clone ( ) ;
377
419
let tm = self . transaction_state . clone ( ) ;
378
420
379
421
async move {
380
- let sql = sql ?;
422
+ let sql = to_sql_result . map ( |_| query_builder . finish ( ) ) ?;
381
423
let is_safe_to_cache_prepared = is_safe_to_cache_prepared?;
382
424
collect_bind_result?;
383
425
// Check whether we need to resolve some types at all
0 commit comments