Add a new GetNewestUserDefinedTimestamp API #13547

Closed
64 changes: 64 additions & 0 deletions db/db_impl/db_impl.cc
@@ -74,6 +74,7 @@
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "util/udt_util.h"
#ifdef ROCKSDB_JEMALLOC
#include "port/jemalloc_helper.h"
#endif
@@ -1855,6 +1856,69 @@ Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
return Status::OK();
}

Status DBImpl::GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
std::string* newest_timestamp) {
if (newest_timestamp == nullptr) {
return Status::InvalidArgument("newest_timestamp is nullptr");
}
ColumnFamilyData* cfd = nullptr;
if (column_family == nullptr) {
cfd = default_cf_handle_->cfd();
} else {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
assert(cfh != nullptr);
cfd = cfh->cfd();
}
assert(cfd != nullptr && cfd->user_comparator() != nullptr);
if (cfd->user_comparator()->timestamp_size() == 0) {
return Status::InvalidArgument(
"Timestamp is not enabled in this column family");
}
if (cfd->ioptions().persist_user_defined_timestamps) {
return Status::NotSupported(
"GetNewestUserDefinedTimestamp doesn't support the case when user "
"defined timestamps are persisted.");
}

Status status;
// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
{
InstrumentedMutexLock l(&mutex_);
bool enter_write_thread = sv->mem == cfd->mem();
WriteThread::Writer w;
// Enter write thread to read the mutable memtable to avoid racing access
Member

Would it be better to support concurrent access to newest_udt_ from memtable instead of blocking writes for each call to GetNewestUserDefinedTimestamp()?

Contributor Author

Yes, that's a more performant direction. Right now, the newest_udt_ is a Slice so concurrent access is not available. For uint64_t timestamp, we can consider using std::atomic<uint64_t>.

The internal user of this API currently only needs to call it after DB::Open, before any writes start, to get the timestamp to recover from, so I think entering the write thread to avoid the concurrency issue isn't a bad idea. If we need this API to be more performant, I can revise the implementation. We are introducing this API for them because the GetFullHistoryTsLow API doesn't feel generic enough.
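
As a rough illustration of the `std::atomic<uint64_t>` direction mentioned in this thread, here is a minimal sketch of a lock-free "newest timestamp" tracker. `NewestTimestampTracker` is a hypothetical stand-in, not RocksDB code, and it assumes timestamps are plain `uint64_t` values with 0 meaning "nothing recorded yet".

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a memtable's newest-timestamp field.
// Not RocksDB code; assumes uint64_t timestamps, 0 == "none recorded".
class NewestTimestampTracker {
 public:
  // Raises the stored maximum to `ts` if `ts` is newer. Intended to be safe
  // to call from several writer threads without a lock.
  void MaybeUpdate(uint64_t ts) {
    uint64_t seen = newest_.load(std::memory_order_relaxed);
    // When the exchange fails, `seen` is reloaded with the current value, so
    // the loop stops once a value >= ts has been published by anyone.
    while (seen < ts &&
           !newest_.compare_exchange_weak(seen, ts, std::memory_order_release,
                                          std::memory_order_relaxed)) {
    }
  }

  // Lock-free read of the newest timestamp recorded so far.
  uint64_t Get() const { return newest_.load(std::memory_order_acquire); }

 private:
  std::atomic<uint64_t> newest_{0};
};
```

With something along these lines a reader would not need to enter the write thread, at the cost of restricting the tracked value to a fixed-width integer rather than an arbitrary `Slice`.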

// with concurrent writes. No need to enter nonmem_write_thread_ since this
// call only cares about memtable writes, not WAL writes.
if (enter_write_thread) {
write_thread_.EnterUnbatched(&w, &mutex_);
WaitForPendingWrites();
}
*newest_timestamp = sv->mem->GetNewestUDT().ToString();
assert(!newest_timestamp->empty() || sv->mem->IsEmpty());
if (enter_write_thread) {
write_thread_.ExitUnbatched(&w);
}
}
// Read from immutable memtables if nothing found in mutable memtable.
if (newest_timestamp->empty()) {
*newest_timestamp = sv->imm->GetNewestUDT().ToString();
}
// Read from SST files if no result can be found in memtables.
if (newest_timestamp->empty() && sv->current->GetSstFilesSize() != 0) {
// full_history_ts_low is used to track the exclusive upperbound of
// flushed user defined timestamp. So we can use it to deduce the newest
// timestamp in the SST files that the column family has seen.
Slice full_history_ts_low = sv->full_history_ts_low;
if (!full_history_ts_low.empty()) {
GetU64CutoffTsFromFullHistoryTsLow(&full_history_ts_low,
newest_timestamp);
}
}
ReturnAndCleanupSuperVersion(cfd, sv);
return status;
}

InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
Arena* arena,
SequenceNumber sequence,
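
The lookup order in `GetNewestUserDefinedTimestamp` above (mutable memtable, then immutable memtables, then the value derived from `full_history_ts_low`) can be sketched in isolation. The `ColumnFamilyView` struct and `NewestTimestamp` function below are hypothetical simplifications for illustration only; they use decoded `uint64_t` values instead of encoded slices and ignore locking entirely.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical, simplified view of one column family; not RocksDB types.
struct ColumnFamilyView {
  std::optional<uint64_t> mutable_newest;                  // mutable memtable
  std::vector<std::optional<uint64_t>> immutable_newest;   // newest first
  bool has_sst_files = false;
  std::optional<uint64_t> full_history_ts_low;  // exclusive upper bound
};

std::optional<uint64_t> NewestTimestamp(const ColumnFamilyView& cf) {
  // 1. The mutable memtable holds the most recent writes.
  if (cf.mutable_newest) return cf.mutable_newest;
  // 2. Otherwise take the first non-empty immutable memtable, newest first.
  for (const auto& ts : cf.immutable_newest) {
    if (ts) return ts;
  }
  // 3. Otherwise deduce it from full_history_ts_low, which is one past the
  //    newest flushed timestamp.
  if (cf.has_sst_files && cf.full_history_ts_low) {
    uint64_t low = *cf.full_history_ts_low;
    return low > 0 ? low - 1 : 0;
  }
  return std::nullopt;  // the column family has not seen any timestamp
}
```

The early returns mirror the `newest_timestamp->empty()` checks in the diff: each later source is consulted only if the earlier ones produced nothing.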
3 changes: 3 additions & 0 deletions db/db_impl/db_impl.h
@@ -506,6 +506,9 @@ class DBImpl : public DB {
Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
std::string* ts_low) override;

Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
std::string* newest_timestamp) override;

Status GetDbIdentity(std::string& identity) const override;

virtual Status GetDbIdentityFromIdentityFile(const IOOptions& opts,
5 changes: 5 additions & 0 deletions db/db_test.cc
@@ -3446,6 +3446,11 @@ class ModelDB : public DB {
return Status::OK();
}

Status GetNewestUserDefinedTimestamp(
ColumnFamilyHandle* /*cf*/, std::string* /*newest_timestamp*/) override {
return Status::OK();
}

ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; }

private:
111 changes: 111 additions & 0 deletions db/db_with_timestamp_basic_test.cc
@@ -4914,6 +4914,117 @@ TEST_F(DBBasicTestWithTimestamp, TimestampFilterTableReadOnGet) {
Close();
}

class GetNewestUserDefinedTimestampTest : public DBBasicTestWithTimestampBase {
public:
explicit GetNewestUserDefinedTimestampTest()
: DBBasicTestWithTimestampBase("get_newest_udt_test") {}
};

TEST_F(GetNewestUserDefinedTimestampTest, Basic) {
std::string newest_timestamp;
// UDT disabled, get InvalidArgument.
ASSERT_TRUE(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp)
.IsInvalidArgument());
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.max_write_buffer_number = 5;
options.min_write_buffer_number_to_merge = 4;
options.comparator = test::BytewiseComparatorWithU64TsWrapper();

DestroyAndReopen(options);
// UDT persisted, get NotSupported.
ASSERT_TRUE(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp)
.IsNotSupported());

options.persist_user_defined_timestamps = false;
options.allow_concurrent_memtable_write = false;

DestroyAndReopen(options);
ASSERT_TRUE(
db_->GetNewestUserDefinedTimestamp(nullptr, nullptr).IsInvalidArgument());

ColumnFamilyHandleImpl* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
db_->DefaultColumnFamily());
ColumnFamilyData* cfd = cfh->cfd();
// The column family hasn't seen any user defined timestamp
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
ASSERT_TRUE(newest_timestamp.empty());

ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(1), "val1"));
// Testing get newest timestamp from mutable memtable.
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
ASSERT_EQ(EncodeAsUint64(1), newest_timestamp);

ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(2), "val2"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfd));
// Testing get the newest timestamp from immutable memtable because the
// mutable one is empty.
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
ASSERT_EQ(EncodeAsUint64(2), newest_timestamp);

ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(3), "val3"));
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(4), "val4"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfd));
// Testing get the newest timestamp from the more recent immutable memtable
// when there are multiple immutable memtables.
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
ASSERT_EQ(EncodeAsUint64(4), newest_timestamp);

ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(5), "val5"));
// Testing get newest timestamp from mutable memtable when it has data, in the
// presence of immutable memtables.
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);

ASSERT_OK(Flush());
// After flushing, all the user defined timestamps are flushed. User defined
// timestamp info for SST files is available from MANIFEST.
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);

Reopen(options);
// Similarly after reopen: when there are no memtables but some SST files,
// the MANIFEST records the upper bound of flushed timestamps (because
// timestamps are not persisted in SST files), so this info can be found.
ASSERT_OK(db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
ASSERT_EQ(EncodeAsUint64(5), newest_timestamp);

Close();
}

TEST_F(GetNewestUserDefinedTimestampTest, ConcurrentWrites) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = false;
options.allow_concurrent_memtable_write = false;

DestroyAndReopen(options);

std::vector<std::thread> threads;
threads.reserve(10);
std::atomic<uint64_t> current_ts{0};
for (int i = 0; i < 10; i++) {
threads.emplace_back([this, i, &current_ts]() {
if (i % 2 == 0) {
std::string newest_timestamp;
ASSERT_OK(
db_->GetNewestUserDefinedTimestamp(nullptr, &newest_timestamp));
} else {
uint64_t write_ts = current_ts.fetch_add(1);
ASSERT_OK(db_->Put(WriteOptions(), Key(1), EncodeAsUint64(write_ts),
"val" + std::to_string(i)));
}
});
}

for (auto& t : threads) {
t.join();
}
Close();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
7 changes: 2 additions & 5 deletions db/memtable.cc
@@ -147,7 +147,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
const Comparator* ucmp = cmp.user_comparator();
assert(ucmp);
ts_sz_ = ucmp->timestamp_size();
- persist_user_defined_timestamps_ = ioptions.persist_user_defined_timestamps;
}

MemTable::~MemTable() {
@@ -1806,7 +1805,7 @@ uint64_t MemTable::GetMinLogContainingPrepSection() {
}

void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
- if (ts_sz_ == 0 || persist_user_defined_timestamps_) {
+ if (ts_sz_ == 0) {
return;
}
const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
@@ -1817,9 +1816,7 @@ void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
}

const Slice& MemTable::GetNewestUDT() const {
- // This path should not be invoked for MemTables that does not enable the UDT
- // in Memtable only feature.
- assert(ts_sz_ > 0 && !persist_user_defined_timestamps_);
+ assert(ts_sz_ > 0);
return newest_udt_;
}

8 changes: 3 additions & 5 deletions db/memtable.h
@@ -825,6 +825,8 @@ class MemTable final : public ReadOnlyMemTable {
is_range_del_table_empty_;
}

// Gets the newest user defined timestamps in the memtable. This should only
// be called when user defined timestamp is enabled.
const Slice& GetNewestUDT() const override;

// Returns Corruption status if verification fails.
@@ -900,14 +902,10 @@ class MemTable final : public ReadOnlyMemTable {
// Size in bytes for the user-defined timestamps.
size_t ts_sz_;

- // Whether to persist user-defined timestamps
- bool persist_user_defined_timestamps_;

// Newest user-defined timestamp contained in this MemTable. For ts1, and ts2
// if Comparator::CompareTimestamp(ts1, ts2) > 0, ts1 is considered newer than
// ts2. We track this field for a MemTable if its column family has UDT
- // feature enabled and the `persist_user_defined_timestamp` flag is false.
- // Otherwise, this field just contains an empty Slice.
+ // feature enabled.
Slice newest_udt_;

// Updates flush_state_ using ShouldFlushNow()
13 changes: 13 additions & 0 deletions db/memtable_list.cc
@@ -374,6 +374,19 @@ bool MemTableListVersion::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
return ret;
}

const Slice& MemTableListVersion::GetNewestUDT() const {
static Slice kEmptySlice;
for (auto it = memlist_.begin(); it != memlist_.end(); ++it) {
ReadOnlyMemTable* m = *it;
Slice timestamp = m->GetNewestUDT();
assert(!timestamp.empty() || m->IsEmpty());
if (!timestamp.empty()) {
return m->GetNewestUDT();
}
}
return kEmptySlice;
}

// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
6 changes: 6 additions & 0 deletions db/memtable_list.h
@@ -149,6 +149,12 @@ class MemTableListVersion {

int NumFlushed() const { return static_cast<int>(memlist_history_.size()); }

// Gets the newest user defined timestamp from the immutable memtables.
// This returns the newest user defined timestamp found in the most recent
// non-empty immutable memtable. This should only be called when user defined
// timestamp is enabled.
const Slice& GetNewestUDT() const;

private:
friend class MemTableList;

19 changes: 19 additions & 0 deletions include/rocksdb/db.h
@@ -1798,6 +1798,25 @@ class DB {
virtual Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
std::string* ts_low) = 0;

// EXPERIMENTAL
// Get the newest timestamp of the column family. This is only supported when
// the column family enables user defined timestamp and timestamps are not
// persisted in SST files, i.e. `persist_user_defined_timestamps=false`.
// This checks the mutable memtable, the immutable memtables and the SST
// files, in that order, and returns the first user defined timestamp found.
// When user defined timestamp is not persisted in SST files, metadata in
// MANIFEST tracks the most recently seen timestamp for SST files, so the
// newest timestamp in SST files can be found.
// OK status is returned if finding the newest timestamp succeeds. If
// `newest_timestamp` is empty, the column family hasn't seen any timestamp.
// The returned timestamp is encoded; the util method `DecodeU64Ts` can be
// used to decode it into uint64_t.
// User-defined timestamp is required to be increasing per key. The return
// value of this API is most useful if the user-defined timestamp is
// monotonically increasing across keys.
virtual Status GetNewestUserDefinedTimestamp(
ColumnFamilyHandle* column_family, std::string* newest_timestamp) = 0;

// Suspend deleting obsolete files. Compactions will continue to occur,
// but no obsolete files will be deleted. To resume file deletions, each
// call to DisableFileDeletions() must be matched by a subsequent call to
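
The doc comment for `GetNewestUserDefinedTimestamp` above says the result is encoded and that an empty string means no timestamp has been seen. A caller-side sketch of that contract might look like the following. `DecodeNewestTimestamp` is a hypothetical helper, not a RocksDB function, and it assumes the timestamp is an 8-byte little-endian fixed64 value, which is my understanding of how the U64 timestamp encoding works; inside RocksDB the `DecodeU64Ts` utility named in the comment would be the natural choice.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical helper, not part of RocksDB. Assumes an 8-byte little-endian
// encoding. Returns std::nullopt when `encoded` is empty (no timestamp seen).
// For brevity it also returns std::nullopt on an unexpected size; a real
// caller would probably want to report that case separately.
std::optional<uint64_t> DecodeNewestTimestamp(const std::string& encoded) {
  if (encoded.size() != sizeof(uint64_t)) return std::nullopt;
  uint64_t value = 0;
  for (size_t i = 0; i < sizeof(uint64_t); ++i) {
    // Byte i carries bits [8*i, 8*i + 8) of the value.
    value |= static_cast<uint64_t>(static_cast<unsigned char>(encoded[i]))
             << (8 * i);
  }
  return value;
}
```

Returning `std::optional` keeps "never saw a timestamp" distinct from a legitimate timestamp of 0.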
5 changes: 5 additions & 0 deletions include/rocksdb/utilities/stackable_db.h
@@ -512,6 +512,11 @@ class StackableDB : public DB {
return db_->GetFullHistoryTsLow(column_family, ts_low);
}

Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
std::string* newest_timestamp) override {
return db_->GetNewestUserDefinedTimestamp(column_family, newest_timestamp);
}

Status GetSortedWalFiles(VectorWalPtr& files) override {
return db_->GetSortedWalFiles(files);
}
1 change: 1 addition & 0 deletions unreleased_history/new_features/get_newest_udt.md
@@ -0,0 +1 @@
A new API DB::GetNewestUserDefinedTimestamp is added to return the newest user defined timestamp seen in a column family.
14 changes: 14 additions & 0 deletions util/udt_util.cc
@@ -429,6 +429,20 @@ void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
PutFixed64(full_history_ts_low, cutoff_udt_ts + 1);
}

void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
std::string* cutoff_ts) {
uint64_t full_history_ts_low_int = 0;
[[maybe_unused]] bool format_res =
GetFixed64(full_history_ts_low, &full_history_ts_low_int);
assert(format_res);
assert(full_history_ts_low_int > 0);
if (full_history_ts_low_int > 0) {
PutFixed64(cutoff_ts, full_history_ts_low_int - 1);
} else {
PutFixed64(cutoff_ts, 0);
}
}

std::tuple<OptSlice, OptSlice> MaybeAddTimestampsToRange(
const OptSlice& start, const OptSlice& end, size_t ts_sz,
std::string* start_with_ts, std::string* end_with_ts, bool exclusive_end) {
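
The relationship between a cutoff timestamp and `full_history_ts_low` in the two helpers above is an off-by-one in each direction: the forward helper stores `cutoff + 1`, and the new reverse helper subtracts 1 again, guarding against 0. A standalone sketch of that round trip follows. All four functions are hypothetical stand-ins written for this example; they assume an 8-byte little-endian fixed64 layout and use `std::string` where the real code uses `Slice`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Stand-in for a fixed64 append: writes `v` as 8 little-endian bytes.
void AppendFixed64(std::string* dst, uint64_t v) {
  for (size_t i = 0; i < sizeof(uint64_t); ++i) {
    dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
  }
}

// Stand-in for a fixed64 read: expects at least 8 bytes in `src`.
uint64_t ReadFixed64(const std::string& src) {
  assert(src.size() >= sizeof(uint64_t));
  uint64_t v = 0;
  for (size_t i = 0; i < sizeof(uint64_t); ++i) {
    v |= static_cast<uint64_t>(static_cast<unsigned char>(src[i])) << (8 * i);
  }
  return v;
}

// Forward direction: full_history_ts_low is the exclusive upper bound, so it
// is one past the cutoff timestamp.
std::string FullHistoryTsLowFromCutoff(uint64_t cutoff_ts) {
  std::string out;
  AppendFixed64(&out, cutoff_ts + 1);
  return out;
}

// Reverse direction: subtract one, falling back to 0 when the input is 0.
std::string CutoffFromFullHistoryTsLow(const std::string& full_history_ts_low) {
  uint64_t low = ReadFixed64(full_history_ts_low);
  std::string out;
  AppendFixed64(&out, low > 0 ? low - 1 : 0);
  return out;
}
```

Applying the forward helper and then the reverse helper returns the original cutoff, which is what lets the new API recover the newest flushed timestamp from MANIFEST metadata.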
4 changes: 4 additions & 0 deletions util/udt_util.h
@@ -275,6 +275,10 @@ Status ValidateUserDefinedTimestampsOptions(
void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts,
std::string* full_history_ts_low);

// The reverse of `GetFullHistoryTsLowFromU64CutoffTs`.
void GetU64CutoffTsFromFullHistoryTsLow(Slice* full_history_ts_low,
std::string* cutoff_ts);

// `start` is the inclusive lower user key bound without user-defined timestamp.
// `end` is the upper user key bound without user-defined timestamp.
// By default, `end` is treated as being exclusive. If `exclusive_end` is set to