Skip to content

Commit 7d0d345

Browse files
cbi42bingtao.yin
authored and
bingtao.yin
committed
Experimental API IngestWriteBatchWithIndex() (facebook#13550)
Summary: Add support for ingesting a WriteBatchWithIndex into the DB with the new API `IngestWriteBatchWithIndex()`. This ingestion works similarly to `TransactionOptions::commit_bypass_memtable`, where the WBWI is ingested as an immutable memtable. Since this skips memtable writes, it improves write performance when writing a large write batch into the DB. Currently this API only supports `disableWAL=true`. Support for WAL writes will be added in a follow-up if needed. Previously, for a WBWI to be ingestable, `SetTrackPerCFStat()` had to be called at WBWI creation. This PR removes this step for simpler usage, and per-CF stats are now always tracked in the WBWI. `WBWIIteratorImpl::TestOutOfBound()` is optimized to offset the performance impact. Pull Request resolved: facebook#13550 Test Plan: - new unit test - new stress test option `ingest_wbwi_one_in`; ran a few runs of `python3 ./tools/db_crashtest.py blackbox --enable_pipelined_write=0 --use_timed_put_one_in=0 --use_put_entity_one_in=0 --ingest_wbwi_one_in=10 --test_batches_snapshots=0 --enable_blob_files=0 --preserve_unverified_changes=1 --avoid_flush_during_recovery=1 --disable_wal=1 --inplace_update_support=0 --interval=40` Reviewed By: jowlyzhang Differential Revision: D73152223 Pulled By: cbi42 fbshipit-source-id: 339f8ed26ac5a798238870df3ba857ba1add759b
1 parent 538a88d commit 7d0d345

File tree

16 files changed

+247
-60
lines changed

16 files changed

+247
-60
lines changed

db/db_impl/db_impl.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,10 @@ class DBImpl : public DB {
256256
Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates,
257257
UserWriteCallback* user_write_cb) override;
258258

259+
Status IngestWriteBatchWithIndex(
260+
const WriteOptions& options,
261+
std::shared_ptr<WriteBatchWithIndex> wbwi) override;
262+
259263
using DB::Get;
260264
Status Get(const ReadOptions& _read_options,
261265
ColumnFamilyHandle* column_family, const Slice& key,
@@ -1531,11 +1535,11 @@ class DBImpl : public DB {
15311535
// ingests `wbwi` is done.
15321536
// @param memtable_updated Whether the same write that ingests wbwi has
15331537
// updated memtable. This is useful for determining whether to set bg
1534-
// error when IngestWBWI fails.
1535-
Status IngestWBWI(std::shared_ptr<WriteBatchWithIndex> wbwi,
1536-
const WBWIMemTable::SeqnoRange& assigned_seqno,
1537-
uint64_t min_prep_log, SequenceNumber last_seqno,
1538-
bool memtable_updated, bool ignore_missing_cf);
1538+
// error when IngestWBWIAsMemtable fails.
1539+
Status IngestWBWIAsMemtable(std::shared_ptr<WriteBatchWithIndex> wbwi,
1540+
const WBWIMemTable::SeqnoRange& assigned_seqno,
1541+
uint64_t min_prep_log, SequenceNumber last_seqno,
1542+
bool memtable_updated, bool ignore_missing_cf);
15391543

15401544
// If disable_memtable is set the application logic must guarantee that the
15411545
// batch will still be skipped from memtable during the recovery. An exception

db/db_impl/db_impl_write.cc

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,38 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
190190
return s;
191191
}
192192

193-
Status DBImpl::IngestWBWI(std::shared_ptr<WriteBatchWithIndex> wbwi,
194-
const WBWIMemTable::SeqnoRange& assigned_seqno,
195-
uint64_t min_prep_log,
196-
SequenceNumber last_seqno_after_ingest,
197-
bool memtable_updated, bool ignore_missing_cf) {
193+
// Ingests `wbwi` into the DB through WriteImpl() without writing its updates
// to the memtable.
//
// @param write_options Must have disableWAL=true; WAL writes are not
//   supported by this API yet. If protection_bytes_per_key > 0, protection
//   info is updated on the WBWI's underlying WriteBatch before the write.
// @param wbwi The batch to ingest. Must not be null.
// @return InvalidArgument if `wbwi` is null, NotSupported if
//   disableWAL is false, otherwise the status of UpdateProtectionInfo() /
//   WriteImpl().
Status DBImpl::IngestWriteBatchWithIndex(
    const WriteOptions& write_options,
    std::shared_ptr<WriteBatchWithIndex> wbwi) {
  if (!wbwi) {
    return Status::InvalidArgument("Batch is nullptr!");
  }
  if (!write_options.disableWAL) {
    // The guard rejects disableWAL=false, so the message must say that
    // disableWAL=true is required (not that it is unsupported).
    return Status::NotSupported(
        "IngestWriteBatchWithIndex requires disableWAL=true");
  }
  Status s;
  if (write_options.protection_bytes_per_key > 0) {
    s = WriteBatchInternal::UpdateProtectionInfo(
        wbwi->GetWriteBatch(), write_options.protection_bytes_per_key);
  }
  if (s.ok()) {
    // All updates live in `wbwi`; WriteImpl() still takes a WriteBatch, so an
    // empty one is passed as `updates`.
    WriteBatch dummy_empty_batch;
    s = WriteImpl(
        write_options, /*updates=*/&dummy_empty_batch, /*callback=*/nullptr,
        /*user_write_cb=*/nullptr, /*log_used=*/nullptr, /*log_ref=*/0,
        /*disable_memtable=*/false, /*seq_used=*/nullptr,
        /*batch_cnt=*/0, /*pre_release_callback=*/nullptr,
        /*post_memtable_callback=*/nullptr, /*wbwi=*/wbwi);
  }
  return s;
}
219+
220+
Status DBImpl::IngestWBWIAsMemtable(
221+
std::shared_ptr<WriteBatchWithIndex> wbwi,
222+
const WBWIMemTable::SeqnoRange& assigned_seqno, uint64_t min_prep_log,
223+
SequenceNumber last_seqno_after_ingest, bool memtable_updated,
224+
bool ignore_missing_cf) {
198225
// Keys in new memtable have seqno > last_seqno_after_ingest >= keys in wbwi.
199226
assert(assigned_seqno.upper_bound <= last_seqno_after_ingest);
200227
// Keys in the current memtable have seqno <= LastSequence() < keys in wbwi.
@@ -436,9 +463,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
436463
return Status::NotSupported(
437464
"DeleteRange is not compatible with row cache.");
438465
}
466+
// Whether the WBWI is from transaction commit or a direct write
467+
// (IngestWriteBatchWithIndex())
468+
bool ingest_wbwi_for_commit = false;
439469
if (wbwi) {
440-
assert(log_ref > 0);
441-
// Used only in WriteCommittedTxn::CommitInternal() with no `callback`.
470+
if (my_batch->HasCommit()) {
471+
ingest_wbwi_for_commit = true;
472+
assert(log_ref);
473+
} else {
474+
// Only supports disableWAL for directly ingesting WBWI for now.
475+
assert(write_options.disableWAL);
476+
}
442477
assert(!callback);
443478
if (immutable_db_options_.unordered_write) {
444479
return Status::NotSupported(
@@ -448,6 +483,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
448483
return Status::NotSupported(
449484
"Ingesting WriteBatch does not support pipelined_write");
450485
}
486+
if (!wbwi->GetOverwriteKey()) {
487+
return Status::NotSupported(
488+
"WriteBatchWithIndex ingestion requires overwrite_key=true");
489+
}
451490
}
452491
// Otherwise IsLatestPersistentState optimization does not make sense
453492
assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
@@ -658,7 +697,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
658697
continue;
659698
}
660699
// TODO: maybe handle the tracing status?
661-
tracer_->Write(writer->batch).PermitUncheckedError();
700+
if (wbwi && !ingest_wbwi_for_commit) {
701+
// for transaction write, tracer only needs the commit marker which
702+
// is in writer->batch
703+
tracer_->Write(wbwi->GetWriteBatch()).PermitUncheckedError();
704+
} else {
705+
tracer_->Write(writer->batch).PermitUncheckedError();
706+
}
662707
}
663708
}
664709
}
@@ -860,12 +905,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
860905
// handle exit, false means somebody else did
861906
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
862907
}
863-
if (wbwi) {
864-
if (status.ok() && w.status.ok()) {
908+
if (wbwi && status.ok() && w.status.ok()) {
909+
uint32_t wbwi_count = wbwi->GetWriteBatch()->Count();
910+
// skip empty batch case
911+
if (wbwi_count) {
865912
// w.batch contains (potentially empty) commit time batch updates,
866913
// only ingest wbwi if w.batch is applied to memtable successfully
867914
uint32_t memtable_update_count = w.batch->Count();
868-
uint32_t wbwi_count = wbwi->GetWriteBatch()->Count();
869915
// Seqno assigned to this write are [last_seq + 1 - seq_inc, last_seq].
870916
// seq_inc includes w.batch (memtable updates) and wbwi
871917
// w.batch gets first `memtable_update_count` sequence numbers.
@@ -878,10 +924,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
878924
if (two_write_queues_) {
879925
assert(ub <= versions_->LastAllocatedSequence());
880926
}
881-
status = IngestWBWI(wbwi, {/*lower_bound=*/lb, /*upper_bound=*/ub},
882-
/*min_prep_log=*/log_ref, last_sequence,
883-
/*memtable_updated=*/memtable_update_count > 0,
884-
write_options.ignore_missing_column_families);
927+
status =
928+
IngestWBWIAsMemtable(wbwi, {/*lower_bound=*/lb, /*upper_bound=*/ub},
929+
/*min_prep_log=*/log_ref, last_sequence,
930+
/*memtable_updated=*/memtable_update_count > 0,
931+
write_options.ignore_missing_column_families);
885932
}
886933
}
887934

db/db_write_test.cc

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,80 @@ TEST_P(DBWriteTest, RecycleLogToggleTest) {
10001000
ASSERT_EQ(Get(Key(1)), "val2");
10011001
}
10021002

1003+
// Exercises DB::IngestWriteBatchWithIndex(): WAL requirement, overwrites and
// deletes, non-default column families, merges, the overwrite_key=false
// rejection, an empty batch, and a batch that targets a missing column family.
TEST_P(DBWriteTest, IngestWriteBatchWithIndex) {
  // WBWI ingestion does not support pipelined write, so skip that
  // configuration entirely.
  if (GetParam() == kPipelinedWrite) {
    return;
  }

  Options options = GetOptions();
  options.disable_auto_compactions = true;
  Reopen(options);
  Options cf_options = GetOptions();
  cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
  CreateColumnFamilies({"cf1", "cf2"}, cf_options);
  ReopenWithColumnFamilies({"default", "cf1", "cf2"},
                           {options, cf_options, cf_options});

  // default cf
  auto wbwi1 = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                     /*overwrite_key=*/true);
  ASSERT_OK(wbwi1->Put("key1", "value1"));
  ASSERT_OK(wbwi1->Put("key2", "value2"));
  // Test disableWAL=false (default WriteOptions): must be rejected.
  ASSERT_TRUE(db_->IngestWriteBatchWithIndex({}, wbwi1).IsNotSupported());

  WriteOptions wo;
  wo.disableWAL = true;
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi1));
  ASSERT_EQ("value1", Get("key1"));
  ASSERT_EQ("value2", Get("key2"));

  // Test with overwrites
  auto wbwi = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                    /*overwrite_key=*/true);
  ASSERT_OK(wbwi->Put("key2", "value3"));
  ASSERT_OK(wbwi->Delete("key1"));  // Delete an existing key
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi));
  ASSERT_EQ("NOT_FOUND", Get("key1"));
  ASSERT_EQ("value3", Get("key2"));

  // Test ingestion with column family
  auto wbwi2 = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                     /*overwrite_key=*/true);
  ASSERT_OK(wbwi2->Put(handles_[1], "cf1_key1", "cf1_value1"));
  ASSERT_OK(wbwi2->Delete(handles_[1], "cf1_key2"));
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi2));
  ASSERT_EQ("cf1_value1", Get(1, "cf1_key1"));
  ASSERT_EQ("NOT_FOUND", Get(1, "cf1_key2"));

  // Test ingestion with merge operations
  auto wbwi3 = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                     /*overwrite_key=*/true);
  ASSERT_OK(wbwi3->Merge(handles_[2], "cf2_key1", "cf2_value1"));
  ASSERT_OK(wbwi3->Merge(handles_[2], "cf2_key1", "cf2_value2"));
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi3));
  ASSERT_EQ("cf2_value1,cf2_value2", Get(2, "cf2_key1"));

  // Test with overwrite_key = false
  auto wbwi_no_overwrite = std::make_shared<WriteBatchWithIndex>(
      options.comparator, 0, /*overwrite_key=*/false);
  ASSERT_OK(wbwi_no_overwrite->Put("key1", "value1"));
  Status s = db_->IngestWriteBatchWithIndex(wo, wbwi_no_overwrite);
  ASSERT_TRUE(s.IsNotSupported());

  // An empty batch is accepted.
  auto empty_wbwi = std::make_shared<WriteBatchWithIndex>(
      options.comparator, 0, /*overwrite_key=*/true);
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, empty_wbwi));

  DestroyAndReopen(options);
  // Should fail when trying to ingest to non-existent column family
  ASSERT_NOK(db_->IngestWriteBatchWithIndex(wo, wbwi2));
}
1076+
10031077
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
10041078
testing::Values(DBTestBase::kDefault,
10051079
DBTestBase::kConcurrentWALWrites,

db_stress_tool/db_stress_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ DECLARE_bool(track_and_verify_wals);
423423
DECLARE_bool(enable_remote_compaction);
424424
DECLARE_bool(auto_refresh_iterator_with_snapshot);
425425
DECLARE_uint32(memtable_op_scan_flush_trigger);
426+
DECLARE_uint32(ingest_wbwi_one_in);
426427

427428
constexpr long KB = 1024;
428429
constexpr int kRandomValueMaxFactor = 3;

db_stress_tool/db_stress_gflags.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,11 @@ DEFINE_bool(track_and_verify_wals,
842842
DEFINE_bool(enable_remote_compaction, false,
843843
"Enable (simulated) Remote Compaction");
844844

845+
// Stress-test knob: when non-zero, roughly one in N writes is issued through
// IngestWriteBatchWithIndex() instead of the regular write call.
// Note the trailing space after "call": adjacent literals are concatenated,
// and without it the help text reads "will callIngestWriteBatchWithIndex()".
DEFINE_uint32(ingest_wbwi_one_in, 0,
              "If set, will call "
              "IngestWriteBatchWithIndex() instead of regular write operations "
              "once every N writes.");
849+
845850
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
846851
if (value < 0 || value > 100) {
847852
fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname,

db_stress_tool/no_batched_ops_stress.cc

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1845,7 +1845,17 @@ class NonBatchedOpsStressTest : public StressTest {
18451845
} else if (FLAGS_use_merge) {
18461846
if (!FLAGS_use_txn) {
18471847
if (FLAGS_user_timestamp_size == 0) {
1848-
s = db_->Merge(write_opts, cfh, k, v);
1848+
if (FLAGS_ingest_wbwi_one_in &&
1849+
thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
1850+
auto wbwi = std::make_shared<WriteBatchWithIndex>(
1851+
options_.comparator, 0, /*overwrite_key=*/true);
1852+
s = wbwi->Merge(cfh, k, v);
1853+
if (s.ok()) {
1854+
s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
1855+
}
1856+
} else {
1857+
s = db_->Merge(write_opts, cfh, k, v);
1858+
}
18491859
} else {
18501860
s = db_->Merge(write_opts, cfh, k, write_ts, v);
18511861
}
@@ -1857,7 +1867,17 @@ class NonBatchedOpsStressTest : public StressTest {
18571867
} else {
18581868
if (!FLAGS_use_txn) {
18591869
if (FLAGS_user_timestamp_size == 0) {
1860-
s = db_->Put(write_opts, cfh, k, v);
1870+
if (FLAGS_ingest_wbwi_one_in &&
1871+
thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
1872+
auto wbwi = std::make_shared<WriteBatchWithIndex>(
1873+
options_.comparator, 0, /*overwrite_key=*/true);
1874+
s = wbwi->Put(cfh, k, v);
1875+
if (s.ok()) {
1876+
s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
1877+
}
1878+
} else {
1879+
s = db_->Put(write_opts, cfh, k, v);
1880+
}
18611881
} else {
18621882
s = db_->Put(write_opts, cfh, k, write_ts, v);
18631883
}
@@ -1949,7 +1969,17 @@ class NonBatchedOpsStressTest : public StressTest {
19491969
}
19501970
if (!FLAGS_use_txn) {
19511971
if (FLAGS_user_timestamp_size == 0) {
1952-
s = db_->Delete(write_opts, cfh, key);
1972+
if (FLAGS_ingest_wbwi_one_in &&
1973+
thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
1974+
auto wbwi = std::make_shared<WriteBatchWithIndex>(
1975+
options_.comparator, 0, /*overwrite_key=*/true);
1976+
s = wbwi->Delete(cfh, key);
1977+
if (s.ok()) {
1978+
s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
1979+
}
1980+
} else {
1981+
s = db_->Delete(write_opts, cfh, key);
1982+
}
19531983
} else {
19541984
s = db_->Delete(write_opts, cfh, key, write_ts);
19551985
}
@@ -2006,7 +2036,17 @@ class NonBatchedOpsStressTest : public StressTest {
20062036
}
20072037
if (!FLAGS_use_txn) {
20082038
if (FLAGS_user_timestamp_size == 0) {
2009-
s = db_->SingleDelete(write_opts, cfh, key);
2039+
if (FLAGS_ingest_wbwi_one_in &&
2040+
thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
2041+
auto wbwi = std::make_shared<WriteBatchWithIndex>(
2042+
options_.comparator, 0, /*overwrite_key=*/true);
2043+
s = wbwi->SingleDelete(cfh, key);
2044+
if (s.ok()) {
2045+
s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
2046+
}
2047+
} else {
2048+
s = db_->SingleDelete(write_opts, cfh, key);
2049+
}
20102050
} else {
20112051
s = db_->SingleDelete(write_opts, cfh, key, write_ts);
20122052
}

include/rocksdb/db.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "rocksdb/types.h"
3232
#include "rocksdb/user_write_callback.h"
3333
#include "rocksdb/utilities/table_properties_collectors.h"
34+
#include "rocksdb/utilities/write_batch_with_index.h"
3435
#include "rocksdb/version.h"
3536
#include "rocksdb/wide_columns.h"
3637

@@ -633,6 +634,21 @@ class DB {
633634
"WriteWithCallback not implemented for this interface.");
634635
}
635636

637+
// EXPERIMENTAL, subject to change
638+
// Ingest a WriteBatchWithIndex into DB, bypassing memtable writes for better
639+
// write performance. Useful when there is a large number of updates
640+
// in the write batch.
641+
// The WriteBatchWithIndex must be created with overwrite_key=true.
642+
// Currently this requires WriteOptions::disableWAL=true.
643+
// The following options are currently not supported:
644+
// - unordered_write
645+
// - enable_pipelined_write
646+
virtual Status IngestWriteBatchWithIndex(
647+
const WriteOptions& /*options*/,
648+
std::shared_ptr<WriteBatchWithIndex> /*wbwi*/) {
649+
return Status::NotSupported("IngestWriteBatchWithIndex not implemented.");
650+
}
651+
636652
// If the column family specified by "column_family" contains an entry for
637653
// "key", return the corresponding value in "*value". If the entry is a plain
638654
// key-value, return the value as-is; if it is a wide-column entity, return

include/rocksdb/utilities/transaction_db.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,8 @@ struct TransactionOptions {
368368
// Only supports write-committed policy. If set to true, the transaction will
369369
// skip memtable write and ingest into the DB directly during Commit(). This
370370
// makes Commit() much faster for transactions with many operations.
371+
// The transaction needs to call Prepare() before Commit() for this option to
372+
// take effect.
371373
// Transactions with Merge() or PutEntity() are not supported yet.
372374
//
373375
// Note that the transaction will be ingested as an immutable memtable for

include/rocksdb/utilities/write_batch_with_index.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ class WBWIIterator {
9090
// Returns n where the current entry is the n-th update to the current key.
9191
// The update count starts from 1.
9292
// Only valid if WBWI is created with overwrite_key = true.
93+
// With overwrite_key=false, update count for each entry is not maintained,
94+
// see UpdateExistingEntryWithCfId().
9395
virtual uint32_t GetUpdateCount() const { return 0; }
9496
};
9597

@@ -374,9 +376,6 @@ class WriteBatchWithIndex : public WriteBatchBase {
374376
uint32_t entry_count = 0;
375377
uint32_t overwritten_sd_count = 0;
376378
};
377-
// Will track CF ID, per CF entry count and overwritten sd count.
378-
// Should be enabled when WBWI is empty for correct tracking.
379-
void SetTrackPerCFStat(bool track);
380379
const std::unordered_map<uint32_t, CFStat>& GetCFStats() const;
381380

382381
bool GetOverwriteKey() const;

memtable/wbwi_memtable.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ class WBWIMemTable final : public ReadOnlyMemTable {
235235
uint64_t num_entries_;
236236
// WBWI can contain updates to multiple CFs. `cf_id_` determines which CF
237237
// this memtable is for.
238-
uint32_t cf_id_;
238+
const uint32_t cf_id_;
239239
};
240240

241241
class WBWIMemTableIterator final : public InternalIterator {

0 commit comments

Comments
 (0)