Skip to content

Commit

Permalink
Add doc comments for wal and allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
owent committed Jan 2, 2025
1 parent d3f2d41 commit 4106a97
Show file tree
Hide file tree
Showing 8 changed files with 789 additions and 14 deletions.
157 changes: 157 additions & 0 deletions include/distributed_system/wal_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace distributed_system {
template <class StorageT, class LogOperatorT, class CallbackParamT, class PrivateDataT, class SnapshotT>
class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
public:
// Delare the types from wal_log_operator, we use it to check types and keep ABI compatibility
using log_operator_type = LogOperatorT;
using object_type = wal_object<StorageT, LogOperatorT, CallbackParamT, PrivateDataT>;
using snapshot_type = SnapshotT;
Expand Down Expand Up @@ -50,8 +51,10 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
using meta_type = typename object_type::meta_type;
using meta_result_type = typename object_type::meta_result_type;

// Private data type
using private_data_type = typename object_type::private_data_type;

// Callback types
using callback_load_fn_t = typename object_type::callback_load_fn_t;
using callback_dump_fn_t = typename object_type::callback_dump_fn_t;
using callback_log_action_fn_t = typename object_type::callback_log_action_fn_t;
Expand All @@ -77,8 +80,13 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
using callback_send_subscribe_request_fn_t = std::function<wal_result_code(wal_client&, callback_param_type)>;

struct vtable_type : public object_type::vtable_type {
// Callback when received a snapshot, all data should be replaced by snapshot
callback_on_receive_snapshot_fn_t on_receive_snapshot;

// Callback when received a subscribe response
callback_on_receive_subscribe_response_fn_t on_receive_subscribe_response;

// Callback when we need send a subscribe request
callback_send_subscribe_request_fn_t subscribe_request;
};
using vtable_pointer = typename wal_mt_mode_data_trait<vtable_type, log_operator_type::mt_mode>::strong_ptr;
Expand All @@ -95,6 +103,10 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
UTIL_DESIGN_PATTERN_NOCOPYABLE(wal_client);

using wal_object_ptr_type = typename wal_mt_mode_data_trait<object_type, log_operator_type::mt_mode>::strong_ptr;

/**
* @brief Internal class to protect the access of wal_client's constructor
*/
struct construct_helper {
time_point next_heartbeat;
vtable_pointer vt;
Expand Down Expand Up @@ -228,6 +240,10 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return wal_object_->dump(storage, param);
}

/**
* @brief Assign log storage, this will clear all the old logs in wal_object
* @param args logs
*/
template <class... ArgsT>
void assign_logs(ArgsT&&... args) {
if (!wal_object_) {
Expand All @@ -236,13 +252,21 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
wal_object_->assign_logs(std::forward<ArgsT>(args)...);
}

/**
* @brief Get which logs will be ignored with key less than or equal to the key
* @return The key since what to ignore logs, nullptr means no logs will be ignored
*/
const log_key_type* get_global_log_ingore_key() const noexcept {
if (!wal_object_) {
return nullptr;
}
return wal_object_->get_global_ingore_key();
}

/**
* @brief Set ignore logs with key less than or equal to the given key
* @param key The key since what to ignore logs
*/
template <class ToKey>
void set_global_log_ingore_key(ToKey&& key) {
if (!wal_object_) {
Expand All @@ -252,6 +276,11 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
wal_object_->set_global_ingore_key(std::forward<ToKey>(key));
}

/**
* @brief Find log by specify key
* @param key The key to find
* @return The log pointer if found, nullptr if not found
*/
log_pointer find_log(const log_key_type& key) noexcept {
if (!wal_object_) {
return nullptr;
Expand All @@ -260,6 +289,11 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return wal_object_->find_log(key);
}

/**
* @brief Find log by specify key
* @param key The key to find
* @return The log pointer if found, nullptr if not found
*/
log_const_pointer find_log(const log_key_type& key) const noexcept {
if (!wal_object_) {
return nullptr;
Expand All @@ -268,25 +302,67 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return wal_object_->find_log(key);
}

/**
* @brief Get private data
* @return The private data
*/
inline const private_data_type& get_private_data() const noexcept { return wal_object_->get_private_data(); }

/**
* @brief Get private data
* @return The private data
*/
inline private_data_type& get_private_data() noexcept { return wal_object_->get_private_data(); }

/**
* @brief Get the function to compare logs
* @return The function to compare logs
*/
inline const log_key_compare_type& get_log_key_compare() const noexcept { return wal_object_->get_log_key_compare(); }

/**
* @brief Get the function to compare logs
* @return The function to compare logs
*/
inline log_key_compare_type& get_log_key_compare() noexcept { return wal_object_->get_log_key_compare(); }

/**
* @brief Get the configure of this wal_client
* @return The configure of this wal_client
*/
inline const configure_type& get_configure() const noexcept {
// We can not create wal_object without configure, so it's safe here
return *configure_;
}

/**
* @brief Get the configure of this wal_client
* @return The configure of this wal_client
*/
inline configure_type& get_configure() noexcept {
// We can not create wal_object without configure, so it's safe here
return *configure_;
}

/**
* @brief Get internal wal_object
* @return The wal_object instance
*/
const object_type& get_log_manager() const noexcept { return *wal_object_; }

/**
* @brief Get internal wal_object
* @return The wal_object instance
*/
object_type& get_log_manager() noexcept { return *wal_object_; }

/**
* @brief Tick this wal_client, it will trigger log GC, heartbeat and retry actions.
* @param now The current time point
* @param param The callback parameter
* @param max_event The max event to process
* @return The event count processed
*/
size_t tick(const time_point& now, callback_param_type param, size_t max_event = std::numeric_limits<size_t>::max()) {
size_t ret = 0;
if (0 == max_event) {
Expand All @@ -301,6 +377,7 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
round = max_event / 16;
}

// GC expired logs
size_t res = wal_object_->gc(now, nullptr, round);
if (res > 0) {
has_event = true;
Expand Down Expand Up @@ -331,9 +408,24 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return ret;
}

/**
* @brief Set the next heartbeat time point
* @param t The next heartbeat time point
*/
inline void set_next_heartbeat_timepoint(time_point t) noexcept { next_heartbeat_timepoint_ = t; }

/**
* @brief Get the next heartbeat time point
* @return The next heartbeat time point
*/
inline time_point get_next_heartbeat_timepoint() const noexcept { return next_heartbeat_timepoint_; }

/**
* @brief Receive log from publisher, it's assumed that the logs have not holes
* @param param The callback parameter
* @param log The log to receive
* @return The result code
*/
wal_result_code receive_log(callback_param_type param, log_pointer&& log) {
if (!wal_object_) {
return wal_result_code::kInitlization;
Expand Down Expand Up @@ -361,6 +453,7 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
}
}

// The finished key must not be ignored or replaced.
if (get_last_finished_log_key() && !get_log_key_compare()(*this->get_last_finished_log_key(), log_key)) {
return wal_result_code::kIgnore;
}
Expand All @@ -370,15 +463,34 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return wal_object_->emplace_back(std::move(log), param);
}

/**
* @brief Receive log from publisher, it's assumed that the logs have not holes
* @param param The callback parameter
* @param log The log to receive
* @return The result code
*/
wal_result_code receive_log(callback_param_type param, const log_pointer& log) {
return receive_log(param, log_pointer{log});
}

/**
* @brief Receive log from publisher, it's assumed that the logs have not holes
* @param param The callback parameter
* @param args The args to construct the received log
* @return The result code
*/
template <class... LogCtorArgsT>
wal_result_code receive_log(callback_param_type param, LogCtorArgsT&&... args) {
return receive_log(param, log_operator_type::template make_strong<log_type>(std::forward<LogCtorArgsT>(args)...));
}

/**
* @brief Receive log from publisher, it's assumed that the logs have not holes
* @param param The callback parameter
* @param begin The iterator to the first log
* @param end The iterator to the end of logs
* @return The result code
*/
template <class IteratorT>
size_t receive_logs(callback_param_type param, IteratorT begin, IteratorT end) {
size_t ret = 0;
Expand All @@ -392,6 +504,12 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return ret;
}

/**
* @brief Receive log from publisher, it allow holes in logs and will recalculate the hash code when necessary
* @param param The callback parameter
* @param log The log to receive
* @return The result code
*/
wal_result_code receive_hole_log(callback_param_type param, log_pointer&& log) {
if (!wal_object_) {
return wal_result_code::kInitlization;
Expand Down Expand Up @@ -419,6 +537,7 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
}
}

// The finished key must not be ignored or replaced.
if (!(get_last_finished_log_key() && !get_log_key_compare()(*this->get_last_finished_log_key(), log_key))) {
set_last_finished_log_key(std::move(log_key));
}
Expand All @@ -427,16 +546,35 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return wal_object_->emplace_back(std::move(log), param);
}

/**
* @brief Receive log from publisher, it allow holes in logs and will recalculate the hash code when necessary
* @param param The callback parameter
* @param log The log to receive
* @return The result code
*/
wal_result_code receive_hole_log(callback_param_type param, const log_pointer& log) {
return receive_hole_log(param, log_pointer{log});
}

/**
* @brief Receive log from publisher, it allow holes in logs and will recalculate the hash code when necessary
* @param param The callback parameter
* @param args The args to construct the received log
* @return The result code
*/
template <class... LogCtorArgsT>
wal_result_code receive_hole_log(callback_param_type param, LogCtorArgsT&&... args) {
return receive_hole_log(param,
log_operator_type::template make_strong<log_type>(std::forward<LogCtorArgsT>(args)...));
}

/**
* @brief Receive log from publisher, it allow holes in logs and will recalculate the hash code when necessary
* @param param The callback parameter
* @param begin The iterator to the first log
* @param end The iterator to the end of logs
* @return The result code
*/
template <class IteratorT>
size_t receive_hole_logs(callback_param_type param, IteratorT begin, IteratorT end) {
size_t ret = 0;
Expand All @@ -450,6 +588,12 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return ret;
}

/**
* @brief Receive snapshot from publisher
* @param param The callback parameter
* @param snapshot The snapshot to receive
* @return The result code
*/
wal_result_code receive_snapshot(const snapshot_type& snapshot, callback_param_type param) {
if (vtable_ && vtable_->on_receive_snapshot) {
wal_result_code ret = vtable_->on_receive_snapshot(*this, snapshot, param);
Expand All @@ -462,6 +606,10 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return wal_result_code::kInitlization;
}

/**
* @brief Call this function then receive subscribe response from publisher
* @return The result code
*/
wal_result_code receive_subscribe_response(callback_param_type param) {
if (vtable_ && vtable_->on_receive_subscribe_response) {
return vtable_->on_receive_subscribe_response(*this, param);
Expand All @@ -470,6 +618,11 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
return wal_result_code::kInitlization;
}

/**
* @brief Set the last finished log key
* @note We will ingore any logs with key less than or equal to the last finished log key
* @param args The args to construct the last finished log key
*/
template <class... ArgsT>
void set_last_finished_log_key(ArgsT&&... args) {
if (last_finished_log_key_) {
Expand All @@ -479,6 +632,10 @@ class ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_client {
}
}

/**
* @brief Get the last finished log key
* @return The last finished log key or nullptr if not set
*/
const log_key_type* get_last_finished_log_key() const { return last_finished_log_key_.get(); }

private:
Expand Down
8 changes: 8 additions & 0 deletions include/distributed_system/wal_common_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,17 @@ struct ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_log_action_getter_trait {
#endif
};

/**
* @brief Helper class to switch types between strong_rc_ptr and std::shared_ptr.
* @note We recommend to use strong_rc_ptr in single-thread environment.
*/
template <class T, wal_mt_mode MTMode>
struct ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_mt_mode_data_trait;

/**
* @brief Helper class to switch functions between strong_rc_ptr and std::shared_ptr.
* @note We recommend to use strong_rc_ptr in single-thread environment.
*/
template <wal_mt_mode MTMode>
struct ATFRAMEWORK_UTILS_API_HEAD_ONLY wal_mt_mode_func_trait;

Expand Down
Loading

0 comments on commit 4106a97

Please sign in to comment.