Skip to content

Commit bb568ba

Browse files
authored
Documented connection_pool execution semantics
Refactored pool internals to remove wait_group Added an external strand thread-safety test close #361
1 parent b7b2061 commit bb568ba

File tree

10 files changed

+207
-241
lines changed

10 files changed

+207
-241
lines changed

include/boost/mysql/connection_pool.hpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,18 @@ class connection_pool
510510
* \par Handler signature
511511
* The handler signature for this operation is `void(boost::mysql::error_code)`
512512
*
513+
* \par Executor
514+
*
515+
* The final handler is executed using `token`'s associated executor,
516+
* or `this->get_executor()` if the token doesn't have an associated
517+
* executor. The final handler is called as if it was submitted using `asio::post`,
518+
* and is never be called inline from within this function.
519+
*
520+
* If the pool was constructed with thread-safety enabled, intermediate
521+
* completion handlers are executed using an internal strand that wraps `this->get_executor()`.
522+
* Otherwise, intermediate handlers are executed using `this->get_executor()`.
523+
* In any case, the token's associated executor is only used for the final handler.
524+
*
513525
* \par Per-operation cancellation
514526
* This operation supports per-operation cancellation. Cancelling `async_run`
515527
* is equivalent to calling \ref connection_pool::cancel.
@@ -583,6 +595,23 @@ class connection_pool
583595
* The handler signature for this operation is
584596
* `void(boost::mysql::error_code, boost::mysql::pooled_connection)`
585597
*
598+
* \par Executor
599+
*
600+
* If the final handler has an associated immediate executor, and the operation
601+
* completes immediately, the final handler is dispatched to it.
602+
* Otherwise, the final handler is called as if it was submitted using `asio::post`,
603+
* and is never be called inline from within this function.
604+
* Immediate completions can only happen when thread-safety is not enabled.
605+
*
606+
* The final handler is executed using `token`'s associated executor,
607+
* or `this->get_executor()` if the token doesn't have an associated
608+
* executor.
609+
*
610+
* If the pool was constructed with thread-safety enabled, intermediate
611+
* completion handlers are executed using an internal strand that wraps `this->get_executor()`.
612+
* Otherwise, intermediate handlers are executed using
613+
* `token`'s associated executor if it has one, or `this->get_executor()` if it hasn't.
614+
*
586615
* \par Per-operation cancellation
587616
* This operation supports per-operation cancellation.
588617
* Cancelling `async_get_connection` has no observable side effects.

include/boost/mysql/impl/internal/connection_pool/connection_node.hpp

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,24 @@ struct conn_shared_state
5656
// as the result of an async_get_connection op
5757
diagnostics last_connect_diag;
5858

59+
// The number of running connections, to track when they exit
60+
std::size_t num_running_connections{0};
61+
62+
// Timer acting as a condition variable to wait for all connections to exit
63+
asio::basic_waitable_timer<ClockType> conns_finished_cv;
64+
5965
conn_shared_state(asio::any_io_executor ex)
60-
: idle_connections_cv(std::move(ex), (ClockType::time_point::max)())
66+
: idle_connections_cv(ex, (ClockType::time_point::max)()),
67+
conns_finished_cv(std::move(ex), (ClockType::time_point::max)())
68+
{
69+
}
70+
71+
void on_connection_start() { ++num_running_connections; }
72+
73+
void on_connection_finish()
6174
{
75+
if (--num_running_connections == 0u)
76+
conns_finished_cv.expires_at((ClockType::time_point::min)());
6277
}
6378
};
6479

@@ -71,7 +86,7 @@ class basic_connection_node : public intrusive::list_base_hook<>,
7186
using this_type = basic_connection_node<ConnectionType, ClockType>;
7287
using timer_type = asio::basic_waitable_timer<ClockType>;
7388

74-
// Not thread-safe, must be manipulated within the pool's executor
89+
// Not thread-safe
7590
const internal_pool_params* params_;
7691
conn_shared_state<ConnectionType, ClockType>* shared_st_;
7792
ConnectionType conn_;
@@ -123,7 +138,15 @@ class basic_connection_node : public intrusive::list_base_hook<>,
123138
connection_task_op(this_type& node) noexcept : node_(node) {}
124139

125140
template <class Self>
126-
void operator()(Self& self, error_code ec = {})
141+
void operator()(Self& self)
142+
{
143+
// Called when the op starts
144+
node_.shared_st_->on_connection_start();
145+
(*this)(self, error_code());
146+
}
147+
148+
template <class Self>
149+
void operator()(Self& self, error_code ec)
127150
{
128151
// A collection status may be generated by idle_wait actions
129152
auto col_st = last_act_ == next_connection_action::idle_wait
@@ -178,7 +201,10 @@ class basic_connection_node : public intrusive::list_base_hook<>,
178201
self
179202
);
180203
break;
181-
case next_connection_action::none: self.complete(error_code()); break;
204+
case next_connection_action::none:
205+
node_.shared_st_->on_connection_finish();
206+
self.complete(error_code());
207+
break;
182208
default: BOOST_ASSERT(false); // LCOV_EXCL_LINE
183209
}
184210
}
@@ -201,35 +227,36 @@ class basic_connection_node : public intrusive::list_base_hook<>,
201227
{
202228
}
203229

230+
// Not thread-safe
204231
void cancel()
205232
{
206233
sansio_connection_node<this_type>::cancel();
207234
timer_.cancel();
208235
collection_timer_.cancel();
209236
}
210237

211-
// This initiation must be invoked within the pool's executor
238+
// Not thread-safe
212239
template <class CompletionToken>
213240
auto async_run(CompletionToken&& token
214241
) -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
215242
{
216243
return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
217244
}
218245

219-
ConnectionType& connection() noexcept { return conn_; }
220-
const ConnectionType& connection() const noexcept { return conn_; }
221-
222-
// Not thread-safe, must be called within the pool's executor
246+
// Not thread-safe
223247
void notify_collectable() { collection_timer_.cancel(); }
224248

225-
// Thread-safe. May be safely be called from any thread.
249+
// Thread-safe
226250
void mark_as_collectable(bool should_reset) noexcept
227251
{
228252
collection_state_.store(
229253
should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
230254
);
231255
}
232256

257+
// Getter, used by pooled_connection
258+
ConnectionType& connection() noexcept { return conn_; }
259+
233260
// Exposed for testing
234261
collection_state get_collection_state() const noexcept { return collection_state_; }
235262
};

include/boost/mysql/impl/internal/connection_pool/connection_pool_impl.hpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
#include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
2121
#include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
22-
#include <boost/mysql/impl/internal/connection_pool/wait_group.hpp>
2322
#include <boost/mysql/impl/internal/coroutine.hpp>
2423

2524
#include <boost/asio/any_completion_handler.hpp>
@@ -31,7 +30,7 @@
3130
#include <boost/asio/cancellation_signal.hpp>
3231
#include <boost/asio/cancellation_type.hpp>
3332
#include <boost/asio/compose.hpp>
34-
#include <boost/asio/deferred.hpp>
33+
#include <boost/asio/detached.hpp>
3534
#include <boost/asio/dispatch.hpp>
3635
#include <boost/asio/error.hpp>
3736
#include <boost/asio/immediate.hpp>
@@ -89,7 +88,6 @@ class basic_pool_impl
8988
state_t state_{state_t::initial};
9089
std::list<node_type> all_conns_;
9190
shared_state_type shared_st_;
92-
wait_group wait_gp_;
9391
timer_type cancel_timer_;
9492
const pipeline_request reset_pipeline_req_{make_reset_pipeline()};
9593

@@ -111,8 +109,9 @@ class basic_pool_impl
111109

112110
void create_connection()
113111
{
112+
// Connection tasks always run in the pool's executor
114113
all_conns_.emplace_back(params_, pool_ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
115-
wait_gp_.run_task(all_conns_.back().async_run(asio::deferred));
114+
all_conns_.back().async_run(asio::bind_executor(pool_ex_, asio::detached));
116115
}
117116

118117
void maybe_create_connection()
@@ -225,7 +224,11 @@ class basic_pool_impl
225224
obj_->shared_st_.idle_connections_cv.expires_at((ClockType::time_point::min)());
226225

227226
// Wait for all connection tasks to exit
228-
BOOST_MYSQL_YIELD(resume_point_, 4, obj_->wait_gp_.async_wait(std::move(self)))
227+
BOOST_MYSQL_YIELD(
228+
resume_point_,
229+
4,
230+
obj_->shared_st_.conns_finished_cv.async_wait(std::move(self))
231+
)
229232

230233
// Done
231234
cancel_slot_.clear();
@@ -419,7 +422,6 @@ class basic_pool_impl
419422
conn_ex_(params.connection_executor ? std::move(params.connection_executor) : original_pool_ex_),
420423
params_(make_internal_pool_params(std::move(params))),
421424
shared_st_(pool_ex_),
422-
wait_gp_(pool_ex_),
423425
cancel_timer_(pool_ex_, (std::chrono::steady_clock::time_point::max)())
424426
{
425427
}

include/boost/mysql/impl/internal/connection_pool/wait_group.hpp

Lines changed: 0 additions & 67 deletions
This file was deleted.

test/Jamfile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ local requirements =
6161
<define>BOOST_ASIO_HAS_DEFAULT_FUNCTION_TEMPLATE_ARGUMENTS=1
6262
<define>BOOST_ALLOW_DEPRECATED_HEADERS=1
6363
# Disable warning C4702: unreachable code, produced by Boost.Asio buffer.hpp
64-
<toolset>msvc:<cxxflags>"/bigobj /wd4702 /permissive-"
64+
# Remove /wd4100 when PFR fixes warnings
65+
<toolset>msvc:<cxxflags>"/bigobj /wd4702 /wd4100 /permissive-"
6566
<toolset>msvc:<define>_SCL_SECURE_NO_WARNINGS=1
6667
<toolset>msvc:<define>_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING
6768
<toolset>msvc:<define>_SILENCE_CXX17_ADAPTOR_TYPEDEFS_DEPRECATION_WARNING
@@ -74,6 +75,10 @@ local requirements =
7475
<toolset>gcc,<thread-sanitizer>norecover:<cxxflags>-Wno-tsan
7576
# gcc-11 emits spurious warnings for valid vector::insert ops
7677
<toolset>gcc-11:<cxxflags>-Wno-stringop-overflow
78+
# TODO: remove when PFR unused warnings are fixed
79+
# https://github.com/boostorg/pfr/pull/187
80+
<toolset>gcc:<cxxflags>-Wno-unused-parameter
81+
<toolset>clang:<cxxflags>-Wno-unused-parameter
7782
<target-os>linux:<define>_XOPEN_SOURCE=600
7883
<target-os>linux:<define>_GNU_SOURCE=1
7984
<target-os>windows:<define>_WIN32_WINNT=0x0601

test/thread_safety/Jamfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ local tests =
1818
connection_pool
1919
connection_pool_two_contexts
2020
connection_pool_coroutines
21+
connection_pool_external_strand
2122
connection_pool_cancel
2223
connection_pool_cancel_get_connection
2324
;

0 commit comments

Comments
 (0)