@@ -2774,7 +2774,6 @@ void abortPool(memory::MemoryPool* pool) {
2774
2774
} // namespace
2775
2775
2776
2776
DEBUG_ONLY_TEST_F (AggregationTest, abortDuringOutputProcessing) {
2777
- constexpr int64_t kMaxBytes = 1LL << 30 ; // 1GB
2778
2777
auto rowType = ROW ({" c0" , " c1" , " c2" }, {INTEGER (), INTEGER (), INTEGER ()});
2779
2778
auto batches = makeVectors (rowType, 1000 , 10 );
2780
2779
@@ -2792,77 +2791,53 @@ DEBUG_ONLY_TEST_F(AggregationTest, abortDuringOutputProcessing) {
2792
2791
2793
2792
for (const auto & testData : testSettings) {
2794
2793
SCOPED_TRACE (testData.debugString ());
2795
- auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get ());
2796
- queryCtx->testingOverrideMemoryPool (memory::memoryManager ()->addRootPool (
2797
- queryCtx->queryId (), kMaxBytes , memory::MemoryReclaimer::create ()));
2798
2794
auto expectedResult =
2799
2795
AssertQueryBuilder (
2800
2796
PlanBuilder ()
2801
2797
.values (batches)
2802
2798
.singleAggregation ({" c0" , " c1" }, {" array_agg(c2)" })
2803
2799
.planNode ())
2804
- .queryCtx (queryCtx)
2805
2800
.copyResults (pool_.get ());
2806
2801
2807
- folly::EventCount driverWait;
2808
- auto driverWaitKey = driverWait.prepareWait ();
2809
- folly::EventCount testWait;
2810
- auto testWaitKey = testWait.prepareWait ();
2811
-
2812
2802
std::atomic_bool injectOnce{true };
2813
- Operator* op;
2814
2803
SCOPED_TESTVALUE_SET (
2815
2804
" facebook::velox::exec::Driver::runInternal::noMoreInput" ,
2816
- std::function<void (Operator*)>(([&](Operator* testOp ) {
2817
- if (testOp ->operatorType () != " Aggregation" ) {
2805
+ std::function<void (Operator*)>(([&](Operator* op ) {
2806
+ if (op ->operatorType () != " Aggregation" ) {
2818
2807
return ;
2819
2808
}
2820
- op = testOp;
2821
2809
if (!injectOnce.exchange (false )) {
2822
2810
return ;
2823
2811
}
2824
2812
auto * driver = op->testingOperatorCtx ()->driver ();
2825
2813
ASSERT_EQ (
2826
2814
driver->task ()->enterSuspended (driver->state ()),
2827
2815
StopReason::kNone );
2828
- testWait.notify ();
2829
- driverWait.wait (driverWaitKey);
2816
+ testData.abortFromRootMemoryPool ? abortPool (op->pool ()->root ())
2817
+ : abortPool (op->pool ());
2818
+ // We can't directly reclaim memory from this hash build operator as
2819
+ // its driver thread is running and in suspension state.
2820
+ ASSERT_GT (op->pool ()->root ()->currentBytes (), 0 );
2830
2821
ASSERT_EQ (
2831
2822
driver->task ()->leaveSuspended (driver->state ()),
2832
2823
StopReason::kAlreadyTerminated );
2833
2824
VELOX_MEM_POOL_ABORTED (" Memory pool aborted" );
2834
2825
})));
2835
2826
2836
- std::thread taskThread ([&]() {
2837
- VELOX_ASSERT_THROW (
2838
- AssertQueryBuilder (
2839
- PlanBuilder ()
2840
- .values (batches)
2841
- .singleAggregation ({" c0" , " c1" }, {" array_agg(c2)" })
2842
- .planNode ())
2843
- .queryCtx (queryCtx)
2844
- .maxDrivers (testData.numDrivers )
2845
- .assertResults (expectedResult),
2846
- " " );
2847
- });
2848
-
2849
- testWait.wait (testWaitKey);
2850
- ASSERT_TRUE (op != nullptr );
2851
- auto task = op->testingOperatorCtx ()->task ();
2852
- testData.abortFromRootMemoryPool ? abortPool (queryCtx->pool ())
2853
- : abortPool (op->pool ());
2854
- ASSERT_TRUE (op->pool ()->aborted ());
2855
- ASSERT_TRUE (queryCtx->pool ()->aborted ());
2856
- ASSERT_EQ (queryCtx->pool ()->currentBytes (), 0 );
2857
- driverWait.notify ();
2858
- taskThread.join ();
2859
- task.reset ();
2827
+ VELOX_ASSERT_THROW (
2828
+ AssertQueryBuilder (
2829
+ PlanBuilder ()
2830
+ .values (batches)
2831
+ .singleAggregation ({" c0" , " c1" }, {" array_agg(c2)" })
2832
+ .planNode ())
2833
+ .maxDrivers (testData.numDrivers )
2834
+ .assertResults (expectedResult),
2835
+ " Memory pool manually aborted" );
2860
2836
waitForAllTasksToBeDeleted ();
2861
2837
}
2862
2838
}
2863
2839
2864
2840
DEBUG_ONLY_TEST_F (AggregationTest, abortDuringInputgProcessing) {
2865
- constexpr int64_t kMaxBytes = 1LL << 30 ; // 1GB
2866
2841
auto rowType = ROW ({" c0" , " c1" , " c2" }, {INTEGER (), INTEGER (), INTEGER ()});
2867
2842
auto batches = makeVectors (rowType, 1000 , 10 );
2868
2843
@@ -2880,72 +2855,50 @@ DEBUG_ONLY_TEST_F(AggregationTest, abortDuringInputgProcessing) {
2880
2855
2881
2856
for (const auto & testData : testSettings) {
2882
2857
SCOPED_TRACE (testData.debugString ());
2883
- auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get ());
2884
- queryCtx->testingOverrideMemoryPool (memory::memoryManager ()->addRootPool (
2885
- queryCtx->queryId (), kMaxBytes , memory::MemoryReclaimer::create ()));
2886
2858
auto expectedResult =
2887
2859
AssertQueryBuilder (
2888
2860
PlanBuilder ()
2889
2861
.values (batches)
2890
2862
.singleAggregation ({" c0" , " c1" }, {" array_agg(c2)" })
2891
2863
.planNode ())
2892
- .queryCtx (queryCtx)
2893
2864
.copyResults (pool_.get ());
2894
2865
2895
- folly::EventCount driverWait;
2896
- auto driverWaitKey = driverWait.prepareWait ();
2897
- folly::EventCount testWait;
2898
- auto testWaitKey = testWait.prepareWait ();
2899
-
2900
2866
std::atomic_int numInputs{0 };
2901
- Operator* op;
2902
2867
SCOPED_TESTVALUE_SET (
2903
2868
" facebook::velox::exec::Driver::runInternal::addInput" ,
2904
- std::function<void (Operator*)>(([&](Operator* testOp ) {
2905
- if (testOp ->operatorType () != " Aggregation" ) {
2869
+ std::function<void (Operator*)>(([&](Operator* op ) {
2870
+ if (op ->operatorType () != " Aggregation" ) {
2906
2871
return ;
2907
2872
}
2908
- op = testOp;
2909
- ++numInputs;
2910
- if (numInputs != 2 ) {
2873
+ if (++numInputs != 2 ) {
2911
2874
return ;
2912
2875
}
2913
2876
auto * driver = op->testingOperatorCtx ()->driver ();
2914
2877
ASSERT_EQ (
2915
2878
driver->task ()->enterSuspended (driver->state ()),
2916
2879
StopReason::kNone );
2917
- testWait.notify ();
2918
- driverWait.wait (driverWaitKey);
2880
+ testData.abortFromRootMemoryPool ? abortPool (op->pool ()->root ())
2881
+ : abortPool (op->pool ());
2882
+ // We can't directly reclaim memory from this hash build operator as
2883
+ // its driver thread is running and in suspension state.
2884
+ ASSERT_GT (op->pool ()->root ()->currentBytes (), 0 );
2919
2885
ASSERT_EQ (
2920
2886
driver->task ()->leaveSuspended (driver->state ()),
2921
2887
StopReason::kAlreadyTerminated );
2888
+ ASSERT_TRUE (op->pool ()->aborted ());
2889
+ ASSERT_TRUE (op->pool ()->root ()->aborted ());
2922
2890
VELOX_MEM_POOL_ABORTED (" Memory pool aborted" );
2923
2891
})));
2924
2892
2925
- std::thread taskThread ([&]() {
2926
- VELOX_ASSERT_THROW (
2927
- AssertQueryBuilder (
2928
- PlanBuilder ()
2929
- .values (batches)
2930
- .singleAggregation ({" c0" , " c1" }, {" array_agg(c2)" })
2931
- .planNode ())
2932
- .queryCtx (queryCtx)
2933
- .maxDrivers (testData.numDrivers )
2934
- .assertResults (expectedResult),
2935
- " " );
2936
- });
2937
-
2938
- testWait.wait (testWaitKey);
2939
- ASSERT_TRUE (op != nullptr );
2940
- auto task = op->testingOperatorCtx ()->task ();
2941
- testData.abortFromRootMemoryPool ? abortPool (queryCtx->pool ())
2942
- : abortPool (op->pool ());
2943
- ASSERT_TRUE (op->pool ()->aborted ());
2944
- ASSERT_TRUE (queryCtx->pool ()->aborted ());
2945
- ASSERT_EQ (queryCtx->pool ()->currentBytes (), 0 );
2946
- driverWait.notify ();
2947
- taskThread.join ();
2948
- task.reset ();
2893
+ VELOX_ASSERT_THROW (
2894
+ AssertQueryBuilder (
2895
+ PlanBuilder ()
2896
+ .values (batches)
2897
+ .singleAggregation ({" c0" , " c1" }, {" array_agg(c2)" })
2898
+ .planNode ())
2899
+ .maxDrivers (testData.numDrivers )
2900
+ .assertResults (expectedResult),
2901
+ " Memory pool manually aborted" );
2949
2902
waitForAllTasksToBeDeleted ();
2950
2903
}
2951
2904
}
0 commit comments