Skip to content

Commit 0c06eef

Browse files
jowlyzhangbingtao.yin
authored and
bingtao.yin
committed
Add a new GetNewestUserDefinedTimestamp API (facebook#13547)
Summary: This PR adds a DB::GetNewestUserDefinedTimestamp API to get the newest timestamp of the column family. This is only for when the column family enables user defined timestamp. It checks the mutable memtable, the immutable memtable and the SST files, and returns the first newest user defined timestamp found. When user defined timestamp is not persisted in SST files, there is metadata in MANIFEST tracking upperbound of flushed timestamps, so the newest timestamp in SST files can be found. If user defined timestamps are persisted in SST files, currently no timestamp metadata info is persisted. A NotSupported status will be returned if SST files need to be checked in that case. Pull Request resolved: facebook#13547 Test Plan: Added tests Reviewed By: cbi42 Differential Revision: D73123575 Pulled By: jowlyzhang fbshipit-source-id: 460ac4f9c96926d3c8fcf7944edab8dc0feae1dd
1 parent 7d0d345 commit 0c06eef

File tree

13 files changed

+250
-10
lines changed

13 files changed

+250
-10
lines changed

db/db_impl/db_impl.cc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
#include "options/cf_options.h"
7575
#include "options/options_helper.h"
7676
#include "options/options_parser.h"
77+
#include "util/udt_util.h"
7778
#ifdef ROCKSDB_JEMALLOC
7879
#include "port/jemalloc_helper.h"
7980
#endif
@@ -1855,6 +1856,69 @@ Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
18551856
return Status::OK();
18561857
}
18571858

1859+
Status DBImpl::GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
1860+
std::string* newest_timestamp) {
1861+
if (newest_timestamp == nullptr) {
1862+
return Status::InvalidArgument("newest_timestamp is nullptr");
1863+
}
1864+
ColumnFamilyData* cfd = nullptr;
1865+
if (column_family == nullptr) {
1866+
cfd = default_cf_handle_->cfd();
1867+
} else {
1868+
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1869+
assert(cfh != nullptr);
1870+
cfd = cfh->cfd();
1871+
}
1872+
assert(cfd != nullptr && cfd->user_comparator() != nullptr);
1873+
if (cfd->user_comparator()->timestamp_size() == 0) {
1874+
return Status::InvalidArgument(
1875+
"Timestamp is not enabled in this column family");
1876+
}
1877+
if (cfd->ioptions().persist_user_defined_timestamps) {
1878+
return Status::NotSupported(
1879+
"GetNewestUserDefinedTimestamp doesn't support the case when user"
1880+
"defined timestamps are persisted.");
1881+
}
1882+
1883+
Status status;
1884+
// Acquire SuperVersion
1885+
SuperVersion* sv = GetAndRefSuperVersion(cfd);
1886+
{
1887+
InstrumentedMutexLock l(&mutex_);
1888+
bool enter_write_thread = sv->mem == cfd->mem();
1889+
WriteThread::Writer w;
1890+
// Enter write thread to read the mutable memtable to avoid racing access
1891+
// with concurrent writes. No need to enter nonmem_write_thread_ since this
1892+
// call only care about memtable writes, not WAL writes.
1893+
if (enter_write_thread) {
1894+
write_thread_.EnterUnbatched(&w, &mutex_);
1895+
WaitForPendingWrites();
1896+
}
1897+
*newest_timestamp = sv->mem->GetNewestUDT().ToString();
1898+
assert(!newest_timestamp->empty() || sv->mem->IsEmpty());
1899+
if (enter_write_thread) {
1900+
write_thread_.ExitUnbatched(&w);
1901+
}
1902+
}
1903+
// Read from immutable memtables if nothing found in mutable memtable.
1904+
if (newest_timestamp->empty()) {
1905+
*newest_timestamp = sv->imm->GetNewestUDT().ToString();
1906+
}
1907+
// Read from SST files if no result can be found in memtables.
1908+
if (newest_timestamp->empty() && sv->current->GetSstFilesSize() != 0) {
1909+
// full_history_ts_low is used to track the exclusive upperbound of
1910+
// flushed user defined timestamp. So we can use it to deduce the newest
1911+
// timestamp in the SST files that the column family has seen.
1912+
Slice full_history_ts_low = sv->full_history_ts_low;
1913+
if (!full_history_ts_low.empty()) {
1914+
GetU64CutoffTsFromFullHistoryTsLow(&full_history_ts_low,
1915+
newest_timestamp);
1916+
}
1917+
}
1918+
ReturnAndCleanupSuperVersion(cfd, sv);
1919+
return status;
1920+
}
1921+
18581922
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
18591923
Arena* arena,
18601924
SequenceNumber sequence,

db/db_impl/db_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,9 @@ class DBImpl : public DB {
506506
Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
507507
std::string* ts_low) override;
508508

509+
Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
510+
std::string* newest_timestamp) override;
511+
509512
Status GetDbIdentity(std::string& identity) const override;
510513

511514
virtual Status GetDbIdentityFromIdentityFile(const IOOptions& opts,

db/db_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3446,6 +3446,11 @@ class ModelDB : public DB {
34463446
return Status::OK();
34473447
}
34483448

3449+
Status GetNewestUserDefinedTimestamp(
3450+
ColumnFamilyHandle* /*cf*/, std::string* /*newest_timestamp*/) override {
3451+
return Status::OK();
3452+
}
3453+
34493454
ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; }
34503455

34513456
private:

db/db_with_timestamp_basic_test.cc

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4914,6 +4914,117 @@ TEST_F(DBBasicTestWithTimestamp, TimestampFilterTableReadOnGet) {
49144914
Close();
49154915
}
49164916

4917+
class GetNewestUserDefinedTimestampTest : public DBBasicTestWithTimestampBase {
4918+
public:
4919+
explicit GetNewestUserDefinedTimestampTest()
4920+
: DBBasicTestWithTimestampBase("get_newest_udt_test") {}
4921+
};
4922+
4923+
TEST_F(GetNewestUserDefinedTimestampTest, Basic) {
4924+
std::string newest_timestamp;
4925+
// UDT disabled, get InvalidArgument.
4926+
ASSERT_TRUE(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp)
4927+
.IsInvalidArgument());
4928+
Options options = CurrentOptions();
4929+
options.env = env_;
4930+
options.create_if_missing = true;
4931+
options.max_write_buffer_number = 5;
4932+
options.min_write_buffer_number_to_merge = 4;
4933+
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
4934+
4935+
DestroyAndReopen(options);
4936+
// UDT persisted, get NotSupported.
4937+
ASSERT_TRUE(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp)
4938+
.IsNotSupported());
4939+
4940+
options.persist_user_defined_timestamps = false;
4941+
options.allow_concurrent_memtable_write = false;
4942+
4943+
DestroyAndReopen(options);
4944+
ASSERT_TRUE(
4945+
db_->GetNewestUserDefinedTimestamp(nullptr, nullptr).IsInvalidArgument());
4946+
4947+
ColumnFamilyHandleImpl* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
4948+
db_->DefaultColumnFamily());
4949+
ColumnFamilyData* cfd = cfh->cfd();
4950+
// The column family hasn't seen any user defined timestamp
4951+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4952+
ASSERT_TRUE(newest_timestamp.empty());
4953+
4954+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(1), "val1"));
4955+
// Testing get newest timestamp from mutable memtable.
4956+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4957+
ASSERT_EQ(EncodeAsUint64(1), newest_timestamp);
4958+
4959+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(2), "val2"));
4960+
ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfd));
4961+
// Testing get the newest timestamp from immutable memtable because the
4962+
// mutable one is empty.
4963+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4964+
ASSERT_EQ(EncodeAsUint64(2), newest_timestamp);
4965+
4966+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(3), "val3"));
4967+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(4), "val4"));
4968+
ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfd));
4969+
// Testing get the newest timestamp from the more recent immutable memtable
4970+
// when there are multiple immutable memtables.
4971+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4972+
ASSERT_EQ(EncodeAsUint64(4), newest_timestamp);
4973+
4974+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(5), "val5"));
4975+
// Testing get newest timestamp from mutable memtable when it has data, in the
4976+
// presence of immutable memtables.
4977+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4978+
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);
4979+
4980+
ASSERT_OK(Flush());
4981+
// After flushing and all the user defined timestamp are flushed. User defined
4982+
// timestamp info for SST files is available from MANIFEST.
4983+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4984+
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);
4985+
4986+
Reopen(options);
4987+
// Similar after flush, when there is no memtables, but some SST files,
4988+
// if MANIFEST records the upperbound of flushed timestamps because timestamps
4989+
// are not persisted in SST files, this info can be found.
4990+
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
4991+
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);
4992+
4993+
Close();
4994+
}
4995+
4996+
TEST_F(GetNewestUserDefinedTimestampTest, ConcurrentWrites) {
4997+
Options options = CurrentOptions();
4998+
options.create_if_missing = true;
4999+
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
5000+
options.persist_user_defined_timestamps = false;
5001+
options.allow_concurrent_memtable_write = false;
5002+
5003+
DestroyAndReopen(options);
5004+
5005+
std::vector<std::thread> threads;
5006+
threads.reserve(10);
5007+
std::atomic<uint64_t> current_ts{0};
5008+
for (int i = 0; i < 10; i++) {
5009+
threads.emplace_back([this, i, &current_ts]() {
5010+
if (i % 2 == 0) {
5011+
std::string newest_timestamp;
5012+
ASSERT_OK(
5013+
db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
5014+
} else {
5015+
uint64_t write_ts = current_ts.fetch_add(1);
5016+
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(write_ts),
5017+
"val" + std::to_string(i)));
5018+
}
5019+
});
5020+
}
5021+
5022+
for (auto& t : threads) {
5023+
t.join();
5024+
}
5025+
Close();
5026+
}
5027+
49175028
} // namespace ROCKSDB_NAMESPACE
49185029

49195030
int main(int argc, char** argv) {

db/memtable.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
147147
const Comparator* ucmp = cmp.user_comparator();
148148
assert(ucmp);
149149
ts_sz_ = ucmp->timestamp_size();
150-
persist_user_defined_timestamps_ = ioptions.persist_user_defined_timestamps;
151150
}
152151

153152
MemTable::~MemTable() {
@@ -1806,7 +1805,7 @@ uint64_t MemTable::GetMinLogContainingPrepSection() {
18061805
}
18071806

18081807
void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
1809-
if (ts_sz_ == 0 || persist_user_defined_timestamps_) {
1808+
if (ts_sz_ == 0) {
18101809
return;
18111810
}
18121811
const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
@@ -1817,9 +1816,7 @@ void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
18171816
}
18181817

18191818
const Slice& MemTable::GetNewestUDT() const {
1820-
// This path should not be invoked for MemTables that does not enable the UDT
1821-
// in Memtable only feature.
1822-
assert(ts_sz_ > 0 && !persist_user_defined_timestamps_);
1819+
assert(ts_sz_ > 0);
18231820
return newest_udt_;
18241821
}
18251822

db/memtable.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,8 @@ class MemTable final : public ReadOnlyMemTable {
825825
is_range_del_table_empty_;
826826
}
827827

828+
// Gets the newest user defined timestamps in the memtable. This should only
829+
// be called when user defined timestamp is enabled.
828830
const Slice& GetNewestUDT() const override;
829831

830832
// Returns Corruption status if verification fails.
@@ -900,14 +902,10 @@ class MemTable final : public ReadOnlyMemTable {
900902
// Size in bytes for the user-defined timestamps.
901903
size_t ts_sz_;
902904

903-
// Whether to persist user-defined timestamps
904-
bool persist_user_defined_timestamps_;
905-
906905
// Newest user-defined timestamp contained in this MemTable. For ts1, and ts2
907906
// if Comparator::CompareTimestamp(ts1, ts2) > 0, ts1 is considered newer than
908907
// ts2. We track this field for a MemTable if its column family has UDT
909-
// feature enabled and the `persist_user_defined_timestamp` flag is false.
910-
// Otherwise, this field just contains an empty Slice.
908+
// feature enabled.
911909
Slice newest_udt_;
912910

913911
// Updates flush_state_ using ShouldFlushNow()

db/memtable_list.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,19 @@ bool MemTableListVersion::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
374374
return ret;
375375
}
376376

377+
const Slice& MemTableListVersion::GetNewestUDT() const {
378+
static Slice kEmptySlice;
379+
for (auto it = memlist_.begin(); it != memlist_.end(); ++it) {
380+
ReadOnlyMemTable* m = *it;
381+
Slice timestamp = m->GetNewestUDT();
382+
assert(!timestamp.empty() || m->IsEmpty());
383+
if (!timestamp.empty()) {
384+
return m->GetNewestUDT();
385+
}
386+
}
387+
return kEmptySlice;
388+
}
389+
377390
// Returns true if there is at least one memtable on which flush has
378391
// not yet started.
379392
bool MemTableList::IsFlushPending() const {

db/memtable_list.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ class MemTableListVersion {
149149

150150
int NumFlushed() const { return static_cast<int>(memlist_history_.size()); }
151151

152+
// Gets the newest user defined timestamps from the immutable memtables.
153+
// This returns the newest user defined timestamp found in the most recent
154+
// immutable memtable. This should only be called when user defined timestamp
155+
// is enabled.
156+
const Slice& GetNewestUDT() const;
157+
152158
private:
153159
friend class MemTableList;
154160

include/rocksdb/db.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,6 +1798,25 @@ class DB {
17981798
virtual Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
17991799
std::string* ts_low) = 0;
18001800

1801+
// EXPERIMENTAL
1802+
// Get the newest timestamp of the column family. This is only for when the
1803+
// column family enables user defined timestamp and when timestamps are not
1804+
// persisted in SST files, a.k.a `persist_user_defined_timestamps=false`.
1805+
// This checks the mutable memtable, the immutable memtable and the SST files,
1806+
// and returns the first newest user defined timestamp found.
1807+
// When user defined timestamp is not persisted in SST files, metadata in
1808+
// MANIFEST tracks the most recently seen timestamp for SST files, so the
1809+
// newest timestamp in SST files can be found.
1810+
// OK status is returned if finding the newest timestamp succeeds, if
1811+
// `newest_timestamp` is empty, it means the column family hasn't seen any
1812+
// timestamp. The returned timestamp is encoded, util method `DecodeU64Ts` can
1813+
// be used to decode it into uint64_t.
1814+
// User-defined timestamp is required to be increasing per key, the return
1815+
// value of this API would be most useful if the user-defined timestamp is
1816+
// monotonically increasing across keys.
1817+
virtual Status GetNewestUserDefinedTimestamp(
1818+
ColumnFamilyHandle* column_family, std::string* newest_timestamp) = 0;
1819+
18011820
// Suspend deleting obsolete files. Compactions will continue to occur,
18021821
// but no obsolete files will be deleted. To resume file deletions, each
18031822
// call to DisableFileDeletions() must be matched by a subsequent call to

include/rocksdb/utilities/stackable_db.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,11 @@ class StackableDB : public DB {
512512
return db_->GetFullHistoryTsLow(column_family, ts_low);
513513
}
514514

515+
Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
516+
std::string* newest_timestamp) override {
517+
return db_->GetNewestUserDefinedTimestamp(column_family, newest_timestamp);
518+
}
519+
515520
Status GetSortedWalFiles(VectorWalPtr& files) override {
516521
return db_->GetSortedWalFiles(files);
517522
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
A new API DB::GetNewestUserDefinedTimestamp is added to return the newest user defined timestamp seen in a column family

util/udt_util.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,20 @@ void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
429429
PutFixed64(full_history_ts_low, cutoff_udt_ts + 1);
430430
}
431431

432+
void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
433+
std::string* cutoff_ts) {
434+
uint64_t full_history_ts_low_int = 0;
435+
[[maybe_unused]] bool format_res =
436+
GetFixed64(full_history_ts_low, &full_history_ts_low_int);
437+
assert(format_res);
438+
assert(full_history_ts_low_int > 0);
439+
if (full_history_ts_low_int > 0) {
440+
PutFixed64(cutoff_ts, full_history_ts_low_int - 1);
441+
} else {
442+
PutFixed64(cutoff_ts, 0);
443+
}
444+
}
445+
432446
std::tuple<OptSlice, OptSlice> MaybeAddTimestampsToRange(
433447
const OptSlice& start, const OptSlice& end, size_t ts_sz,
434448
std::string* start_with_ts, std::string* end_with_ts, bool exclusive_end) {

util/udt_util.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ Status ValidateUserDefinedTimestampsOptions(
275275
void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
276276
std::string* full_history_ts_low);
277277

278+
// The reverse of `GetFullHistoryTsLowFromU64CutoffTs`.
279+
void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
280+
std::string* cutoff_ts);
281+
278282
// `start` is the inclusive lower user key bound without user-defined timestamp.
279283
// `end` is the upper user key bound without user-defined timestamp.
280284
// By default, `end` is treated as being exclusive. If `exclusive_end` is set to

0 commit comments

Comments
 (0)