Skip to content

Commit 829dbf7

Browse files
authored
Connections now check for in-progress async operations and fail without UB
connection and any_connection now check whether there is an in-progress async operation, and fail with client_errc::operation_in_progress if there is one. This situation no longer triggers undefined behavior. Refactored the internal sans-io algorithms close #405
1 parent 91cd262 commit 829dbf7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+620
-384
lines changed

doc/qbk/04_overview.qbk

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,17 +224,15 @@ for more info.
224224

225225
[section:async Single outstanding async operation per connection]
226226

227-
At any given point in time, a `any_connection` may only have a single async operation outstanding.
228-
Because MySQL sessions are stateful, and to keep the implementation simple, messages
229-
are written to the underlying transport without any locking or queueing.
230-
If you perform several async operations concurrently on a single connection without any
231-
serialization, messages from different operations will be interleaved, leading to undefined behavior.
227+
At any given point in time, an `any_connection` can only have a single async operation outstanding.
228+
In other words, connections implement no asynchronous locking or queueing, which
229+
keeps code simple and efficient. If you need to perform several operations in parallel,
230+
you can open more connections or use [reflink connection_pool].
232231

233-
For example, doing the following is illegal and should be avoided:
232+
Trying to run operations concurrently on a single connection is detected at
233+
runtime and generates a `client_errc::operation_in_progress` error:
234234

235-
[overview_async_dont]
236-
237-
If you need to perform queries in parallel, open more connections to the server.
235+
[overview_async_parallel]
238236

239237
[endsect]
240238

include/boost/mysql/any_connection.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ struct any_connection_params
132132
*
133133
* This is a move-only type.
134134
*
135+
* \par Single outstanding async operation per connection
136+
* At any given point in time, only one async operation can be outstanding
137+
* per connection. If an async operation is initiated while another one is in progress,
138+
* it will fail with \ref client_errc::operation_in_progress.
139+
*
135140
* \par Default completion tokens
136141
* The default completion token for all async operations in this class is
137142
* `with_diagnostics(asio::deferred)`, which allows you to use `co_await`

include/boost/mysql/client_errc.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ enum class client_errc : int
138138
* size. Try increasing \ref any_connection_params::max_buffer_size.
139139
*/
140140
max_buffer_size_exceeded,
141+
142+
/**
143+
* \brief Another operation is currently in progress for this connection. Make sure
144+
* that a single connection does not run two asynchronous operations in parallel.
145+
*/
146+
operation_in_progress,
141147
};
142148

143149
BOOST_MYSQL_DECL

include/boost/mysql/connection.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ class static_execution_state;
5555
* the stream using \ref connection::stream, and its executor via \ref connection::get_executor. The
5656
* executor used by this object is always the same as the underlying stream.
5757
*
58+
* \par Single outstanding async operation per connection
59+
* At any given point in time, only one async operation can be outstanding
60+
* per connection. If an async operation is initiated while another one is in progress,
61+
* it will fail with \ref client_errc::operation_in_progress.
62+
*
5863
* \par Thread safety
5964
* Distinct objects: safe. \n
6065
* Shared objects: unsafe. \n

include/boost/mysql/detail/algo_params.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ struct pipeline_request_stage;
3636

3737
struct connect_algo_params
3838
{
39+
const void* server_address; // Points to an any_address or an endpoint for the corresponding stream. For
40+
// the templated connection, only valid until the first yield!
3941
handshake_params hparams;
4042
bool secure_channel; // Are we using UNIX sockets or any other secure channel?
4143

include/boost/mysql/detail/any_resumable_ref.hpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,31 @@ namespace detail {
1919

2020
class any_resumable_ref
2121
{
22-
template <class T>
23-
static next_action do_resume(void* self, error_code ec, std::size_t bytes_transferred)
24-
{
25-
return static_cast<T*>(self)->resume(ec, bytes_transferred);
26-
}
27-
22+
public:
2823
using fn_t = next_action (*)(void*, error_code, std::size_t);
2924

30-
void* algo_{};
31-
fn_t fn_{};
32-
33-
public:
3425
template <class T, class = typename std::enable_if<!std::is_same<T, any_resumable_ref>::value>::type>
3526
explicit any_resumable_ref(T& op) noexcept : algo_(&op), fn_(&do_resume<T>)
3627
{
3728
}
3829

30+
// Allow using standalone functions
31+
any_resumable_ref(void* algo, fn_t fn) noexcept : algo_(algo), fn_(fn) {}
32+
3933
next_action resume(error_code ec, std::size_t bytes_transferred)
4034
{
4135
return fn_(algo_, ec, bytes_transferred);
4236
}
37+
38+
private:
39+
template <class T>
40+
static next_action do_resume(void* self, error_code ec, std::size_t bytes_transferred)
41+
{
42+
return static_cast<T*>(self)->resume(ec, bytes_transferred);
43+
}
44+
45+
void* algo_{};
46+
fn_t fn_{};
4347
};
4448

4549
} // namespace detail

include/boost/mysql/detail/connection_impl.hpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,14 +223,15 @@ class connection_impl
223223
};
224224

225225
// Connect
226-
static connect_algo_params make_params_connect(const handshake_params& params)
226+
static connect_algo_params make_params_connect(const void* server_address, const handshake_params& params)
227227
{
228-
return connect_algo_params{params, false};
228+
return connect_algo_params{server_address, params, false};
229229
}
230230

231231
static connect_algo_params make_params_connect_v2(const connect_params& params)
232232
{
233233
return connect_algo_params{
234+
&params.server_address,
234235
make_hparams(params),
235236
params.server_address.type() == address_type::unix_path
236237
};
@@ -251,8 +252,13 @@ class connection_impl
251252
handshake_params params
252253
)
253254
{
254-
eng->set_endpoint(&endpoint);
255-
async_run_impl(*eng, *st, make_params_connect(params), *diag, std::forward<Handler>(handler));
255+
async_run_impl(
256+
*eng,
257+
*st,
258+
make_params_connect(&endpoint, params),
259+
*diag,
260+
std::forward<Handler>(handler)
261+
);
256262
}
257263
};
258264

@@ -269,7 +275,6 @@ class connection_impl
269275
const connect_params* params
270276
)
271277
{
272-
eng->set_endpoint(&params->server_address);
273278
async_run_impl(*eng, *st, make_params_connect_v2(*params), *diag, std::forward<Handler>(handler));
274279
}
275280
};
@@ -388,13 +393,11 @@ class connection_impl
388393
diagnostics& diag
389394
)
390395
{
391-
engine_->set_endpoint(&endpoint);
392-
run(make_params_connect(params), err, diag);
396+
run(make_params_connect(&endpoint, params), err, diag);
393397
}
394398

395399
void connect_v2(const connect_params& params, error_code& err, diagnostics& diag)
396400
{
397-
engine_->set_endpoint(&params.server_address);
398401
run(make_params_connect_v2(params), err, diag);
399402
}
400403

include/boost/mysql/detail/engine.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ class engine
2525
virtual ~engine() {}
2626
virtual executor_type get_executor() = 0;
2727
virtual bool supports_ssl() const = 0;
28-
virtual void set_endpoint(const void* endpoint) = 0;
2928
virtual void run(any_resumable_ref resumable, error_code& err) = 0;
3029
virtual void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)>) = 0;
3130
};

include/boost/mysql/detail/engine_impl.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,11 @@ struct run_algo_op
111111
}
112112
else if (act.type() == next_action_type::connect)
113113
{
114-
BOOST_MYSQL_YIELD(resume_point_, 6, stream_.async_connect(std::move(self)))
114+
BOOST_MYSQL_YIELD(
115+
resume_point_,
116+
6,
117+
stream_.async_connect(act.connect_endpoint(), std::move(self))
118+
)
115119
has_done_io_ = true;
116120
}
117121
else
@@ -128,7 +132,6 @@ struct run_algo_op
128132
// using executor_type = asio::any_io_executor;
129133
// executor_type get_executor();
130134
// bool supports_ssl() const;
131-
// void set_endpoint(const void* endpoint);
132135
// std::size_t read_some(asio::mutable_buffer, bool use_ssl, error_code&);
133136
// void async_read_some(asio::mutable_buffer, bool use_ssl, CompletinToken&&);
134137
// std::size_t write_some(asio::const_buffer, bool use_ssl, error_code&);
@@ -137,8 +140,8 @@ struct run_algo_op
137140
// void async_ssl_handshake(CompletionToken&&);
138141
// void ssl_shutdown(error_code&);
139142
// void async_ssl_shutdown(CompletionToken&&);
140-
// void connect(error_code&);
141-
// void async_connect(CompletionToken&&);
143+
// void connect(const void* server_address, error_code&);
144+
// void async_connect(const void* server_address, CompletionToken&&);
142145
// void close(error_code&);
143146
// Async operations are only required to support callback types
144147
// See stream_adaptor for an implementation
@@ -161,8 +164,6 @@ class engine_impl final : public engine
161164

162165
bool supports_ssl() const override final { return stream_.supports_ssl(); }
163166

164-
void set_endpoint(const void* endpoint) override final { stream_.set_endpoint(endpoint); }
165-
166167
void run(any_resumable_ref resumable, error_code& ec) override final
167168
{
168169
ec.clear();
@@ -207,7 +208,7 @@ class engine_impl final : public engine
207208
}
208209
else if (act.type() == next_action_type::connect)
209210
{
210-
stream_.connect(io_ec);
211+
stream_.connect(act.connect_endpoint(), io_ec);
211212
}
212213
else
213214
{

include/boost/mysql/detail/engine_stream_adaptor.hpp

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,61 +31,48 @@ namespace mysql {
3131
namespace detail {
3232

3333
// Connect and close helpers
34-
template <class Stream, class = void>
35-
struct endpoint_storage // prevent build errors for non socket streams
36-
{
37-
void store(const void*) { BOOST_ASSERT(false); } // LCOV_EXCL_LINE
38-
};
39-
40-
template <class Stream>
41-
struct endpoint_storage<Stream, void_t<typename Stream::lowest_layer_type::endpoint_type>>
42-
{
43-
using endpoint_type = typename Stream::lowest_layer_type::endpoint_type;
44-
endpoint_type value;
45-
void store(const void* v) { value = *static_cast<const endpoint_type*>(v); }
46-
};
47-
4834
// LCOV_EXCL_START
4935
template <class Stream>
50-
void do_connect_impl(Stream&, const endpoint_storage<Stream>&, error_code&, std::false_type)
36+
void do_connect_impl(Stream&, const void*, error_code&, std::false_type)
5137
{
5238
BOOST_ASSERT(false);
5339
}
5440
// LCOV_EXCL_STOP
5541

5642
template <class Stream>
57-
void do_connect_impl(Stream& stream, const endpoint_storage<Stream>& ep, error_code& ec, std::true_type)
43+
void do_connect_impl(Stream& stream, const void* ep, error_code& ec, std::true_type)
5844
{
59-
stream.lowest_layer().connect(ep.value, ec);
45+
stream.lowest_layer().connect(
46+
*static_cast<const typename Stream::lowest_layer_type::endpoint_type*>(ep),
47+
ec
48+
);
6049
}
6150

6251
template <class Stream>
63-
void do_connect(Stream& stream, const endpoint_storage<Stream>& ep, error_code& ec)
52+
void do_connect(Stream& stream, const void* ep, error_code& ec)
6453
{
6554
do_connect_impl(stream, ep, ec, is_socket_stream<Stream>{});
6655
}
6756

6857
// LCOV_EXCL_START
6958
template <class Stream, class CompletionToken>
70-
void do_async_connect_impl(Stream&, const endpoint_storage<Stream>&, CompletionToken&&, std::false_type)
59+
void do_async_connect_impl(Stream&, const void*, CompletionToken&&, std::false_type)
7160
{
7261
BOOST_ASSERT(false);
7362
}
7463
// LCOV_EXCL_STOP
7564

7665
template <class Stream, class CompletionToken>
77-
void do_async_connect_impl(
78-
Stream& stream,
79-
const endpoint_storage<Stream>& ep,
80-
CompletionToken&& token,
81-
std::true_type
82-
)
66+
void do_async_connect_impl(Stream& stream, const void* ep, CompletionToken&& token, std::true_type)
8367
{
84-
stream.lowest_layer().async_connect(ep.value, std::forward<CompletionToken>(token));
68+
stream.lowest_layer().async_connect(
69+
*static_cast<const typename Stream::lowest_layer_type::endpoint_type*>(ep),
70+
std::forward<CompletionToken>(token)
71+
);
8572
}
8673

8774
template <class Stream, class CompletionToken>
88-
void do_async_connect(Stream& stream, const endpoint_storage<Stream>& ep, CompletionToken&& token)
75+
void do_async_connect(Stream& stream, const void* ep, CompletionToken&& token)
8976
{
9077
do_async_connect_impl(stream, ep, std::forward<CompletionToken>(token), is_socket_stream<Stream>{});
9178
}
@@ -115,7 +102,6 @@ template <class Stream>
115102
class engine_stream_adaptor
116103
{
117104
Stream stream_;
118-
endpoint_storage<Stream> endpoint_;
119105

120106
public:
121107
template <class... Args>
@@ -128,8 +114,6 @@ class engine_stream_adaptor
128114

129115
bool supports_ssl() const { return false; }
130116

131-
void set_endpoint(const void* val) { endpoint_.store(val); }
132-
133117
using executor_type = asio::any_io_executor;
134118
executor_type get_executor() { return stream_.get_executor(); }
135119

@@ -185,12 +169,12 @@ class engine_stream_adaptor
185169
}
186170

187171
// Connect and close
188-
void connect(error_code& ec) { do_connect(stream_, endpoint_, ec); }
172+
void connect(const void* endpoint, error_code& ec) { do_connect(stream_, endpoint, ec); }
189173

190174
template <class CompletionToken>
191-
void async_connect(CompletionToken&& token)
175+
void async_connect(const void* endpoint, CompletionToken&& token)
192176
{
193-
do_async_connect(stream_, endpoint_, std::forward<CompletionToken>(token));
177+
do_async_connect(stream_, endpoint, std::forward<CompletionToken>(token));
194178
}
195179

196180
void close(error_code& ec) { do_close(stream_, ec); }
@@ -200,7 +184,6 @@ template <class Stream>
200184
class engine_stream_adaptor<asio::ssl::stream<Stream>>
201185
{
202186
asio::ssl::stream<Stream> stream_;
203-
endpoint_storage<asio::ssl::stream<Stream>> endpoint_;
204187

205188
public:
206189
template <class... Args>
@@ -213,8 +196,6 @@ class engine_stream_adaptor<asio::ssl::stream<Stream>>
213196

214197
bool supports_ssl() const { return true; }
215198

216-
void set_endpoint(const void* val) { endpoint_.store(val); }
217-
218199
using executor_type = asio::any_io_executor;
219200
executor_type get_executor() { return stream_.get_executor(); }
220201

@@ -288,12 +269,12 @@ class engine_stream_adaptor<asio::ssl::stream<Stream>>
288269
}
289270

290271
// Connect and close
291-
void connect(error_code& ec) { do_connect(stream_, endpoint_, ec); }
272+
void connect(const void* endpoint, error_code& ec) { do_connect(stream_, endpoint, ec); }
292273

293274
template <class CompletionToken>
294-
void async_connect(CompletionToken&& token)
275+
void async_connect(const void* endpoint, CompletionToken&& token)
295276
{
296-
do_async_connect(stream_, endpoint_, std::forward<CompletionToken>(token));
277+
do_async_connect(stream_, endpoint, std::forward<CompletionToken>(token));
297278
}
298279

299280
void close(error_code& ec) { do_close(stream_, ec); }

0 commit comments

Comments
 (0)