Skip to content

Commit e1eaa1b

Browse files
authored
Tpcc import imrovements (#17333) (#19791)
1 parent 4d5b4d5 commit e1eaa1b

File tree

3 files changed

+62
-21
lines changed

3 files changed

+62
-21
lines changed

ydb/library/workload/tpcc/import.cpp

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -511,15 +511,28 @@ NTable::TBulkUpsertResult LoadOrderLines(
511511
template<typename LoadFunc>
512512
void ExecuteWithRetry(const TString& operationName, LoadFunc loadFunc, TLog* Log) {
513513
for (int retryCount = 0; retryCount < MAX_RETRIES; ++retryCount) {
514+
if (GetGlobalInterruptSource().stop_requested()) {
515+
break;
516+
}
517+
514518
auto result = loadFunc();
515519
if (result.IsSuccess()) {
516520
return;
517521
}
518522

523+
const auto status = result.GetStatus();
524+
bool shouldFail = status == EStatus::NOT_FOUND || status == EStatus::UNDETERMINED
525+
|| status == EStatus::UNAUTHORIZED || status == EStatus::SCHEME_ERROR;
526+
if (shouldFail) {
527+
LOG_E(operationName << " failed: " << result.GetIssues().ToOneLineString());
528+
RequestStop();
529+
return;
530+
}
531+
519532
if (retryCount < MAX_RETRIES - 1) {
520533
int waitMs = GetBackoffWaitMs(retryCount);
521534
LOG_T("Retrying " << operationName << " after " << waitMs << " ms due to: "
522-
<< result.GetIssues().ToOneLineString());
535+
<< result.GetStatus() << ", " << result.GetIssues().ToOneLineString());
523536
Sleep(TDuration::MilliSeconds(waitMs));
524537
} else {
525538
LOG_E(operationName << " failed after " << MAX_RETRIES << " retries: "
@@ -757,10 +770,8 @@ std::expected<double, std::string> GetIndexProgress(
757770

758771
//-----------------------------------------------------------------------------
759772

760-
std::stop_source StopByInterrupt;
761-
762773
void InterruptHandler(int) {
763-
StopByInterrupt.request_stop();
774+
GetGlobalInterruptSource().request_stop();
764775
}
765776

766777
//-----------------------------------------------------------------------------
@@ -787,7 +798,7 @@ class TPCCLoader {
787798
, Log(std::make_unique<TLog>(THolder(static_cast<TLogBackend*>(LogBackend))))
788799
, PreviousDataSizeLoaded(0)
789800
, StartTime(Clock::now())
790-
, LoadState(StopByInterrupt.get_token())
801+
, LoadState(GetGlobalInterruptSource().get_token())
791802
{
792803
}
793804

@@ -812,18 +823,17 @@ class TPCCLoader {
812823
Config.LoadThreadCount = DEFAULT_LOAD_THREAD_COUNT;
813824
}
814825

815-
// in particular this log message
816-
LOG_I("Starting TPC-C data import for " << Config.WarehouseCount << " warehouses using " <<
817-
Config.LoadThreadCount << " threads. Approximate data size: "
818-
<< GetFormattedSize(LoadState.ApproximateDataSize));
819-
820826
// TODO: detect number of threads
821827
size_t threadCount = std::min(Config.WarehouseCount, Config.LoadThreadCount);
822828
threadCount = std::max(threadCount, size_t(1));
823829

824830
// TODO: calculate optimal number of drivers (but per thread looks good)
825831
size_t driverCount = threadCount;
826832

833+
LOG_I("Starting TPC-C data import for " << Config.WarehouseCount << " warehouses using " <<
834+
threadCount << " threads and " << driverCount << " YDB drivers. Approximate data size: "
835+
<< GetFormattedSize(LoadState.ApproximateDataSize));
836+
827837
std::vector<TDriver> drivers;
828838
drivers.reserve(driverCount);
829839
for (size_t i = 0; i < driverCount; ++i) {
@@ -849,6 +859,8 @@ class TPCCLoader {
849859
auto& driver = drivers[threadId % driverCount];
850860
if (threadId == 0) {
851861
LoadSmallTables(driver, Config.Path, Config.WarehouseCount, Log.get());
862+
} else {
863+
std::this_thread::sleep_for(std::chrono::milliseconds(threadId));
852864
}
853865
LoadRange(driver, Config.Path, whStart, whEnd, LoadState, Log.get());
854866
});
@@ -859,7 +871,7 @@ class TPCCLoader {
859871
Clock::time_point lastIndexProgressCheck = Clock::time_point::min();
860872

861873
while (true) {
862-
if (StopByInterrupt.stop_requested()) {
874+
if (GetGlobalInterruptSource().stop_requested()) {
863875
break;
864876
}
865877

@@ -876,6 +888,16 @@ class TPCCLoader {
876888
size_t indexedRangesLoaded = LoadState.IndexedRangesLoaded.load(std::memory_order_relaxed);
877889
if (indexedRangesLoaded >= threadCount) {
878890
CreateIndices(drivers[0], Config.Path, LoadState, Log.get());
891+
for (const auto& state: LoadState.IndexBuildStates) {
892+
if (state.Id.GetKind() == TOperation::TOperationId::UNUSED) {
893+
GetGlobalInterruptSource().request_stop();
894+
break;
895+
}
896+
}
897+
if (GetGlobalInterruptSource().stop_requested()) {
898+
break;
899+
}
900+
879901
LOG_I("Indexed tables loaded, indices are being built in background. Continuing with remaining tables");
880902
LoadState.State = TLoadState::ELOAD_TABLES_BUILD_INDICES;
881903
lastIndexProgressCheck = now;
@@ -932,7 +954,7 @@ class TPCCLoader {
932954
ExitTuiMode();
933955
}
934956

935-
if (StopByInterrupt.stop_requested()) {
957+
if (GetGlobalInterruptSource().stop_requested()) {
936958
LOG_I("Stop requested, waiting for threads to finish");
937959
}
938960

ydb/library/workload/tpcc/ut/data_splitter_ut.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ Y_UNIT_TEST_SUITE(TDataSplitterTest) {
6363
int itemsPerShard = ITEM_COUNT / minShardCount;
6464
itemsPerShard = std::max(minItemsPerShard, itemsPerShard);
6565
int expectedItemSplits = (ITEM_COUNT - 1) / itemsPerShard;
66+
UNIT_ASSERT(expectedItemSplits >= 0);
6667
UNIT_ASSERT_VALUES_EQUAL(itemSplits.size(), expectedItemSplits);
6768

6869
// Heavy tables - check based on PER_WAREHOUSE_MB
@@ -73,6 +74,7 @@ Y_UNIT_TEST_SUITE(TDataSplitterTest) {
7374
int stockWarehousesPerShard2 = (1000 + minShardCount - 1) / minShardCount;
7475
stockWarehousesPerShard = std::min(stockWarehousesPerShard, stockWarehousesPerShard2);
7576
int expectedStockSplits = (1000 - 1) / stockWarehousesPerShard;
77+
UNIT_ASSERT(expectedStockSplits > 0);
7678
UNIT_ASSERT_VALUES_EQUAL(stockSplits.size(), expectedStockSplits);
7779
if (!stockSplits.empty()) {
7880
UNIT_ASSERT_VALUES_EQUAL(stockSplits[0], 1 + stockWarehousesPerShard);
@@ -85,12 +87,14 @@ Y_UNIT_TEST_SUITE(TDataSplitterTest) {
8587
int customerWarehousesPerShard2 = (1000 + minShardCount - 1) / minShardCount;
8688
customerWarehousesPerShard = std::min(customerWarehousesPerShard, customerWarehousesPerShard2);
8789
int expectedCustomerSplits = (1000 - 1) / customerWarehousesPerShard;
90+
UNIT_ASSERT(expectedCustomerSplits > 0);
8891
UNIT_ASSERT_VALUES_EQUAL(customerSplits.size(), expectedCustomerSplits);
8992

9093
// Light tables
9194
auto warehouseSplits = splitter.GetSplitKeys(TABLE_WAREHOUSE);
9295
int lightWarehousesPerShard = (1000 + minShardCount - 1) / minShardCount;
9396
int expectedLightSplits = (1000 - 1) / lightWarehousesPerShard;
97+
UNIT_ASSERT(expectedLightSplits > 0);
9498
UNIT_ASSERT_VALUES_EQUAL(warehouseSplits.size(), expectedLightSplits);
9599
}
96100

@@ -336,4 +340,4 @@ Y_UNIT_TEST_SUITE(TDataSplitterTest) {
336340
}
337341
}
338342
}
339-
}
343+
}

ydb/public/lib/ydb_cli/commands/ydb_workload_tpcc.cpp

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class TCommandTPCCClean
2323
TCommandTPCCClean(std::shared_ptr<NTPCC::TRunConfig> runConfig);
2424
~TCommandTPCCClean() = default;
2525

26-
virtual int Run(TConfig& config) override;
26+
int Run(TConfig& config) override;
2727

2828
private:
2929
std::shared_ptr<NTPCC::TRunConfig> RunConfig;
@@ -50,7 +50,8 @@ class TCommandTPCCInit
5050
TCommandTPCCInit(std::shared_ptr<NTPCC::TRunConfig> runConfig);
5151
~TCommandTPCCInit() = default;
5252

53-
virtual int Run(TConfig& config) override;
53+
void Config(TConfig& config) override;
54+
int Run(TConfig& config) override;
5455

5556
private:
5657
std::shared_ptr<NTPCC::TRunConfig> RunConfig;
@@ -62,6 +63,20 @@ TCommandTPCCInit::TCommandTPCCInit(std::shared_ptr<NTPCC::TRunConfig> runConfig)
6263
{
6364
}
6465

66+
void TCommandTPCCInit::Config(TConfig& config) {
67+
TYdbCommand::Config(config);
68+
69+
config.Opts->AddLongOption(
70+
'w', "warehouses", TStringBuilder() << "Number of warehouses")
71+
.RequiredArgument("INT").Required().StoreResult(&RunConfig->WarehouseCount);
72+
73+
config.Opts->AddLongOption(
74+
"log-level", TStringBuilder() << "Log level from 0 to 8, default is 6 (INFO)")
75+
.Optional().StoreMappedResult(&RunConfig->LogPriority, [](const TString& v) {
76+
return FromString<ELogPriority>(v);
77+
}).DefaultValue(RunConfig->LogPriority).Hidden();
78+
}
79+
6580
int TCommandTPCCInit::Run(TConfig& connectionConfig) {
6681
RunConfig->SetFullPath(connectionConfig);
6782
NTPCC::InitSync(connectionConfig, *RunConfig);
@@ -77,8 +92,8 @@ class TCommandTPCCImport
7792
TCommandTPCCImport(std::shared_ptr<NTPCC::TRunConfig> runConfig);
7893
~TCommandTPCCImport() = default;
7994

80-
virtual void Config(TConfig& config) override;
81-
virtual int Run(TConfig& config) override;
95+
void Config(TConfig& config) override;
96+
int Run(TConfig& config) override;
8297

8398
private:
8499
std::shared_ptr<NTPCC::TRunConfig> RunConfig;
@@ -95,7 +110,7 @@ void TCommandTPCCImport::Config(TConfig& config) {
95110

96111
config.Opts->AddLongOption(
97112
'w', "warehouses", TStringBuilder() << "Number of warehouses")
98-
.OptionalArgument("INT").StoreResult(&RunConfig->WarehouseCount).DefaultValue(RunConfig->WarehouseCount);
113+
.RequiredArgument("INT").Required().StoreResult(&RunConfig->WarehouseCount);
99114

100115
// TODO: detect automatically
101116
config.Opts->AddLongOption(
@@ -140,8 +155,8 @@ class TCommandTPCCRun
140155
TCommandTPCCRun(std::shared_ptr<NTPCC::TRunConfig> runConfig);
141156
~TCommandTPCCRun() = default;
142157

143-
virtual void Config(TConfig& config) override;
144-
virtual int Run(TConfig& config) override;
158+
void Config(TConfig& config) override;
159+
int Run(TConfig& config) override;
145160

146161
private:
147162
std::shared_ptr<NTPCC::TRunConfig> RunConfig;
@@ -158,7 +173,7 @@ void TCommandTPCCRun::Config(TConfig& config) {
158173

159174
config.Opts->AddLongOption(
160175
'w', "warehouses", TStringBuilder() << "Number of warehouses")
161-
.OptionalArgument("INT").StoreResult(&RunConfig->WarehouseCount).DefaultValue(RunConfig->WarehouseCount);
176+
.RequiredArgument("INT").Required().StoreResult(&RunConfig->WarehouseCount);
162177

163178
// TODO: default value should be auto
164179
config.Opts->AddLongOption(

0 commit comments

Comments
 (0)