Skip to content

Commit f5ff375

Browse files
authored
Fixed a race condition in per-operation cancellation in connection operations
Now connection/any_connection async operations are cancelled even when the cancellation signal is emitted after an intermediate operation completes, but before its intermediate handler gets called. Restored a connection_id integration test that required this fix. close #199
1 parent c3f1cbf commit f5ff375

File tree

3 files changed

+160
-30
lines changed

3 files changed

+160
-30
lines changed

include/boost/mysql/detail/engine_impl.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
#include <boost/asio/any_io_executor.hpp>
2020
#include <boost/asio/buffer.hpp>
21+
#include <boost/asio/cancellation_type.hpp>
2122
#include <boost/asio/compose.hpp>
23+
#include <boost/asio/error.hpp>
2224
#include <boost/asio/immediate.hpp>
2325
#include <boost/asio/post.hpp>
2426
#include <boost/assert.hpp>
@@ -35,6 +37,11 @@ inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept
3537
return asio::mutable_buffer(buff.data(), buff.size());
3638
}
3739

40+
inline bool has_terminal_cancellation(asio::cancellation_type_t cancel_type)
41+
{
42+
return static_cast<bool>(cancel_type & asio::cancellation_type_t::terminal);
43+
}
44+
3845
template <class EngineStream>
3946
struct run_algo_op
4047
{
@@ -57,6 +64,13 @@ struct run_algo_op
5764

5865
while (true)
5966
{
67+
// If we were cancelled, but the last operation completed successfully,
68+
// set a cancelled error code so the algorithm exits. This might happen
69+
// if a cancellation signal is emitted after an intermediate operation succeeded
70+
// but before the handler was called.
71+
if (!io_ec && has_terminal_cancellation(self.cancelled()))
72+
io_ec = asio::error::operation_aborted;
73+
6074
// Run the op
6175
act = resumable_.resume(io_ec, bytes_transferred);
6276
if (act.is_done())

test/integration/test/any_connection.cpp

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -487,34 +487,33 @@ BOOST_FIXTURE_TEST_CASE(connection_id, any_connection_fixture)
487487

488488
// After a fatal error (where we didn't call async_close), re-establishing the session
489489
// updates the connection id
490-
// TODO: this test requires fixing https://github.com/boostorg/mysql/issues/199
491-
// BOOST_FIXTURE_TEST_CASE(connection_id_after_error, any_connection_fixture)
492-
// {
493-
// // Connect
494-
// connect();
495-
// auto id1 = call_connection_id(conn);
496-
497-
// // Force a fatal error
498-
// results r;
499-
// asio::cancellation_signal sig;
500-
// auto execute_result = conn.async_execute(
501-
// "DO SLEEP(60)",
502-
// r,
503-
// asio::bind_cancellation_slot(sig.slot(), as_netresult)
504-
// );
505-
// sig.emit(asio::cancellation_type_t::terminal);
506-
// std::move(execute_result).validate_error(asio::error::operation_aborted);
507-
508-
// // The id can be obtained even after the fatal error
509-
// BOOST_TEST(conn.connection_id() == boost::optional<std::uint32_t>(id1));
510-
511-
// // Reconnect
512-
// connect();
513-
// auto id2 = call_connection_id(conn);
514-
515-
// // The new id can be obtained
516-
// BOOST_TEST(conn.connection_id() == boost::optional<std::uint32_t>(id2));
517-
// }
490+
BOOST_FIXTURE_TEST_CASE(connection_id_after_error, any_connection_fixture)
491+
{
492+
// Connect
493+
connect();
494+
auto id1 = call_connection_id(conn);
495+
496+
// Force a fatal error
497+
results r;
498+
asio::cancellation_signal sig;
499+
auto execute_result = conn.async_execute(
500+
"DO SLEEP(60)",
501+
r,
502+
asio::bind_cancellation_slot(sig.slot(), as_netresult)
503+
);
504+
sig.emit(asio::cancellation_type_t::terminal);
505+
std::move(execute_result).validate_error(asio::error::operation_aborted);
506+
507+
// The id can be obtained even after the fatal error
508+
BOOST_TEST(conn.connection_id() == boost::optional<std::uint32_t>(id1));
509+
510+
// Reconnect
511+
connect();
512+
auto id2 = call_connection_id(conn);
513+
514+
// The new id can be obtained
515+
BOOST_TEST(conn.connection_id() == boost::optional<std::uint32_t>(id2));
516+
}
518517

519518
// It's safe to obtain the connection id while an operation is in progress
520519
BOOST_FIXTURE_TEST_CASE(connection_id_op_in_progress, any_connection_fixture)

test/unit/test/detail/engine_impl.cpp

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
#include <boost/asio/any_completion_handler.hpp>
1717
#include <boost/asio/any_io_executor.hpp>
1818
#include <boost/asio/async_result.hpp>
19+
#include <boost/asio/bind_cancellation_slot.hpp>
1920
#include <boost/asio/bind_executor.hpp>
2021
#include <boost/asio/buffer.hpp>
22+
#include <boost/asio/cancellation_signal.hpp>
23+
#include <boost/asio/cancellation_type.hpp>
2124
#include <boost/asio/coroutine.hpp>
2225
#include <boost/asio/deferred.hpp>
2326
#include <boost/asio/dispatch.hpp>
@@ -29,6 +32,7 @@
2932
#include <cstddef>
3033
#include <cstdint>
3134
#include <cstring>
35+
#include <ostream>
3236

3337
#include "test_common/create_diagnostics.hpp"
3438
#include "test_common/io_context_fixture.hpp"
@@ -41,6 +45,7 @@ using namespace boost::mysql::test;
4145
using namespace boost::mysql::detail;
4246
namespace asio = boost::asio;
4347
using boost::mysql::error_code;
48+
using boost::test_tools::per_element;
4449

4550
BOOST_AUTO_TEST_SUITE(test_engine_impl)
4651

@@ -230,7 +235,21 @@ using signature_t = netmaker_t::signature;
230235
// A mock for a sans-io algorithm. Can be converted to any_resumable_ref
231236
struct mock_algo
232237
{
233-
using call_args = std::pair<error_code, std::size_t>;
238+
struct call_args
239+
{
240+
error_code ec;
241+
std::size_t bytes_transferred;
242+
243+
bool operator==(const call_args& rhs) const
244+
{
245+
return ec == rhs.ec && bytes_transferred == rhs.bytes_transferred;
246+
}
247+
248+
friend std::ostream& operator<<(std::ostream& os, const call_args& v)
249+
{
250+
return os << "{" << v.ec << ", " << v.bytes_transferred << "}";
251+
}
252+
};
234253

235254
std::size_t current_call{0};
236255
std::vector<next_action> acts;
@@ -239,7 +258,7 @@ struct mock_algo
239258
mock_algo(next_action act) : acts{act} {}
240259
mock_algo(next_action act1, next_action act2) : acts{act1, act2} {}
241260

242-
void check_calls(const std::vector<call_args>& expected) { BOOST_TEST(calls == expected); }
261+
void check_calls(const std::vector<call_args>& expected) { BOOST_TEST(calls == expected, per_element()); }
243262

244263
next_action resume(error_code ec, std::size_t bytes_transferred)
245264
{
@@ -559,4 +578,102 @@ BOOST_AUTO_TEST_CASE(resume_error_successive_calls)
559578
}
560579
}
561580

581+
// Regression tests for https://github.com/boostorg/mysql/issues/199
582+
// If a cancellation signal is emitted after an operation completes successfully
583+
// and before its handler gets called, engine uses the composed op's cancellation
584+
// state and resumes the algorithm with a cancelled error code
585+
BOOST_FIXTURE_TEST_CASE(cancel_during_post, io_context_fixture)
586+
{
587+
// Setup
588+
mock_algo algo(next_action::ssl_handshake());
589+
test_engine eng{ctx.get_executor()};
590+
asio::cancellation_signal sig;
591+
592+
// Start the operation. The mock stream will immediately complete the requested operation
593+
// by calling asio::post.
594+
auto run_result = eng.async_run(
595+
any_resumable_ref(algo),
596+
asio::bind_cancellation_slot(sig.slot(), as_netresult)
597+
);
598+
599+
// At this point, the completion hasn't been called yet, since run() hasn't been called.
600+
// Emit the cancellation signal
601+
sig.emit(asio::cancellation_type_t::terminal);
602+
603+
// Run until completion
604+
std::move(run_result).validate_no_error_nodiag();
605+
606+
// The algorithm was resumed with the relevant error
607+
BOOST_TEST(eng.value.stream().calls.size() == 1u);
608+
BOOST_TEST(eng.value.stream().calls[0].type() == next_action_type::ssl_handshake);
609+
algo.check_calls({
610+
{error_code(), 0u},
611+
{asio::error::operation_aborted, 0u}
612+
});
613+
}
614+
615+
// We do nothing for cancellation types != terminal
616+
BOOST_FIXTURE_TEST_CASE(cancel_during_post_cancel_type_non_terminal, io_context_fixture)
617+
{
618+
// Setup
619+
mock_algo algo(next_action::ssl_handshake());
620+
test_engine eng{ctx.get_executor()};
621+
asio::cancellation_signal sig;
622+
623+
// Start the operation. The mock stream will immediately complete the requested operation
624+
// by calling asio::post.
625+
auto run_result = eng.async_run(
626+
any_resumable_ref(algo),
627+
asio::bind_cancellation_slot(sig.slot(), as_netresult)
628+
);
629+
630+
// At this point, the completion hasn't been called yet, since run() hasn't been called.
631+
// Emit the cancellation signal
632+
sig.emit(asio::cancellation_type_t::total);
633+
634+
// Run until completion
635+
std::move(run_result).validate_no_error_nodiag();
636+
637+
// The cancellation is ignored because algorithms don't support total cancellation
638+
BOOST_TEST(eng.value.stream().calls.size() == 1u);
639+
BOOST_TEST(eng.value.stream().calls[0].type() == next_action_type::ssl_handshake);
640+
algo.check_calls({
641+
{error_code(), 0u},
642+
{error_code(), 0u}
643+
});
644+
}
645+
646+
// If the operation failed with another error code, we don't override it
647+
BOOST_FIXTURE_TEST_CASE(cancel_during_post_other_error, io_context_fixture)
648+
{
649+
// Setup
650+
mock_algo algo(next_action::ssl_handshake());
651+
test_engine eng{
652+
{ctx.get_executor(), asio::error::network_reset}
653+
};
654+
asio::cancellation_signal sig;
655+
656+
// Start the operation. The mock stream will immediately complete the requested operation
657+
// by calling asio::post.
658+
auto run_result = eng.async_run(
659+
any_resumable_ref(algo),
660+
asio::bind_cancellation_slot(sig.slot(), as_netresult)
661+
);
662+
663+
// At this point, the completion hasn't been called yet, since run() hasn't been called.
664+
// Emit the cancellation signal
665+
sig.emit(asio::cancellation_type_t::terminal);
666+
667+
// Run until completion
668+
std::move(run_result).validate_no_error_nodiag();
669+
670+
// The cancellation didn't override the operation's error code
671+
BOOST_TEST(eng.value.stream().calls.size() == 1u);
672+
BOOST_TEST(eng.value.stream().calls[0].type() == next_action_type::ssl_handshake);
673+
algo.check_calls({
674+
{error_code(), 0u},
675+
{asio::error::network_reset, 0u}
676+
});
677+
}
678+
562679
BOOST_AUTO_TEST_SUITE_END()

0 commit comments

Comments
 (0)