Skip to content

Commit fdb6b91

Browse files
committed
Add a new GetNewestUserDefinedTimestamp API
1 parent 29c6610 commit fdb6b91

File tree

13 files changed

+272
-10
lines changed

13 files changed

+272
-10
lines changed

db/db_impl/db_impl.cc

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
#include "options/cf_options.h"
7575
#include "options/options_helper.h"
7676
#include "options/options_parser.h"
77+
#include "util/udt_util.h"
7778
#ifdef ROCKSDB_JEMALLOC
7879
#include "port/jemalloc_helper.h"
7980
#endif
@@ -1855,6 +1856,74 @@ Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
18551856
return Status::OK();
18561857
}
18571858

1859+
Status DBImpl::GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
1860+
std::string* newest_timestamp) {
1861+
if (newest_timestamp == nullptr) {
1862+
return Status::InvalidArgument("newest_timestamp is nullptr");
1863+
}
1864+
ColumnFamilyData* cfd = nullptr;
1865+
if (column_family == nullptr) {
1866+
cfd = default_cf_handle_->cfd();
1867+
} else {
1868+
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1869+
assert(cfh != nullptr);
1870+
cfd = cfh->cfd();
1871+
}
1872+
assert(cfd != nullptr && cfd->user_comparator() != nullptr);
1873+
if (cfd->user_comparator()->timestamp_size() == 0) {
1874+
return Status::InvalidArgument(
1875+
"Timestamp is not enabled in this column family");
1876+
}
1877+
1878+
Status status;
1879+
// Acquire SuperVersion
1880+
SuperVersion* sv = GetAndRefSuperVersion(cfd);
1881+
{
1882+
InstrumentedMutexLock l(&mutex_);
1883+
bool enter_write_thread = sv->mem == cfd->mem();
1884+
WriteThread::Writer w;
1885+
// Enter write thread to read the mutable memtable to avoid racing access
1886+
// with concurrent writes. No need to enter nonmem_write_thread_ since this
1887+
// call only cares about memtable writes, not WAL writes.
1888+
if (enter_write_thread) {
1889+
write_thread_.EnterUnbatched(&w, &mutex_);
1890+
WaitForPendingWrites();
1891+
}
1892+
*newest_timestamp = sv->mem->GetNewestUDT().ToString();
1893+
assert(!newest_timestamp->empty() || sv->mem->IsEmpty());
1894+
if (enter_write_thread) {
1895+
write_thread_.ExitUnbatched(&w);
1896+
}
1897+
}
1898+
// Read from immutable memtables if nothing found in mutable memtable.
1899+
if (newest_timestamp->empty()) {
1900+
*newest_timestamp = sv->imm->GetNewestUDT().ToString();
1901+
}
1902+
// Read from SST files if no result can be found in memtables.
1903+
if (newest_timestamp->empty() && sv->current->GetSstFilesSize() != 0) {
1904+
// If user defined timestamps are not persisted in SST files, then
1905+
// full_history_ts_low is used to track the exclusive upperbound of
1906+
// flushed user defined timestamp. So we can use it to deduce the newest
1907+
// timestamp in the SST files that the column family has seen.
1908+
if (!cfd->ioptions().persist_user_defined_timestamps) {
1909+
Slice full_history_ts_low = sv->full_history_ts_low;
1910+
if (!full_history_ts_low.empty()) {
1911+
GetU64CutoffTsFromFullHistoryTsLow(&full_history_ts_low,
1912+
newest_timestamp);
1913+
}
1914+
} else {
1915+
// There is currently no metadata in SST files or MANIFEST to track
1916+
// timestamps for this case.
1917+
// TODO(yuzhangyu): add support for this case.
1918+
status = Status::NotSupported(
1919+
"There is no timestamps found in memtables, "
1920+
"and user defined timestamps info not persisted in SST files.");
1921+
}
1922+
}
1923+
ReturnAndCleanupSuperVersion(cfd, sv);
1924+
return status;
1925+
}
1926+
18581927
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
18591928
Arena* arena,
18601929
SequenceNumber sequence,

db/db_impl/db_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,9 @@ class DBImpl : public DB {
502502
Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
503503
std::string* ts_low) override;
504504

505+
Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
506+
std::string* newest_timestamp) override;
507+
505508
Status GetDbIdentity(std::string& identity) const override;
506509

507510
virtual Status GetDbIdentityFromIdentityFile(const IOOptions& opts,

db/db_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3446,6 +3446,11 @@ class ModelDB : public DB {
34463446
return Status::OK();
34473447
}
34483448

3449+
Status GetNewestUserDefinedTimestamp(
3450+
ColumnFamilyHandle* /*cf*/, std::string* /*newest_timestamp*/) override {
3451+
return Status::OK();
3452+
}
3453+
34493454
ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; }
34503455

34513456
private:

db/db_with_timestamp_basic_test.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4914,6 +4914,132 @@ TEST_F(DBBasicTestWithTimestamp, TimestampFilterTableReadOnGet) {
49144914
Close();
49154915
}
49164916

4917+
// Test parameter: whether to persist timestamps in SST files.
4918+
class GetNewestUserDefinedTimestampTest
4919+
: public DBBasicTestWithTimestampBase,
4920+
public testing::WithParamInterface<bool> {
4921+
public:
4922+
explicit GetNewestUserDefinedTimestampTest()
4923+
: DBBasicTestWithTimestampBase("get_newest_udt_test") {}
4924+
};
4925+
4926+
INSTANTIATE_TEST_CASE_P(GetNewestUserDefinedTimestampTest,
4927+
GetNewestUserDefinedTimestampTest,
4928+
testing::Values(true, false));
4929+
4930+
TEST_P(GetNewestUserDefinedTimestampTest, Basic) {
4931+
Options options = CurrentOptions();
4932+
options.env = env_;
4933+
options.create_if_missing = true;
4934+
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
4935+
bool persist_udt = GetParam();
4936+
options.persist_user_defined_timestamps = persist_udt;
4937+
options.max_write_buffer_number = 5;
4938+
options.min_write_buffer_number_to_merge = 4;
4939+
options.allow_concurrent_memtable_write = false;
4940+
4941+
std::string newest_timestamp;
4942+
ASSERT_TRUE(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp)
4943+
.IsInvalidArgument());
4944+
4945+
DestroyAndReopen(options);
4946+
ColumnFamilyHandleImpl* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
4947+
db_->DefaultColumnFamily());
4948+
ColumnFamilyData* cfd = cfh->cfd();
4949+
4950+
ASSERT_TRUE(
4951+
db_->GetNewestUserDefinedTimestamp(nullptr, nullptr).IsInvalidArgument());
4952+
4953+
// The column family hasn't seen any user defined timestamp
4954+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4955+
ASSERT_TRUE(newest_timestamp.empty());
4956+
4957+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(1), "val1"));
4958+
// Testing get newest timestamp from mutable memtable.
4959+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4960+
ASSERT_EQ(EncodeAsUint64(1), newest_timestamp);
4961+
4962+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(2), "val2"));
4963+
ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfd));
4964+
// Testing get the newest timestamp from immutable memtable because the
4965+
// mutable one is empty.
4966+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4967+
ASSERT_EQ(EncodeAsUint64(2), newest_timestamp);
4968+
4969+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(3), "val3"));
4970+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(4), "val4"));
4971+
ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfd));
4972+
// Testing get the newest timestamp from the more recent immutable memtable
4973+
// when there are multiple immutable memtables.
4974+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4975+
ASSERT_EQ(EncodeAsUint64(4), newest_timestamp);
4976+
4977+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(5), "val5"));
4978+
// Testing get newest timestamp from mutable memtable when it has data, in the
4979+
// presence of immutable memtables.
4980+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4981+
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);
4982+
4983+
ASSERT_OK(Flush());
4984+
// After flushing, all the user defined timestamps are flushed. User defined
4985+
// timestamp info is only available if timestamps are not persisted in SST
4986+
// files.
4987+
if (persist_udt) {
4988+
ASSERT_TRUE(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp)
4989+
.IsNotSupported());
4990+
} else {
4991+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4992+
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);
4993+
}
4994+
4995+
Reopen(options);
4996+
// Similar to after flush: when there are no memtables but some SST files,
4997+
// if MANIFEST records the upperbound of flushed timestamps because timestamps
4998+
// are not persisted in SST files, this info can be found.
4999+
if (persist_udt) {
5000+
ASSERT_TRUE(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp)
5001+
.IsNotSupported());
5002+
} else {
5003+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
5004+
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);
5005+
}
5006+
5007+
Close();
5008+
}
5009+
5010+
TEST_P(GetNewestUserDefinedTimestampTest, ConcurrentWrites) {
5011+
Options options = CurrentOptions();
5012+
options.create_if_missing = true;
5013+
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
5014+
bool persist_udt = GetParam();
5015+
options.persist_user_defined_timestamps = persist_udt;
5016+
options.allow_concurrent_memtable_write = false;
5017+
5018+
DestroyAndReopen(options);
5019+
5020+
std::vector<std::thread> threads;
5021+
threads.reserve(10);
5022+
std::atomic<uint64_t> current_ts{0};
5023+
for (int i = 0; i < 10; i++) {
5024+
threads.emplace_back([this, i, &current_ts]() {
5025+
if (i % 2 == 0) {
5026+
std::string newest_timestamp;
5027+
ASSERT_OK(
5028+
db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
5029+
} else {
5030+
uint64_t write_ts = current_ts.fetch_add(1);
5031+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(write_ts),
5032+
"val" + std::to_string(i)));
5033+
}
5034+
});
5035+
}
5036+
5037+
for (auto& t : threads) {
5038+
t.join();
5039+
}
5040+
Close();
5041+
}
5042+
49175043
} // namespace ROCKSDB_NAMESPACE
49185044

49195045
int main(int argc, char** argv) {

db/memtable.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
147147
const Comparator* ucmp = cmp.user_comparator();
148148
assert(ucmp);
149149
ts_sz_ = ucmp->timestamp_size();
150-
persist_user_defined_timestamps_ = ioptions.persist_user_defined_timestamps;
151150
}
152151

153152
MemTable::~MemTable() {
@@ -1806,7 +1805,7 @@ uint64_t MemTable::GetMinLogContainingPrepSection() {
18061805
}
18071806

18081807
void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
1809-
if (ts_sz_ == 0 || persist_user_defined_timestamps_) {
1808+
if (ts_sz_ == 0) {
18101809
return;
18111810
}
18121811
const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
@@ -1817,9 +1816,7 @@ void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
18171816
}
18181817

18191818
const Slice& MemTable::GetNewestUDT() const {
1820-
// This path should not be invoked for MemTables that does not enable the UDT
1821-
// in Memtable only feature.
1822-
assert(ts_sz_ > 0 && !persist_user_defined_timestamps_);
1819+
assert(ts_sz_ > 0);
18231820
return newest_udt_;
18241821
}
18251822

db/memtable.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,8 @@ class MemTable final : public ReadOnlyMemTable {
825825
is_range_del_table_empty_;
826826
}
827827

828+
// Gets the newest user defined timestamp in the memtable. This should only
829+
// be called when user defined timestamp is enabled.
828830
const Slice& GetNewestUDT() const override;
829831

830832
// Returns Corruption status if verification fails.
@@ -900,14 +902,10 @@ class MemTable final : public ReadOnlyMemTable {
900902
// Size in bytes for the user-defined timestamps.
901903
size_t ts_sz_;
902904

903-
// Whether to persist user-defined timestamps
904-
bool persist_user_defined_timestamps_;
905-
906905
// Newest user-defined timestamp contained in this MemTable. For ts1, and ts2
907906
// if Comparator::CompareTimestamp(ts1, ts2) > 0, ts1 is considered newer than
908907
// ts2. We track this field for a MemTable if its column family has UDT
909-
// feature enabled and the `persist_user_defined_timestamp` flag is false.
910-
// Otherwise, this field just contains an empty Slice.
908+
// feature enabled.
911909
Slice newest_udt_;
912910

913911
// Updates flush_state_ using ShouldFlushNow()

db/memtable_list.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,19 @@ bool MemTableListVersion::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
374374
return ret;
375375
}
376376

377+
const Slice& MemTableListVersion::GetNewestUDT() const {
378+
static Slice kEmptySlice;
379+
for (auto it = memlist_.begin(); it != memlist_.end(); ++it) {
380+
ReadOnlyMemTable* m = *it;
381+
Slice timestamp = m->GetNewestUDT();
382+
assert(!timestamp.empty() || m->IsEmpty());
383+
if (!timestamp.empty()) {
384+
return m->GetNewestUDT();
385+
}
386+
}
387+
return kEmptySlice;
388+
}
389+
377390
// Returns true if there is at least one memtable on which flush has
378391
// not yet started.
379392
bool MemTableList::IsFlushPending() const {

db/memtable_list.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ class MemTableListVersion {
149149

150150
int NumFlushed() const { return static_cast<int>(memlist_history_.size()); }
151151

152+
// Gets the newest user defined timestamps from the immutable memtables.
153+
// This returns the newest user defined timestamp found in the most recent
154+
// immutable memtable. This should only be called when user defined timestamp
155+
// is enabled.
156+
const Slice& GetNewestUDT() const;
157+
152158
private:
153159
friend class MemTableList;
154160

include/rocksdb/db.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1782,6 +1782,27 @@ class DB {
17821782
virtual Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
17831783
std::string* ts_low) = 0;
17841784

1785+
// EXPERIMENTAL
1786+
// Get the newest timestamp of the column family. This is only for when the
1787+
// column family enables user defined timestamp.
1788+
// This checks the mutable memtable, the immutable memtable and the SST files,
1789+
// and returns the first newest user defined timestamp found.
1790+
// When user defined timestamp is not persisted in SST files, metadata in
1791+
// MANIFEST tracks the most recently seen timestamp for SST files, so the
1792+
// newest timestamp in SST files can be found. If user defined timestamps are
1793+
// persisted in SST files, currently no timestamp metadata info is persisted.
1794+
// A NotSupported status will be returned if SST files need to be checked in
1795+
// that case.
1796+
// OK status is returned if finding the newest timestamp succeeds. If
1797+
// `newest_timestamp` is empty, it means the column family hasn't seen any
1798+
// timestamp. The returned timestamp is encoded, util method `DecodeU64Ts` can
1799+
// be used to decode it into uint64_t.
1800+
// User-defined timestamp is required to be increasing per key, the return
1801+
// value of this API would be most useful if the user-defined timestamp is
1802+
// monotonically increasing across keys.
1803+
virtual Status GetNewestUserDefinedTimestamp(
1804+
ColumnFamilyHandle* column_family, std::string* newest_timestamp) = 0;
1805+
17851806
// Suspend deleting obsolete files. Compactions will continue to occur,
17861807
// but no obsolete files will be deleted. To resume file deletions, each
17871808
// call to DisableFileDeletions() must be matched by a subsequent call to

include/rocksdb/utilities/stackable_db.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,11 @@ class StackableDB : public DB {
512512
return db_->GetFullHistoryTsLow(column_family, ts_low);
513513
}
514514

515+
Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
516+
std::string* newest_timestamp) override {
517+
return db_->GetNewestUserDefinedTimestamp(column_family, newest_timestamp);
518+
}
519+
515520
Status GetSortedWalFiles(VectorWalPtr& files) override {
516521
return db_->GetSortedWalFiles(files);
517522
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
A new API DB::GetNewestUserDefinedTimestamp is added to return the newest user defined timestamp seen in a column family.

util/udt_util.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,20 @@ void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
429429
PutFixed64(full_history_ts_low, cutoff_udt_ts + 1);
430430
}
431431

432+
void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
433+
std::string* cutoff_ts) {
434+
uint64_t full_history_ts_low_int = 0;
435+
[[maybe_unused]] bool format_res =
436+
GetFixed64(full_history_ts_low, &full_history_ts_low_int);
437+
assert(format_res);
438+
assert(full_history_ts_low_int > 0);
439+
if (full_history_ts_low_int > 0) {
440+
PutFixed64(cutoff_ts, full_history_ts_low_int - 1);
441+
} else {
442+
PutFixed64(cutoff_ts, 0);
443+
}
444+
}
445+
432446
std::tuple<OptSlice, OptSlice> MaybeAddTimestampsToRange(
433447
const OptSlice& start, const OptSlice& end, size_t ts_sz,
434448
std::string* start_with_ts, std::string* end_with_ts, bool exclusive_end) {

util/udt_util.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ Status ValidateUserDefinedTimestampsOptions(
275275
void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
276276
std::string* full_history_ts_low);
277277

278+
// The reverse of `GetFullHistoryTsLowFromU64CutoffTs`.
279+
void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
280+
std::string* cutoff_ts);
281+
278282
// `start` is the inclusive lower user key bound without user-defined timestamp.
279283
// `end` is the upper user key bound without user-defined timestamp.
280284
// By default, `end` is treated as being exclusive. If `exclusive_end` is set to

0 commit comments

Comments
 (0)