Skip to content

Commit

Permalink
Add glz::sync (#1595)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenberry authored Feb 5, 2025
1 parent 767ed13 commit 1bedbcf
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 0 deletions.
121 changes: 121 additions & 0 deletions include/glaze/thread/sync.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Glaze Library
// For the license information refer to glaze.hpp

#pragma once

#include <concepts>
#include <mutex>
#include <shared_mutex>
#include <type_traits>
#include <utility>

#include "glaze/util/type_traits.hpp"

// The purpose of glz::sync is to create a thread-safe wrapper around a type
// The only way to access the data is by supplying lambdas to `read` or `write`
// methods, which feed underlying data into the lambda.
// A lock is held for the duration of the call.

// Example:
// struct foo { int x{}; };
// sync<foo> s{};
// s.write([](auto& value) { value.x = 42; });
// s.read([](const auto& value) { std::cout << value.x << '\n'; });

namespace glz
{
template <class T, class Callable>
concept const_callable = std::invocable<Callable, const T&>;

template <class T, class Callable>
concept non_const_callable =
std::invocable<Callable, T&&> || std::invocable<Callable, T&>;

template <class Arg, class Callable>
concept void_return = std::same_as<std::invoke_result_t<Callable, Arg>, void>;

template <class T>
class sync {
T data{};
mutable std::shared_mutex mtx{};

public:
sync() = default;

template <class U>
requires (!is_specialization_v<std::decay_t<U>, sync>)
sync(U&& initial_value) : data(std::forward<U>(initial_value)) {}

sync(const sync& other)
requires(std::copy_constructible<T>)
{
std::shared_lock lock(other.mtx);
data = other.data;
}

sync(sync&& other) noexcept(std::is_nothrow_move_constructible_v<T>)
requires(std::move_constructible<T>)
{
std::unique_lock lock(other.mtx);
data = std::move(other.data);
}

sync& operator=(const sync& other)
requires(std::is_copy_assignable_v<T>)
{
if (this != &other) {
std::scoped_lock lock(mtx, other.mtx);
data = other.data;
}
return *this;
}

sync& operator=(sync&& other) noexcept(std::is_nothrow_move_assignable_v<T>)
requires(std::is_move_assignable_v<T>)
{
if (this != &other) {
std::scoped_lock lock(mtx, other.mtx);
data = std::move(other.data);
}
return *this;
}

T copy() const {
std::shared_lock lock(mtx);
return data;
}

// Read with non-void return value.
template <class Callable>
requires(const_callable<T, Callable> &&
!void_return<const T&, Callable>)
auto read(Callable&& f) const -> std::invoke_result_t<Callable, const T&> {
std::shared_lock lock(mtx);
return std::forward<Callable>(f)(data);
}

// Read with void return.
template <class Callable>
requires(const_callable<T, Callable> && void_return<const T&, Callable>)
void read(Callable&& f) const {
std::shared_lock lock(mtx);
std::forward<Callable>(f)(data);
}

// Write with non-void return value.
template <class Callable>
requires(non_const_callable<T, Callable> && !void_return<T&, Callable>)
auto write(Callable&& f) -> std::invoke_result_t<Callable, T&> {
std::unique_lock lock(mtx);
return std::forward<Callable>(f)(data);
}

// Write with void return.
template <class Callable>
requires(non_const_callable<T, Callable> && void_return<T&, Callable>)
void write(Callable&& f) {
std::unique_lock lock(mtx);
std::forward<Callable>(f)(data);
}
};
}
112 changes: 112 additions & 0 deletions tests/exceptions_test/exceptions_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "glaze/thread/async_string.hpp"
#include "glaze/thread/shared_async_map.hpp"
#include "glaze/thread/shared_async_vector.hpp"
#include "glaze/thread/sync.hpp"
#include "glaze/thread/threadpool.hpp"
#include "ut/ut.hpp"

Expand Down Expand Up @@ -1125,4 +1126,115 @@ suite async_string_tests = [] {
};
};

suite sync_tests = [] {
"non-void read and write operations"_test = []
{
// Initialize with 10.
glz::sync<int> s{10};

// Read with a lambda that returns a value.
auto doubled = s.read([](const int &x) -> int {
return x * 2;
});
expect(doubled == 20);

// Write with a lambda that returns a value.
auto new_value = s.write([](int &x) -> int {
x += 5;
return x;
});
expect(new_value == 15);

// Confirm the new value via a read lambda (void-returning).
s.read([](const int &x) {
expect(x == 15);
});
};

"void read operation"_test = []
{
glz::sync<int> s{20};
bool flag = false;
s.read([&flag](const int &x) {
if (x == 20)
flag = true;
});
expect(flag);
};

"void write operation"_test = []
{
glz::sync<int> s{100};
s.write([](int &x) {
x = 200;
});
s.read([](const int &x) {
expect(x == 200);
});
};

"copy constructor"_test = []
{
glz::sync<int> original{123};
glz::sync<int> copy = original;
copy.read([](const int &x) {
expect(x == 123);
});
};

"move constructor"_test = []
{
glz::sync<std::string> original{"hello"};
glz::sync<std::string> moved = std::move(original);
moved.read([](const std::string &s) {
expect(s == "hello");
});
};

"copy assignment."_test = []
{
glz::sync<int> a{10}, b{20};
a = b; // requires T to be copy-assignable
a.read([](const int &x) {
expect(x == 20);
});
};

"move assignment."_test = []
{
glz::sync<std::string> a{"foo"}, b{"bar"};
a = std::move(b); // requires T to be move-assignable
a.read([](const std::string &s) {
expect(s == "bar");
});
};

"concurrent access."_test = []
{
glz::sync<int> s{0};
const int num_threads = 10;
const int increments = 1000;
std::vector<std::thread> threads;

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&] {
for (int j = 0; j < increments; ++j) {
s.write([](int &value) {
++value;
});
}
});
}

for (auto &th : threads) {
th.join();
}

// Verify that the value is the expected total.
s.read([&](const int &value) {
expect(value == num_threads * increments);
});
};
};

int main() { return 0; }

0 comments on commit 1bedbcf

Please sign in to comment.