Skip to content

Commit 701698e

Browse files
committed
resend terminal disconnect
Summary: - if we are in the process of connecting when user invokes async_disconnect we should send a disconnect packet after connect completes - move integration disconnect tests to integration folder Reviewers: ivica Reviewed By: ivica Subscribers: korina Differential Revision: https://repo.mireo.local/D28119
1 parent 2f055e8 commit 701698e

File tree

4 files changed

+319
-236
lines changed

4 files changed

+319
-236
lines changed

include/async_mqtt5/impl/disconnect_op.hpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,17 @@ class disconnect_op {
9898
_svc_ptr->async_send(
9999
wire_data,
100100
no_serial, send_flag::terminal,
101-
asio::consign(
102-
asio::prepend(std::move(*this), on_disconnect {}),
103-
std::move(disconnect)
101+
asio::prepend(
102+
std::move(*this),
103+
on_disconnect {}, std::move(disconnect)
104104
)
105105
);
106106
}
107107

108-
void operator()(on_disconnect, error_code ec) {
108+
void operator()(
109+
on_disconnect,
110+
control_packet<allocator_type> disconnect, error_code ec
111+
) {
109112
// The connection must be closed even
110113
// if we failed to send the DISCONNECT packet
111114
// with Reason Code of 0x80 or greater.
@@ -116,13 +119,16 @@ class disconnect_op {
116119
)
117120
return complete(asio::error::operation_aborted);
118121

119-
if (_context.terminal) {
120-
_svc_ptr->cancel();
122+
if (ec == asio::error::try_again) {
123+
if (_context.terminal)
124+
return send_disconnect(std::move(disconnect));
121125
return complete(error_code {});
122126
}
123127

124-
if (ec == asio::error::try_again)
128+
if (_context.terminal) {
129+
_svc_ptr->cancel();
125130
return complete(error_code {});
131+
}
126132

127133
_svc_ptr->close_stream();
128134
_svc_ptr->open_stream();

include/async_mqtt5/mqtt_client.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -872,8 +872,8 @@ class mqtt_client {
872872
*
873873
* \par Completion condition
874874
* The asynchronous operation will complete when one of the following conditions is true:\n
875-
* - The Client has attempted to send a \__DISCONNECT\__ packet, regardless of whether
876-
* the sending was successful or not.\n
875+
* - The Client has sent a \__DISCONNECT\__ packet.\n
876+
* - 5 seconds have elapsed without a successful send.\n
877877
* - An error occurred. This is indicated by an associated \__ERROR_CODE\__ in the handler.\n
878878
*
879879
* \par Error codes

test/integration/disconnect.cpp

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
#include <boost/test/unit_test.hpp>
2+
3+
#include <boost/asio/detached.hpp>
4+
#include <boost/asio/io_context.hpp>
5+
#include <boost/asio/steady_timer.hpp>
6+
7+
#include <async_mqtt5/mqtt_client.hpp>
8+
9+
#include "test_common/message_exchange.hpp"
10+
#include "test_common/test_broker.hpp"
11+
#include "test_common/test_stream.hpp"
12+
13+
using namespace async_mqtt5;
14+
15+
BOOST_AUTO_TEST_SUITE(disconnect/*, *boost::unit_test::disabled()*/)
16+
17+
struct shared_test_data {
18+
error_code success {};
19+
error_code fail = asio::error::not_connected;
20+
21+
const std::string connect = encoders::encode_connect(
22+
"", std::nullopt, std::nullopt, 60, false, {}, std::nullopt
23+
);
24+
const std::string connack = encoders::encode_connack(
25+
true, reason_codes::success.value(), {}
26+
);
27+
const std::string disconnect = encoders::encode_disconnect(
28+
reason_codes::normal_disconnection.value(), disconnect_props {}
29+
);
30+
};
31+
32+
using test::after;
33+
using namespace std::chrono_literals;
34+
using client_type = mqtt_client<test::test_stream>;
35+
36+
template <typename TestCase>
37+
void run_test(test::msg_exchange broker_side, TestCase&& test_case) {
38+
asio::io_context ioc;
39+
auto executor = ioc.get_executor();
40+
auto& broker = asio::make_service<test::test_broker>(
41+
ioc, executor, std::move(broker_side)
42+
);
43+
44+
asio::steady_timer timer(executor);
45+
client_type c(executor);
46+
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
47+
.async_run(asio::detached);
48+
49+
test_case(c, timer);
50+
51+
ioc.run();
52+
BOOST_TEST(broker.received_all_expected());
53+
}
54+
55+
BOOST_FIXTURE_TEST_CASE(successful_disconnect, shared_test_data) {
56+
constexpr int expected_handlers_called = 1;
57+
int handlers_called = 0;
58+
59+
test::msg_exchange broker_side;
60+
broker_side
61+
.expect(connect)
62+
.complete_with(success, after(0ms))
63+
.reply_with(connack, after(0ms))
64+
.expect(disconnect)
65+
.complete_with(success, after(0ms));
66+
67+
run_test(
68+
std::move(broker_side),
69+
[&](client_type& c, asio::steady_timer& timer) {
70+
timer.expires_after(100ms);
71+
timer.async_wait([&](error_code) {
72+
c.async_disconnect(
73+
[&](error_code ec) {
74+
handlers_called++;
75+
BOOST_TEST(!ec);
76+
}
77+
);
78+
});
79+
}
80+
);
81+
82+
BOOST_TEST(handlers_called == expected_handlers_called);
83+
}
84+
85+
BOOST_FIXTURE_TEST_CASE(successful_disconnect_in_queue, shared_test_data) {
86+
constexpr int expected_handlers_called = 2;
87+
int handlers_called = 0;
88+
89+
// packets
90+
auto publish_qos0 = encoders::encode_publish(
91+
0, "topic", "payload", qos_e::at_most_once, retain_e::no, dup_e::no, {}
92+
);
93+
94+
test::msg_exchange broker_side;
95+
broker_side
96+
.expect(connect)
97+
.complete_with(success, after(1ms))
98+
.reply_with(connack, after(2ms))
99+
.expect(publish_qos0)
100+
.complete_with(success, after(1ms))
101+
.expect(disconnect)
102+
.complete_with(success, after(0ms));
103+
104+
run_test(
105+
std::move(broker_side),
106+
[&](client_type& c, asio::steady_timer& timer) {
107+
timer.expires_after(50ms);
108+
timer.async_wait([&](error_code) {
109+
c.async_publish<qos_e::at_most_once>(
110+
"topic", "payload", retain_e::no, {},
111+
[&handlers_called](error_code ec) {
112+
BOOST_TEST(handlers_called == 0);
113+
handlers_called++;
114+
BOOST_TEST(!ec);
115+
}
116+
);
117+
c.async_disconnect(
118+
[&](error_code ec) {
119+
BOOST_TEST(handlers_called == 1);
120+
handlers_called++;
121+
BOOST_TEST(!ec);
122+
}
123+
);
124+
});
125+
}
126+
);
127+
128+
BOOST_TEST(handlers_called == expected_handlers_called);
129+
}
130+
131+
BOOST_FIXTURE_TEST_CASE(disconnect_on_disconnected_client, shared_test_data) {
132+
constexpr int expected_handlers_called = 1;
133+
int handlers_called = 0;
134+
135+
test::msg_exchange broker_side;
136+
broker_side
137+
.expect(connect)
138+
.expect(connect);
139+
140+
run_test(
141+
std::move(broker_side),
142+
[&](client_type& c, asio::steady_timer& timer) {
143+
timer.expires_after(50ms);
144+
timer.async_wait([&](error_code) {
145+
c.async_disconnect(
146+
[&](error_code ec) {
147+
handlers_called++;
148+
BOOST_TEST(ec == asio::error::operation_aborted);
149+
}
150+
);
151+
});
152+
}
153+
);
154+
155+
BOOST_TEST(handlers_called == expected_handlers_called);
156+
}
157+
158+
BOOST_FIXTURE_TEST_CASE(disconnect_in_queue_on_disconnected_client, shared_test_data) {
159+
constexpr int expected_handlers_called = 2;
160+
int handlers_called = 0;
161+
162+
test::msg_exchange broker_side;
163+
broker_side
164+
.expect(connect)
165+
.expect(connect);
166+
167+
run_test(
168+
std::move(broker_side),
169+
[&](client_type& c, asio::steady_timer& timer) {
170+
timer.expires_after(50ms);
171+
timer.async_wait([&](error_code) {
172+
c.async_publish<qos_e::at_most_once>(
173+
"topic", "payload", retain_e::no, {},
174+
[&handlers_called](error_code ec) {
175+
BOOST_TEST(handlers_called == 1);
176+
handlers_called++;
177+
BOOST_TEST(ec == asio::error::operation_aborted);
178+
}
179+
);
180+
c.async_disconnect(
181+
[&](error_code ec) {
182+
BOOST_TEST(handlers_called == 0);
183+
handlers_called++;
184+
BOOST_TEST(ec == asio::error::operation_aborted);
185+
}
186+
);
187+
});
188+
}
189+
);
190+
191+
BOOST_TEST(handlers_called == expected_handlers_called);
192+
}
193+
194+
BOOST_FIXTURE_TEST_CASE(resend_terminal_disconnect, shared_test_data) {
195+
constexpr int expected_handlers_called = 1;
196+
int handlers_called = 0;
197+
198+
test::msg_exchange broker_side;
199+
broker_side
200+
.expect(connect)
201+
.complete_with(success, after(0ms))
202+
.reply_with(connack, after(0ms))
203+
.expect(disconnect)
204+
.complete_with(success, after(0ms));
205+
206+
run_test(
207+
std::move(broker_side),
208+
[&](client_type& c, asio::steady_timer&) {
209+
c.async_disconnect(
210+
[&](error_code ec) {
211+
handlers_called++;
212+
BOOST_TEST(!ec);
213+
}
214+
);
215+
}
216+
);
217+
218+
BOOST_TEST(handlers_called == expected_handlers_called);
219+
}
220+
221+
BOOST_FIXTURE_TEST_CASE(dont_resend_non_terminal_disconnect, shared_test_data) {
222+
auto malformed_publish_1 = encoders::encode_publish(
223+
1, "malformed topic", "malformed payload",
224+
static_cast<qos_e>(0b11), retain_e::yes, dup_e::no, {}
225+
);
226+
auto malformed_publish_2 = encoders::encode_publish(
227+
2, "malformed topic", "malformed payload",
228+
static_cast<qos_e>(0b11), retain_e::yes, dup_e::no, {}
229+
);
230+
231+
auto disconnect_malformed_publish = encoders::encode_disconnect(
232+
reason_codes::malformed_packet.value(),
233+
test::dprops_with_reason_string("Malformed PUBLISH received: QoS bits set to 0b11")
234+
);
235+
236+
test::msg_exchange broker_side;
237+
broker_side
238+
.expect(connect)
239+
.complete_with(success, after(0ms))
240+
.reply_with(connack, after(0ms))
241+
.send(malformed_publish_1, malformed_publish_2, after(10ms))
242+
.expect(disconnect_malformed_publish)
243+
.complete_with(success, after(0ms))
244+
.expect(connect)
245+
.complete_with(success, after(0ms))
246+
.reply_with(connack, after(0ms));
247+
248+
run_test(
249+
std::move(broker_side),
250+
[&](client_type& c, asio::steady_timer& timer) {
251+
timer.expires_after(50ms);
252+
timer.async_wait([&](error_code) {
253+
c.cancel();
254+
});
255+
}
256+
);
257+
}
258+
259+
BOOST_FIXTURE_TEST_CASE(omit_props, shared_test_data) {
260+
constexpr int expected_handlers_called = 1;
261+
int handlers_called = 0;
262+
263+
connack_props co_props;
264+
co_props[prop::maximum_packet_size] = 20;
265+
266+
// packets
267+
auto connack_with_max_packet = encoders::encode_connack(
268+
false, reason_codes::success.value(), co_props
269+
);
270+
271+
disconnect_props props;
272+
props[prop::reason_string] = std::string(50, 'a');
273+
auto big_disconnect = encoders::encode_disconnect(
274+
reason_codes::normal_disconnection.value(), props
275+
);
276+
277+
test::msg_exchange broker_side;
278+
broker_side
279+
.expect(connect)
280+
.complete_with(success, after(0ms))
281+
.reply_with(connack_with_max_packet, after(0ms))
282+
.expect(disconnect)
283+
.complete_with(success, after(0ms));
284+
285+
run_test(
286+
std::move(broker_side),
287+
[&](client_type& c, asio::steady_timer& timer) {
288+
timer.expires_after(50ms);
289+
timer.async_wait([&](error_code) {
290+
c.async_disconnect(
291+
disconnect_rc_e::normal_disconnection, props,
292+
[&](error_code ec) {
293+
handlers_called++;
294+
BOOST_TEST(!ec);
295+
}
296+
);
297+
});
298+
}
299+
);
300+
301+
BOOST_TEST(handlers_called == expected_handlers_called);
302+
}
303+
304+
BOOST_AUTO_TEST_SUITE_END()

0 commit comments

Comments
 (0)