Skip to content

Commit fa397b7

Browse files
authored
Fixed a race condition in per-operation cancellation in variant_stream
close #454
1 parent e988c9a commit fa397b7

File tree

2 files changed

+142
-20
lines changed

2 files changed

+142
-20
lines changed

include/boost/mysql/impl/internal/variant_stream.hpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <boost/asio/any_io_executor.hpp>
2121
#include <boost/asio/associated_immediate_executor.hpp>
22+
#include <boost/asio/cancellation_type.hpp>
2223
#include <boost/asio/compose.hpp>
2324
#include <boost/asio/connect.hpp>
2425
#include <boost/asio/dispatch.hpp>
@@ -116,12 +117,23 @@ class variant_stream_connect_algo
116117
asio::ip::tcp::resolver& resolver() { return *resolv_; }
117118
asio::generic::stream_protocol::socket& socket() { return st_->sock; }
118119

119-
vsconnect_action resume(error_code ec, const asio::ip::tcp::resolver::results_type* resolver_results)
120+
vsconnect_action resume(
121+
error_code ec,
122+
const asio::ip::tcp::resolver::results_type* resolver_results,
123+
asio::cancellation_type_t cancel_state
124+
)
120125
{
121126
// All errors are considered fatal
122127
if (ec)
123128
return ec;
124129

130+
// If we received a terminal cancellation signal, exit with the appropriate error code.
131+
// In composed async operations, if the cancellation arrives after an intermediate operation
132+
// has completed, but before the handler is called, the operation finishes successfully,
133+
// but the cancellation state is set. This check covers this case.
134+
if (!!(cancel_state & asio::cancellation_type_t::terminal))
135+
return error_code(asio::error::operation_aborted);
136+
125137
switch (resume_point_)
126138
{
127139
case 0:
@@ -280,7 +292,8 @@ class variant_stream
280292
// Run until complete
281293
while (true)
282294
{
283-
auto act = algo.resume(ec, &resolver_results);
295+
// The sync algorithm doesn't support cancellation
296+
auto act = algo.resume(ec, &resolver_results, asio::cancellation_type_t::none);
284297
switch (act.type)
285298
{
286299
case vsconnect_action_type::connect: asio::connect(st_.sock, act.data.connect, ec); break;
@@ -333,7 +346,7 @@ class variant_stream
333346
const asio::ip::tcp::resolver::results_type& resolver_results = {}
334347
)
335348
{
336-
auto act = algo_->resume(ec, &resolver_results);
349+
auto act = algo_->resume(ec, &resolver_results, self.cancelled());
337350
switch (act.type)
338351
{
339352
case vsconnect_action_type::connect:

test/unit/test/impl/variant_stream.cpp

Lines changed: 126 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010

1111
#include <boost/mysql/impl/internal/variant_stream.hpp>
1212

13+
#include <boost/asio/cancellation_type.hpp>
1314
#include <boost/asio/error.hpp>
1415
#include <boost/asio/generic/stream_protocol.hpp>
1516
#include <boost/asio/ip/address.hpp>
1617
#include <boost/asio/ip/tcp.hpp>
1718
#include <boost/asio/local/stream_protocol.hpp>
19+
#include <boost/core/span.hpp>
1820
#include <boost/test/tools/detail/per_element_manip.hpp>
1921
#include <boost/test/tools/detail/print_helper.hpp>
2022
#include <boost/test/unit_test.hpp>
@@ -28,6 +30,7 @@
2830
using namespace boost::mysql;
2931
using namespace boost::mysql::test;
3032
namespace asio = boost::asio;
33+
using asio::cancellation_type_t;
3134
using asio::ip::tcp;
3235
using boost::test_tools::per_element;
3336
using detail::vsconnect_action_type;
@@ -68,7 +71,7 @@ struct fixture : io_context_fixture
6871
detail::variant_stream_state st{ctx.get_executor(), nullptr};
6972
any_address addr;
7073

71-
std::array<tcp::endpoint, 2> tcp_endpoints() const
74+
static std::array<tcp::endpoint, 2> tcp_endpoints()
7275
{
7376
return {
7477
{
@@ -77,6 +80,11 @@ struct fixture : io_context_fixture
7780
}
7881
};
7982
}
83+
84+
static tcp::resolver::results_type make_resolver_results(boost::span<const tcp::endpoint> endpoints)
85+
{
86+
return tcp::resolver::results_type::create(endpoints.begin(), endpoints.end(), "my_host", "1234");
87+
}
8088
};
8189

8290
BOOST_FIXTURE_TEST_CASE(tcp_success, fixture)
@@ -86,21 +94,21 @@ BOOST_FIXTURE_TEST_CASE(tcp_success, fixture)
8694
detail::variant_stream_connect_algo algo{st, addr};
8795

8896
// Initiate: we should resolve
89-
auto act = algo.resume(error_code(), nullptr);
97+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
9098
BOOST_TEST(act.type == vsconnect_action_type::resolve);
9199
BOOST_TEST(*act.data.resolve.hostname == "my_host");
92100
BOOST_TEST(*act.data.resolve.service == "1234");
93101

94102
// Resolving done: we should connect
95103
auto endpoints = tcp_endpoints();
96-
auto r = tcp::resolver::results_type::create(endpoints.begin(), endpoints.end(), "my_host", "1234");
97-
act = algo.resume(error_code(), &r);
104+
auto r = make_resolver_results(endpoints);
105+
act = algo.resume(error_code(), &r, cancellation_type_t::none);
98106
BOOST_TEST(act.type == vsconnect_action_type::connect);
99107
BOOST_TEST(act.data.connect == endpoints, per_element());
100108

101109
// Connect done: success
102110
st.sock.open(asio::ip::tcp::v4()); // Simulate a connection - otherwise setting sock options fails
103-
act = algo.resume(error_code(), nullptr);
111+
act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
104112
BOOST_TEST(act.type == vsconnect_action_type::none);
105113
BOOST_TEST(act.data.err == error_code());
106114
}
@@ -112,14 +120,14 @@ BOOST_FIXTURE_TEST_CASE(tcp_error_resolve, fixture)
112120
detail::variant_stream_connect_algo algo{st, addr};
113121

114122
// Initiate: we should resolve
115-
auto act = algo.resume(error_code(), nullptr);
123+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
116124
BOOST_TEST(act.type == vsconnect_action_type::resolve);
117125
BOOST_TEST(*act.data.resolve.hostname == "my_host");
118126
BOOST_TEST(*act.data.resolve.service == "1234");
119127

120128
// Resolving error: done
121129
asio::ip::tcp::resolver::results_type r;
122-
act = algo.resume(asio::error::connection_reset, &r);
130+
act = algo.resume(asio::error::connection_reset, &r, cancellation_type_t::none);
123131
BOOST_TEST(act.type == vsconnect_action_type::none);
124132
BOOST_TEST(act.data.err == asio::error::connection_reset);
125133
}
@@ -131,20 +139,20 @@ BOOST_FIXTURE_TEST_CASE(tcp_error_connect, fixture)
131139
detail::variant_stream_connect_algo algo{st, addr};
132140

133141
// Initiate: we should resolve
134-
auto act = algo.resume(error_code(), nullptr);
142+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
135143
BOOST_TEST(act.type == vsconnect_action_type::resolve);
136144
BOOST_TEST(*act.data.resolve.hostname == "my_host");
137145
BOOST_TEST(*act.data.resolve.service == "1234");
138146

139147
// Resolving done: we should connect
140148
auto endpoints = tcp_endpoints();
141-
auto r = tcp::resolver::results_type::create(endpoints.begin(), endpoints.end(), "my_host", "1234");
142-
act = algo.resume(error_code(), &r);
149+
auto r = make_resolver_results(endpoints);
150+
act = algo.resume(error_code(), &r, cancellation_type_t::none);
143151
BOOST_TEST(act.type == vsconnect_action_type::connect);
144152
BOOST_TEST(act.data.connect == endpoints, per_element());
145153

146154
// Connect failed: done. No socket option is set
147-
act = algo.resume(asio::error::connection_reset, nullptr);
155+
act = algo.resume(asio::error::connection_reset, nullptr, cancellation_type_t::none);
148156
BOOST_TEST(act.type == vsconnect_action_type::none);
149157
BOOST_TEST(act.data.err == asio::error::connection_reset);
150158
}
@@ -158,12 +166,12 @@ BOOST_FIXTURE_TEST_CASE(unix_success, fixture)
158166

159167
// Initiate: we should connect
160168
const asio::local::stream_protocol::endpoint endpoints[]{"/my/path"};
161-
auto act = algo.resume(error_code(), nullptr);
169+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
162170
BOOST_TEST(act.type == vsconnect_action_type::connect);
163171
BOOST_TEST(act.data.connect == endpoints, per_element());
164172

165173
// Connect done: success. No socket option is set
166-
act = algo.resume(error_code(), nullptr);
174+
act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
167175
BOOST_TEST(act.type == vsconnect_action_type::none);
168176
BOOST_TEST(act.data.err == error_code());
169177
}
@@ -176,12 +184,12 @@ BOOST_FIXTURE_TEST_CASE(unix_error_connect, fixture)
176184

177185
// Initiate: we should connect
178186
const asio::local::stream_protocol::endpoint endpoints[]{"/my/path"};
179-
auto act = algo.resume(error_code(), nullptr);
187+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
180188
BOOST_TEST(act.type == vsconnect_action_type::connect);
181189
BOOST_TEST(act.data.connect == endpoints, per_element());
182190

183191
// Connect failed: done. No socket option is set
184-
act = algo.resume(asio::error::network_reset, nullptr);
192+
act = algo.resume(asio::error::network_reset, nullptr, cancellation_type_t::none);
185193
BOOST_TEST(act.type == vsconnect_action_type::none);
186194
BOOST_TEST(act.data.err == asio::error::network_reset);
187195
}
@@ -193,16 +201,117 @@ BOOST_FIXTURE_TEST_CASE(unix_unsupported, fixture)
193201
detail::variant_stream_connect_algo algo{st, addr};
194202

195203
// Initiate: immediate completion
196-
auto act = algo.resume(error_code(), nullptr);
204+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
197205
BOOST_TEST(act.type == vsconnect_action_type::immediate);
198206

199207
// Resuming again yields the error
200-
act = algo.resume(error_code(), nullptr);
208+
act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
201209
BOOST_TEST(act.type == vsconnect_action_type::none);
202210
BOOST_TEST(act.data.err == asio::error::operation_not_supported);
203211
}
204212
#endif
205213

214+
// Cancellation: we use the cancellation state and error on cancellation
215+
// Only relevant in the TCP case, as UNIX connect is a single operation
216+
// If the cancellation state contains the terminal type, we fail
217+
BOOST_FIXTURE_TEST_CASE(cancellation_contains_terminal, fixture)
218+
{
219+
struct
220+
{
221+
const char* name;
222+
cancellation_type_t cancellation_state;
223+
} test_cases[] = {
224+
{"terminal", cancellation_type_t::terminal},
225+
{"all", cancellation_type_t::all },
226+
};
227+
228+
for (const auto& tc : test_cases)
229+
{
230+
BOOST_TEST_CONTEXT(tc.name)
231+
{
232+
// Setup
233+
addr.emplace_host_and_port("my_host", 1234);
234+
detail::variant_stream_connect_algo algo{st, addr};
235+
236+
// Initiate: we should resolve
237+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
238+
BOOST_TEST(act.type == vsconnect_action_type::resolve);
239+
240+
// Resolving finished successfully, but the cancellation state is set
241+
auto endpoints = tcp_endpoints();
242+
auto r = make_resolver_results(endpoints);
243+
act = algo.resume(error_code(), &r, tc.cancellation_state);
244+
BOOST_TEST(act.type == vsconnect_action_type::none);
245+
BOOST_TEST(act.data.err == asio::error::operation_aborted);
246+
}
247+
}
248+
}
249+
250+
// Since we only support terminal cancellation, we ignore other cancellation types
251+
BOOST_FIXTURE_TEST_CASE(cancellation_no_terminal, fixture)
252+
{
253+
struct
254+
{
255+
const char* name;
256+
cancellation_type_t cancellation_state;
257+
} test_cases[] = {
258+
{"partial", cancellation_type_t::partial },
259+
{"total", cancellation_type_t::total },
260+
{"partial+total", cancellation_type_t::partial | cancellation_type_t::total},
261+
{"other", static_cast<cancellation_type_t>(0x80) },
262+
};
263+
264+
for (const auto& tc : test_cases)
265+
{
266+
BOOST_TEST_CONTEXT(tc.name)
267+
{
268+
// Setup
269+
addr.emplace_host_and_port("my_host", 1234);
270+
detail::variant_stream_connect_algo algo{st, addr};
271+
272+
// Initiate: we should resolve
273+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
274+
BOOST_TEST(act.type == vsconnect_action_type::resolve);
275+
BOOST_TEST(*act.data.resolve.hostname == "my_host");
276+
BOOST_TEST(*act.data.resolve.service == "1234");
277+
278+
// Resolving done: we should connect
279+
auto endpoints = tcp_endpoints();
280+
auto r = make_resolver_results(endpoints);
281+
act = algo.resume(error_code(), &r, tc.cancellation_state);
282+
BOOST_TEST(act.type == vsconnect_action_type::connect);
283+
BOOST_TEST(act.data.connect == endpoints, per_element());
284+
285+
// Connect done: success
286+
// Simulate a connection - otherwise setting sock options fails
287+
st.sock.open(asio::ip::tcp::v4());
288+
act = algo.resume(error_code(), nullptr, tc.cancellation_state);
289+
BOOST_TEST(act.type == vsconnect_action_type::none);
290+
BOOST_TEST(act.data.err == error_code());
291+
}
292+
}
293+
}
294+
295+
// If there is an I/O error and the cancellation state is set, the error wins
296+
BOOST_FIXTURE_TEST_CASE(cancellation_error, fixture)
297+
{
298+
// Setup
299+
addr.emplace_host_and_port("my_host", 1234);
300+
detail::variant_stream_connect_algo algo{st, addr};
301+
302+
// Initiate: we should resolve
303+
auto act = algo.resume(error_code(), nullptr, cancellation_type_t::none);
304+
BOOST_TEST(act.type == vsconnect_action_type::resolve);
305+
BOOST_TEST(*act.data.resolve.hostname == "my_host");
306+
BOOST_TEST(*act.data.resolve.service == "1234");
307+
308+
// Resolving error, and the cancellation state is set
309+
asio::ip::tcp::resolver::results_type r;
310+
act = algo.resume(asio::error::connection_reset, &r, cancellation_type_t::terminal);
311+
BOOST_TEST(act.type == vsconnect_action_type::none);
312+
BOOST_TEST(act.data.err == asio::error::connection_reset);
313+
}
314+
206315
BOOST_AUTO_TEST_SUITE_END()
207316

208317
} // namespace

0 commit comments

Comments
 (0)