@@ -51,7 +51,8 @@ ObDASFuncDataIter::ObDASFuncDataIter()
51
51
main_lookup_ls_id_(0 ),
52
52
main_lookup_param_(),
53
53
merge_memctx_(),
54
- doc_ids_()
54
+ doc_ids_(),
55
+ read_count_(0 )
55
56
{}
56
57
57
58
ObDASFuncDataIter::~ObDASFuncDataIter ()
@@ -158,7 +159,7 @@ int ObDASFuncDataIter::inner_reuse()
158
159
int ret = OB_SUCCESS;
159
160
doc_ids_.reuse ();
160
161
read_count_ = 0 ;
161
- if (main_lookup_iter_) {
162
+ if (OB_NOT_NULL ( main_lookup_iter_) ) {
162
163
ObDASScanIter *main_lookup_iter = static_cast <ObDASScanIter *>(main_lookup_iter_);
163
164
storage::ObTableScanParam &main_lookup_scan_param = main_lookup_iter->get_scan_param ();
164
165
if (OB_UNLIKELY (&main_lookup_param_ != &main_lookup_iter->get_scan_param ())) {
@@ -204,7 +205,7 @@ int ObDASFuncDataIter::inner_release()
204
205
DESTROY_CONTEXT (merge_memctx_);
205
206
merge_memctx_ = nullptr ;
206
207
}
207
- if (main_lookup_iter_) {
208
+ if (OB_NOT_NULL ( main_lookup_iter_) ) {
208
209
main_lookup_iter_ = nullptr ;
209
210
}
210
211
for (int64_t i = 0 ; i < iter_count_; i++) {
@@ -265,35 +266,28 @@ int ObDASFuncDataIter::inner_get_next_rows(int64_t &count, int64_t capacity)
265
266
if (OB_ISNULL (tr_merge_iters_)) {
266
267
ret = OB_ERR_UNEXPECTED;
267
268
LOG_WARN (" unexpected error, tr merge iter is nullptr" , K (ret));
268
- } else if (main_lookup_iter_) {
269
- int64_t storage_count = 0 ;
270
- while (OB_SUCC (ret) && main_lookup_count < capacity) {
271
- int64_t need_capacity = capacity - main_lookup_count;
272
- if (OB_FAIL (main_lookup_iter_->get_next_rows (storage_count, need_capacity))) {
269
+ } else if (OB_NOT_NULL (main_lookup_iter_)) {
270
+ while (OB_SUCC (ret) && main_lookup_count == 0 ) {
271
+ if (OB_FAIL (main_lookup_iter_->get_next_rows (main_lookup_count, capacity))) {
273
272
if (OB_ITER_END != ret) {
274
273
LOG_WARN (" fail to get next row for main lookup table" , K (ret), KPC (main_lookup_iter_));
275
- } else if (storage_count > 0 ) {
276
- main_lookup_count += storage_count;
277
274
}
278
- } else {
279
- main_lookup_count += storage_count;
280
275
}
281
276
}
282
277
if (OB_ITER_END == ret) {
283
278
ret = OB_SUCCESS;
284
279
}
285
280
}
286
- if (OB_SUCC (ret) && OB_UNLIKELY (main_lookup_iter_ &&
287
- main_lookup_count != capacity && // case: limit, read once
288
- default_size != main_lookup_count + read_count_)) { // case: limit, read more times
281
+ if (OB_SUCC (ret) && OB_UNLIKELY (main_lookup_iter_ && default_size < main_lookup_count + read_count_)) {
289
282
ret = OB_ERR_UNEXPECTED;
290
283
LOG_WARN (" unexpected error, main lookup count is not equal to capacity" , K (ret), K (default_size), K (main_lookup_count));
291
284
}
292
285
293
286
int tmp_count = 0 ;
287
+ int tr_merge_capacity = main_lookup_count != 0 ? OB_MIN (capacity, main_lookup_count) : capacity;
294
288
for (int64_t i = 0 ; OB_SUCC (ret) && i < iter_count_; i++) {
295
289
tr_merge_count = 0 ;
296
- if (OB_FAIL (tr_merge_iters_[i]->get_next_rows (tr_merge_count, capacity ))) {
290
+ if (OB_FAIL (tr_merge_iters_[i]->get_next_rows (tr_merge_count, tr_merge_capacity ))) {
297
291
if (OB_ITER_END != ret) {
298
292
LOG_WARN (" fail to get next rows for tr merge iter" , K (ret), K (i), KPC (tr_merge_iters_[i]));
299
293
} else {
@@ -305,7 +299,7 @@ int ObDASFuncDataIter::inner_get_next_rows(int64_t &count, int64_t capacity)
305
299
ret = OB_ERR_UNEXPECTED;
306
300
LOG_WARN (" unexpected error, tr merge count is not equal to tmp count" , K (ret), K (tr_merge_count), K (tmp_count), K (i));
307
301
} else if (OB_UNLIKELY (0 != tr_merge_count &&
308
- tr_merge_count != capacity &&
302
+ tr_merge_count != tr_merge_capacity &&
309
303
tr_merge_count + read_count_ != default_size)) {
310
304
ret = OB_ERR_UNEXPECTED;
311
305
LOG_WARN (" unexpected error, tr merge count is not equal to capacity" ,
0 commit comments