7
7
#include < ydb/core/kqp/opt/kqp_statistics_transformer.h>
8
8
#include < ydb/core/kqp/opt/kqp_column_statistics_requester.h>
9
9
#include < ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
10
+ #include < ydb/core/kqp/opt/kqp_new_rbo_transformer.h>
10
11
#include < ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
11
12
12
13
20
21
#include < yql/essentials/core/type_ann/type_ann_expr.h>
21
22
#include < yql/essentials/utils/log/log.h>
22
23
#include < yql/essentials/core/services/yql_transform_pipeline.h>
24
+ #include < yql/essentials/core/yql_graph_transformer.h>
23
25
#include < yql/essentials/core/yql_opt_proposed_by_data.h>
24
26
25
27
#include < ydb/library/yql/providers/dq/common/yql_dq_settings.h>
26
28
#include < ydb/library/yql/providers/dq/opt/dqs_opt.h>
27
29
30
+
28
31
#include < util/generic/is_in.h>
29
32
30
33
namespace NKikimr {
@@ -153,6 +156,8 @@ class TKqpRunner : public IKqpRunner {
153
156
YQL_ENSURE (TransformCtx->QueryCtx ->Type == EKikimrQueryType::Dml);
154
157
YQL_ENSURE (TMaybeNode<TKiDataQueryBlocks>(query));
155
158
159
+ YQL_CLOG (DEBUG, CoreDq) << " Before any rewrites: " << KqpExprToPrettyString (*query, ctx);
160
+
156
161
return PrepareQueryInternal (cluster, TKiDataQueryBlocks (query), ctx, settings);
157
162
}
158
163
@@ -192,6 +197,8 @@ class TKqpRunner : public IKqpRunner {
192
197
YQL_ENSURE (IsIn ({EKikimrQueryType::Query, EKikimrQueryType::Script}, TransformCtx->QueryCtx ->Type ));
193
198
YQL_ENSURE (TMaybeNode<TKiDataQueryBlocks>(query));
194
199
200
+ YQL_CLOG (DEBUG, CoreDq) << " Before any rewrites: " << KqpExprToPrettyString (*query, ctx);
201
+
195
202
const auto dataQueryBlocks = TKiDataQueryBlocks (query);
196
203
197
204
if (IsOlapQuery (dataQueryBlocks)) {
@@ -267,21 +274,35 @@ class TKqpRunner : public IKqpRunner {
267
274
268
275
TExprNode::TPtr query = kqlQueryBlocks->Ptr ();
269
276
YQL_CLOG (DEBUG, ProviderKqp) << " Initial KQL query: " << KqpExprToPrettyString (*query, ctx);
277
+ YQL_CLOG (DEBUG, CoreDq) << " Initial KQL query: " << KqpExprToPrettyString (*query, ctx);
278
+
270
279
271
280
TransformCtx->Reset ();
272
281
BuildQueryCtx->Reset ();
273
282
Transformer->Rewind ();
283
+ NewRBOTransformer->Rewind ();
274
284
275
285
TransformCtx->DataQueryBlocks = dataQueryBlocks;
276
286
277
- return MakeIntrusive<TPrepareQueryAsyncResult>(query, *Transformer, ctx, *TransformCtx);
287
+ if (Config->EnableNewRBO ) {
288
+ YQL_CLOG (INFO, CoreDq) << " Taking the new RBO branch" ;
289
+ return MakeIntrusive<TPrepareQueryAsyncResult>(query, *NewRBOTransformer, ctx, *TransformCtx);
290
+ // return MakeIntrusive<TPrepareQueryAsyncResult>(query, *Transformer, ctx, *TransformCtx);
291
+ }
292
+ else {
293
+ YQL_CLOG (INFO, CoreDq) << " Taking the old RBO branch" ;
294
+ return MakeIntrusive<TPrepareQueryAsyncResult>(query, *Transformer, ctx, *TransformCtx);
295
+ }
278
296
}
279
297
280
298
void CreateGraphTransformer (const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
281
299
const NMiniKQL::IFunctionRegistry& funcRegistry)
282
300
{
283
301
auto preparedExplainTransformer = CreateKqpExplainPreparedTransformer (
284
302
Gateway, Cluster, TransformCtx, &funcRegistry, *typesCtx, OptimizeCtx);
303
+
304
+ auto newRBOPreparedExplainTransformer = CreateKqpExplainPreparedTransformer (
305
+ Gateway, Cluster, TransformCtx, &funcRegistry, *typesCtx, OptimizeCtx);
285
306
286
307
auto physicalOptimizePipeline = TTransformationPipeline (typesCtx)
287
308
.AddServiceTransformers ()
@@ -313,6 +334,28 @@ class TKqpRunner : public IKqpRunner {
313
334
.Add (CreateKqpCheckPhysicalQueryTransformer (), " CheckKqlPhysicalQuery" )
314
335
.Build (false ));
315
336
337
+ auto newRBOPhysicalOptimizeTransformer = CreateKqpQueryBlocksTransformer (TTransformationPipeline (typesCtx)
338
+ .AddServiceTransformers ()
339
+ .Add (Log (" NewRBOPhysicalOptimize" ), " LogNewRBOPhysicalOptimize" )
340
+ .AddPreTypeAnnotation ()
341
+ // .AddExpressionEvaluation(funcRegistry)
342
+ .AddIOAnnotation ()
343
+ .AddTypeAnnotationTransformer (CreateKqpTypeAnnotationTransformer (Cluster, sessionCtx->TablesPtr (),
344
+ *typesCtx, Config))
345
+ .Add (CreateKqpCheckQueryTransformer (), " CheckKqlQuery" )
346
+ .AddPostTypeAnnotation (/* forSubgraph */ true )
347
+ // .AddCommonOptimization()
348
+
349
+ .Add (CreateKqpPgRewriteTransformer (OptimizeCtx, *typesCtx), " RewritePgSelect" )
350
+ .Add (CreateKqpNewRBOTransformer (OptimizeCtx, *typesCtx, Config), " NewRBOTransformer" )
351
+
352
+ // .Add(CreatePhysicalDataProposalsInspector(*typesCtx), "ProvidersPhysicalOptimize")
353
+ // .Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize")
354
+ // .Add(CreateKqpQueryPhasesTransformer(), "QueryPhases")
355
+ // .Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects")
356
+ // .Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery")
357
+ .Build (false ));
358
+
316
359
auto physicalBuildTxsTransformer = CreateKqpQueryBlocksTransformer (TTransformationPipeline (typesCtx)
317
360
.AddServiceTransformers ()
318
361
.Add (Log (" PhysicalBuildTxs" ), " LogPhysicalBuildTxs" )
@@ -330,6 +373,23 @@ class TKqpRunner : public IKqpRunner {
330
373
" BuildPhysicalTxs" )
331
374
.Build (false ));
332
375
376
+ auto newRBOPhysicalBuildTxsTransformer = CreateKqpQueryBlocksTransformer (TTransformationPipeline (typesCtx)
377
+ .AddServiceTransformers ()
378
+ .Add (Log (" PhysicalBuildTxs" ), " LogPhysicalBuildTxs" )
379
+ .AddTypeAnnotationTransformer (CreateKqpTypeAnnotationTransformer (Cluster, sessionCtx->TablesPtr (), *typesCtx, Config))
380
+ .AddPostTypeAnnotation (/* forSubgraph */ true )
381
+ .Add (
382
+ CreateKqpBuildTxsTransformer (
383
+ OptimizeCtx,
384
+ BuildQueryCtx,
385
+ CreateTypeAnnotationTransformer (
386
+ CreateKqpTypeAnnotationTransformer (Cluster, sessionCtx->TablesPtr (), *typesCtx, Config),
387
+ *typesCtx),
388
+ *typesCtx,
389
+ Config),
390
+ " BuildPhysicalTxs" )
391
+ .Build (false ));
392
+
333
393
auto physicalBuildQueryTransformer = TTransformationPipeline (typesCtx)
334
394
.AddServiceTransformers ()
335
395
.Add (Log (" PhysicalBuildQuery" ), " LogPhysicalBuildQuery" )
@@ -339,6 +399,15 @@ class TKqpRunner : public IKqpRunner {
339
399
.Add (CreateKqpStatisticsTransformer (OptimizeCtx, *typesCtx, Config, Pctx), " Statistics" )
340
400
.Build (false );
341
401
402
+ auto newRBOPhysicalBuildQueryTransformer = TTransformationPipeline (typesCtx)
403
+ .AddServiceTransformers ()
404
+ .Add (Log (" PhysicalBuildQuery" ), " LogPhysicalBuildQuery" )
405
+ .AddTypeAnnotationTransformer (CreateKqpTypeAnnotationTransformer (Cluster, sessionCtx->TablesPtr (), *typesCtx, Config))
406
+ .AddPostTypeAnnotation ()
407
+ .Add (CreateKqpBuildPhysicalQueryTransformer (OptimizeCtx, BuildQueryCtx), " BuildPhysicalQuery" )
408
+ .Add (CreateKqpStatisticsTransformer (OptimizeCtx, *typesCtx, Config, Pctx), " Statistics" )
409
+ .Build (false );
410
+
342
411
auto physicalPeepholeTransformer = TTransformationPipeline (typesCtx)
343
412
.AddServiceTransformers ()
344
413
.Add (Log (" PhysicalPeephole" ), " LogPhysicalPeephole" )
@@ -352,13 +421,50 @@ class TKqpRunner : public IKqpRunner {
352
421
*typesCtx), *typesCtx, Config), " Peephole" )
353
422
.Build (false );
354
423
424
+ auto newRBOPhysicalPeepholeTransformer = TTransformationPipeline (typesCtx)
425
+ .AddServiceTransformers ()
426
+ .Add (Log (" PhysicalPeephole" ), " LogPhysicalPeephole" )
427
+ .AddTypeAnnotationTransformer (CreateKqpTypeAnnotationTransformer (Cluster, sessionCtx->TablesPtr (), *typesCtx, Config))
428
+ .AddPostTypeAnnotation ()
429
+ .Add (GetDqIntegrationPeepholeTransformer (false , typesCtx), " DqIntegrationPeephole" )
430
+ .Add (
431
+ CreateKqpTxsPeepholeTransformer (
432
+ CreateTypeAnnotationTransformer (
433
+ CreateKqpTypeAnnotationTransformer (Cluster, sessionCtx->TablesPtr (), *typesCtx, Config),
434
+ *typesCtx), *typesCtx, Config), " Peephole" )
435
+ .Build (false );
436
+
355
437
TAutoPtr<IGraphTransformer> compilePhysicalQuery (new TCompilePhysicalQueryTransformer (Cluster,
356
438
*TransformCtx,
357
439
*OptimizeCtx,
358
440
*typesCtx,
359
441
funcRegistry,
360
442
Config));
361
443
444
+ TAutoPtr<IGraphTransformer> newRBOCompilePhysicalQuery (new TCompilePhysicalQueryTransformer (Cluster,
445
+ *TransformCtx,
446
+ *OptimizeCtx,
447
+ *typesCtx,
448
+ funcRegistry,
449
+ Config));
450
+
451
+ NewRBOTransformer = CreateCompositeGraphTransformer (
452
+ {
453
+ TTransformStage{ newRBOPhysicalOptimizeTransformer, " NewRBOPhysicalOptimize" , TIssuesIds::DEFAULT_ERROR },
454
+ LogStage (" NewRBOPhysicalOptimize" ),
455
+ TTransformStage{ newRBOPhysicalBuildTxsTransformer, " NewRBOPhysicalBuildTxs" , TIssuesIds::DEFAULT_ERROR },
456
+ LogStage (" NewRBOPhysicalBuildTxs" ),
457
+ TTransformStage{ newRBOPhysicalBuildQueryTransformer, " NewRBOPhysicalBuildQuery" , TIssuesIds::DEFAULT_ERROR },
458
+ LogStage (" NewRBOPhysicalBuildQuery" ),
459
+ TTransformStage{ CreateSaveExplainTransformerInput (*TransformCtx), " NewRBOSaveExplainTransformerInput" , TIssuesIds::DEFAULT_ERROR },
460
+ TTransformStage{ newRBOPhysicalPeepholeTransformer, " NewRBOPhysicalPeephole" , TIssuesIds::DEFAULT_ERROR },
461
+ LogStage (" NewRBOPhysicalPeephole" ),
462
+ TTransformStage{ newRBOCompilePhysicalQuery, " CompilePhysicalQuery" , TIssuesIds::DEFAULT_ERROR },
463
+ TTransformStage{ newRBOPreparedExplainTransformer, " NewRBOExplainQuery" , TIssuesIds::DEFAULT_ERROR }, // TODO(sk): only on stats mode or if explain-only
464
+ },
465
+ false
466
+ );
467
+
362
468
Transformer = CreateCompositeGraphTransformer (
363
469
{
364
470
TTransformStage{ physicalOptimizeTransformer, " PhysicalOptimize" , TIssuesIds::DEFAULT_ERROR },
@@ -404,6 +510,7 @@ class TKqpRunner : public IKqpRunner {
404
510
TKqpProviderContext Pctx;
405
511
406
512
TAutoPtr<IGraphTransformer> Transformer;
513
+ TAutoPtr<IGraphTransformer> NewRBOTransformer;
407
514
408
515
TActorSystem* ActorSystem;
409
516
};
0 commit comments