Skip to content

Commit e25eebf

Browse files
authored
Samples fix (#127)
- add predicate for each conditionVariable.wait() - changed the booleans in basic_sub_pub & raw_sub_pub to atomic - Fixed the onSubAck callback to check the error code and QoS returned.
1 parent c65bf71 commit e25eebf

File tree

7 files changed

+177
-219
lines changed

7 files changed

+177
-219
lines changed

aws-common-runtime/aws-crt-cpp

Submodule aws-crt-cpp updated 79 files

samples/greengrass/basic_discovery/main.cpp

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,8 @@ int main(int argc, char *argv[])
190190
Aws::Iot::MqttClient mqttClient(bootstrap);
191191
std::shared_ptr<Mqtt::MqttConnection> connection(nullptr);
192192

193-
std::mutex semaphoreLock;
194-
std::condition_variable semaphore;
195-
std::atomic<bool> connectionFinished(false);
196-
std::atomic<bool> shutdownCompleted(false);
193+
std::promise<void> connectionFinishedPromise;
194+
std::promise<void> shutdownCompletedPromise;
197195

198196
discoveryClient->Discover(thingName, [&](DiscoverResponse *response, int error, int httpResponseCode) {
199197
if (!error && response->GGGroups)
@@ -256,8 +254,7 @@ int main(int argc, char *argv[])
256254
if (!errorCode)
257255
{
258256
fprintf(stdout, "Successfully subscribed to %s\n", topic.c_str());
259-
connectionFinished = true;
260-
semaphore.notify_one();
257+
connectionFinishedPromise.set_value();
261258
}
262259
else
263260
{
@@ -274,8 +271,7 @@ int main(int argc, char *argv[])
274271
}
275272
else
276273
{
277-
connectionFinished = true;
278-
semaphore.notify_one();
274+
connectionFinishedPromise.set_value();
279275
}
280276
}
281277
else
@@ -301,8 +297,7 @@ int main(int argc, char *argv[])
301297

302298
connection->OnDisconnect = [&](Mqtt::MqttConnection & /*connection*/) {
303299
fprintf(stdout, "Connection disconnected. Shutting Down.....\n");
304-
shutdownCompleted = true;
305-
semaphore.notify_one();
300+
shutdownCompletedPromise.set_value();
306301
};
307302

308303
if (!connection->Connect(thingName.c_str(), false))
@@ -323,8 +318,7 @@ int main(int argc, char *argv[])
323318
});
324319

325320
{
326-
std::unique_lock<std::mutex> lock(semaphoreLock);
327-
semaphore.wait(lock, [&]() { return connectionFinished.load(); });
321+
connectionFinishedPromise.get_future().wait();
328322
}
329323

330324
while (true)
@@ -373,8 +367,7 @@ int main(int argc, char *argv[])
373367

374368
connection->Disconnect();
375369
{
376-
std::unique_lock<std::mutex> lock(semaphoreLock);
377-
semaphore.wait(lock, [&]() { return shutdownCompleted.load(); });
370+
shutdownCompletedPromise.get_future().wait();
378371
}
379372

380373
return 0;

samples/identity/fleet_provisioning/main.cpp

Lines changed: 43 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ int main(int argc, char *argv[])
183183

184184
/*
185185
* Since no exceptions are used, always check the bool operator
186-
* when an error could have occured.
186+
* when an error could have occurred.
187187
*/
188188
if (!mqttClient)
189189
{
@@ -206,10 +206,8 @@ int main(int argc, char *argv[])
206206
* In a real world application you probably don't want to enforce synchronous behavior
207207
* but this is a sample console application, so we'll just do that with a condition variable.
208208
*/
209-
std::condition_variable conditionVariable;
210-
std::atomic<bool> connectionSucceeded(false);
211-
std::atomic<bool> connectionClosed(false);
212-
std::atomic<bool> connectionCompleted(false);
209+
std::promise<bool> connectionCompletedPromise;
210+
std::promise<void> connectionClosedPromise;
213211

214212
/*
215213
* This will execute when an mqtt connect has completed or failed.
@@ -218,16 +216,13 @@ int main(int argc, char *argv[])
218216
if (errorCode)
219217
{
220218
fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode));
221-
connectionSucceeded = false;
219+
connectionCompletedPromise.set_value(false);
222220
}
223221
else
224222
{
225223
fprintf(stdout, "Connection completed with return code %d\n", returnCode);
226-
connectionSucceeded = true;
224+
connectionCompletedPromise.set_value(true);
227225
}
228-
229-
connectionCompleted = true;
230-
conditionVariable.notify_one();
231226
};
232227

233228
/*
@@ -236,9 +231,8 @@ int main(int argc, char *argv[])
236231
auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) {
237232
{
238233
fprintf(stdout, "Disconnect completed\n");
239-
connectionClosed = true;
234+
connectionClosedPromise.set_value();
240235
}
241-
conditionVariable.notify_one();
242236
};
243237

244238
connection->OnConnectionCompleted = std::move(onConnectionCompleted);
@@ -254,25 +248,21 @@ int main(int argc, char *argv[])
254248
exit(-1);
255249
}
256250

257-
std::mutex mutex;
258-
std::unique_lock<std::mutex> uniqueLock(mutex);
259-
conditionVariable.wait(uniqueLock, [&]() { return connectionSucceeded || connectionClosed; });
260-
261-
if (connectionSucceeded)
251+
if (connectionCompletedPromise.get_future().get())
262252
{
263253
IotIdentityClient identityClient(connection);
264254

265-
std::atomic<bool> csrPublishCompleted(false);
266-
std::atomic<bool> csrAcceptedCompleted(false);
267-
std::atomic<bool> csrRejectedCompleted(false);
255+
std::promise<void> csrPublishCompletedPromise;
256+
std::promise<void> csrAcceptedCompletedPromise;
257+
std::promise<void> csrRejectedCompletedPromise;
268258

269-
std::atomic<bool> keysPublishCompleted(false);
270-
std::atomic<bool> keysAcceptedCompleted(false);
271-
std::atomic<bool> keysRejectedCompleted(false);
259+
std::promise<void> keysPublishCompletedPromise;
260+
std::promise<void> keysAcceptedCompletedPromise;
261+
std::promise<void> keysRejectedCompletedPromise;
272262

273-
std::atomic<bool> registerPublishCompleted(false);
274-
std::atomic<bool> registerAcceptedCompleted(false);
275-
std::atomic<bool> registerRejectedCompleted(false);
263+
std::promise<void> registerPublishCompletedPromise;
264+
std::promise<void> registerAcceptedCompletedPromise;
265+
std::promise<void> registerRejectedCompletedPromise;
276266

277267
auto onCsrPublishSubAck = [&](int ioErr) {
278268
if (ioErr != AWS_OP_SUCCESS)
@@ -281,8 +271,7 @@ int main(int argc, char *argv[])
281271
exit(-1);
282272
}
283273

284-
csrPublishCompleted = true;
285-
conditionVariable.notify_one();
274+
csrPublishCompletedPromise.set_value();
286275
};
287276

288277
auto onCsrAcceptedSubAck = [&](int ioErr) {
@@ -293,18 +282,17 @@ int main(int argc, char *argv[])
293282
exit(-1);
294283
}
295284

296-
csrAcceptedCompleted = true;
297-
conditionVariable.notify_one();
285+
csrAcceptedCompletedPromise.set_value();
298286
};
299287

300288
auto onCsrRejectedSubAck = [&](int ioErr) {
301289
if (ioErr != AWS_OP_SUCCESS)
302290
{
303291
fprintf(
304292
stderr, "Error subscribing to CreateCertificateFromCsr rejected: %s\n", ErrorDebugString(ioErr));
293+
exit(-1);
305294
}
306-
csrRejectedCompleted = true;
307-
conditionVariable.notify_one();
295+
csrRejectedCompletedPromise.set_value();
308296
};
309297

310298
auto onCsrAccepted = [&](CreateCertificateFromCsrResponse *response, int ioErr) {
@@ -346,8 +334,7 @@ int main(int argc, char *argv[])
346334
exit(-1);
347335
}
348336

349-
keysPublishCompleted = true;
350-
conditionVariable.notify_one();
337+
keysPublishCompletedPromise.set_value();
351338
};
352339

353340
auto onKeysAcceptedSubAck = [&](int ioErr) {
@@ -358,18 +345,17 @@ int main(int argc, char *argv[])
358345
exit(-1);
359346
}
360347

361-
keysAcceptedCompleted = true;
362-
conditionVariable.notify_one();
348+
keysAcceptedCompletedPromise.set_value();
363349
};
364350

365351
auto onKeysRejectedSubAck = [&](int ioErr) {
366352
if (ioErr != AWS_OP_SUCCESS)
367353
{
368354
fprintf(
369355
stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr));
356+
exit(-1);
370357
}
371-
keysRejectedCompleted = true;
372-
conditionVariable.notify_one();
358+
keysRejectedCompletedPromise.set_value();
373359
};
374360

375361
auto onKeysAccepted = [&](CreateKeysAndCertificateResponse *response, int ioErr) {
@@ -411,17 +397,16 @@ int main(int argc, char *argv[])
411397
exit(-1);
412398
}
413399

414-
registerAcceptedCompleted = true;
415-
conditionVariable.notify_one();
400+
registerAcceptedCompletedPromise.set_value();
416401
};
417402

418403
auto onRegisterRejectedSubAck = [&](int ioErr) {
419404
if (ioErr != AWS_OP_SUCCESS)
420405
{
421406
fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr));
407+
exit(-1);
422408
}
423-
registerRejectedCompleted = true;
424-
conditionVariable.notify_one();
409+
registerRejectedCompletedPromise.set_value();
425410
};
426411

427412
auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) {
@@ -460,8 +445,7 @@ int main(int argc, char *argv[])
460445
exit(-1);
461446
}
462447

463-
registerPublishCompleted = true;
464-
conditionVariable.notify_one();
448+
registerPublishCompletedPromise.set_value();
465449
};
466450

467451
if (csrFile.empty())
@@ -514,11 +498,12 @@ int main(int argc, char *argv[])
514498
registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck);
515499
sleep(1);
516500

517-
conditionVariable.wait(uniqueLock, [&]() {
518-
return keysPublishCompleted.load() && keysAcceptedCompleted.load() && keysRejectedCompleted.load() &&
519-
registerPublishCompleted.load() && registerAcceptedCompleted.load() &&
520-
registerRejectedCompleted.load();
521-
});
501+
keysPublishCompletedPromise.get_future().wait();
502+
keysAcceptedCompletedPromise.get_future().wait();
503+
keysRejectedCompletedPromise.get_future().wait();
504+
registerPublishCompletedPromise.get_future().wait();
505+
registerAcceptedCompletedPromise.get_future().wait();
506+
registerRejectedCompletedPromise.get_future().wait();
522507
}
523508
else
524509
{
@@ -571,19 +556,19 @@ int main(int argc, char *argv[])
571556
registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck);
572557
sleep(2);
573558

574-
conditionVariable.wait(uniqueLock, [&]() {
575-
return csrPublishCompleted.load() && csrAcceptedCompleted.load() && csrRejectedCompleted.load() &&
576-
registerPublishCompleted.load() && registerAcceptedCompleted.load() &&
577-
registerRejectedCompleted.load();
578-
});
559+
csrPublishCompletedPromise.get_future().wait();
560+
csrAcceptedCompletedPromise.get_future().wait();
561+
csrRejectedCompletedPromise.get_future().wait();
562+
registerPublishCompletedPromise.get_future().wait();
563+
registerAcceptedCompletedPromise.get_future().wait();
564+
registerRejectedCompletedPromise.get_future().wait();
579565
}
580566
}
581567

582-
if (!connectionClosed)
568+
/* Disconnect */
569+
if (connection->Disconnect())
583570
{
584-
/* Disconnect */
585-
connection->Disconnect();
586-
conditionVariable.wait(uniqueLock, [&]() { return connectionClosed.load(); });
571+
connectionClosedPromise.get_future().wait();
587572
}
588573
return 0;
589574
}

0 commit comments

Comments
 (0)