From 9a681d6669d5d1c6f5b681af78b8d15e6075917b Mon Sep 17 00:00:00 2001 From: zeyus Date: Thu, 10 Apr 2025 16:52:31 +0200 Subject: [PATCH 1/7] Possibly fix #218 - next->_next might be unavailble. --- src/sample.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/sample.cpp b/src/sample.cpp index b95521b0..9a64c756 100644 --- a/src/sample.cpp +++ b/src/sample.cpp @@ -466,11 +466,13 @@ sample *factory::pop_freelist() { } factory::~factory() { - for (sample *cur = tail_, *next = cur->next_;; cur = next, next = next->next_) { - if (cur != sentinel()) delete cur; - if (!next) break; - } - delete[] storage_; + sample *cur = tail_; + while (cur) { + sample *next = cur->next_; // Save next pointer before potentially deleting cur + if (cur != sentinel()) delete cur; + cur = next; + } + delete[] storage_; } void factory::reclaim_sample(sample *s) { From afacf89a71262f44d5e4fdb9f70dc5490b60ddd5 Mon Sep 17 00:00:00 2001 From: zeyus Date: Thu, 10 Apr 2025 17:04:22 +0200 Subject: [PATCH 2/7] Try deleting later. --- src/sample.cpp | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/sample.cpp b/src/sample.cpp index 9a64c756..a05b85d0 100644 --- a/src/sample.cpp +++ b/src/sample.cpp @@ -466,13 +466,25 @@ sample *factory::pop_freelist() { } factory::~factory() { - sample *cur = tail_; - while (cur) { - sample *next = cur->next_; // Save next pointer before potentially deleting cur - if (cur != sentinel()) delete cur; - cur = next; - } - delete[] storage_; + // get pending samples + std::vector pending; + sample *cur = tail_.load(); + + // add them without deleting + while (cur) { + if (cur != sentinel()) { + pending.push_back(cur); + } + sample *next = cur->next_.load(); + cur = next; + } + + // now delete the samples + for (sample* s : pending) { + delete s; + } + + delete[] storage_; } void factory::reclaim_sample(sample *s) { From 1c7ccec40d7c51d87fd3dacafa9f52cfe5f6987b Mon Sep 17 00:00:00 2001 From: zeyus Date: Thu, 10 Apr 2025 17:20:23 +0200 Subject: [PATCH 3/7] Try only deleting samples outside storage. --- src/sample.cpp | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/sample.cpp b/src/sample.cpp index a05b85d0..9fa41c55 100644 --- a/src/sample.cpp +++ b/src/sample.cpp @@ -466,24 +466,17 @@ sample *factory::pop_freelist() { } factory::~factory() { - // get pending samples - std::vector pending; sample *cur = tail_.load(); - - // add them without deleting while (cur) { - if (cur != sentinel()) { - pending.push_back(cur); - } sample *next = cur->next_.load(); - cur = next; - } - // now delete the samples - for (sample* s : pending) { - delete s; - } + // Only delete samples that are outside of storage area + if (cur != sentinel() && (static_cast(cur) < storage_ || + static_cast(cur) >= storage_ + storage_size_)) + delete cur; + cur = next; + } delete[] storage_; } From e37971c4b8e0985f31bf58f306d89c00137db9ce Mon Sep 17 00:00:00 2001 From: zeyus Date: Fri, 11 Apr 2025 15:38:38 +0200 Subject: [PATCH 4/7] Just testing... --- testing/ext/bench_pushpull.cpp | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/testing/ext/bench_pushpull.cpp b/testing/ext/bench_pushpull.cpp index a5b7e020..fc7e8ee9 100644 --- a/testing/ext/bench_pushpull.cpp +++ b/testing/ext/bench_pushpull.cpp @@ -32,10 +32,11 @@ TEMPLATE_TEST_CASE("pushpull", "[basic][throughput]", char, double, std::string) auto found_stream_info(lsl::resolve_stream("name", name, 1, 2.0)); REQUIRE(!found_stream_info.empty()); - std::list inlet_list; for (auto n_inlets : param_inlets) { + std::list inlet_list; while (inlet_list.size() < n_inlets) { - inlet_list.emplace_front(found_stream_info[0], 300, false); + lsl::stream_info info_copy(found_stream_info[0]); + inlet_list.emplace_front(info_copy, 300, false); inlet_list.front().open_stream(.5); } std::string suffix(std::to_string(nchan) + "_inlets_" + std::to_string(n_inlets)); @@ -49,7 +50,20 @@ TEMPLATE_TEST_CASE("pushpull", "[basic][throughput]", char, double, std::string) out.push_chunk_multiplexed(data, chunk_size); for (auto &inlet : inlet_list) inlet.flush(); }; + + // Explicitly close and delete the inlets to ensure that they are not + // still in use when the next inlet is created. + for (int i = 0; i < n_inlets; i++) { + inlet_list.back().close_stream(); + inlet_list.pop_back(); + } + } + // Wait until all inlets are closed + // this hangs forever + // while (out.have_consumers()) { + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // } } } From b7b51970e148e197ef4775e48d76308c7cdad21f Mon Sep 17 00:00:00 2001 From: zeyus Date: Mon, 2 Jun 2025 11:42:02 +0200 Subject: [PATCH 5/7] test passed 3+ times locally. --- src/sample.cpp | 9 ++++--- testing/ext/bench_pushpull.cpp | 48 ++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/sample.cpp b/src/sample.cpp index 9fa41c55..a472b89f 100644 --- a/src/sample.cpp +++ b/src/sample.cpp @@ -470,11 +470,12 @@ factory::~factory() { while (cur) { sample *next = cur->next_.load(); - // Only delete samples that are outside of storage area - if (cur != sentinel() && (static_cast(cur) < storage_ || - static_cast(cur) >= storage_ + storage_size_)) + // Delete sample if it's not the sentinel and not in the storage area + if (cur != sentinel() && + (reinterpret_cast(cur) < storage_ || + reinterpret_cast(cur) >= storage_ + storage_size_)) { delete cur; - + } cur = next; } delete[] storage_; diff --git a/testing/ext/bench_pushpull.cpp b/testing/ext/bench_pushpull.cpp index fc7e8ee9..e3e60ba3 100644 --- a/testing/ext/bench_pushpull.cpp +++ b/testing/ext/bench_pushpull.cpp @@ -27,43 +27,53 @@ TEMPLATE_TEST_CASE("pushpull", "[basic][throughput]", char, double, std::string) lsl::channel_format_t cf = (lsl::channel_format_t)SampleType::chan_fmt; for (auto nchan : param_nchan) { + // Create outlet with a unique name for each test iteration + std::string unique_name = std::string(name) + "_" + std::to_string(nchan) + "_" + + std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()); lsl::stream_outlet out( - lsl::stream_info(name, "PushPull", (int)nchan, chunk_size, cf, "streamid")); - auto found_stream_info(lsl::resolve_stream("name", name, 1, 2.0)); + lsl::stream_info(unique_name, "PushPull", (int)nchan, chunk_size, cf, "streamid")); + + // Wait for outlet to be discoverable + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto found_stream_info(lsl::resolve_stream("name", unique_name, 1, 2.0)); REQUIRE(!found_stream_info.empty()); for (auto n_inlets : param_inlets) { - std::list inlet_list; - while (inlet_list.size() < n_inlets) { - lsl::stream_info info_copy(found_stream_info[0]); - inlet_list.emplace_front(info_copy, 300, false); - inlet_list.front().open_stream(.5); + std::vector> inlets; + + // Create inlets + for (std::size_t i = 0; i < n_inlets; ++i) { + inlets.emplace_back(std::make_unique(found_stream_info[0], 300, false)); + inlets.back()->open_stream(.5); + } + + // Wait for consumers to connect + if (n_inlets > 0) { + out.wait_for_consumers(1.0); } + std::string suffix(std::to_string(nchan) + "_inlets_" + std::to_string(n_inlets)); BENCHMARK("push_sample_nchan_" + suffix) { for (size_t s = 0; s < chunk_size; s++) out.push_sample(data); - for (auto &inlet : inlet_list) inlet.flush(); + for (auto &inlet : inlets) inlet->flush(); }; BENCHMARK("push_chunk_nchan_" + suffix) { out.push_chunk_multiplexed(data, chunk_size); - for (auto &inlet : inlet_list) inlet.flush(); + for (auto &inlet : inlets) inlet->flush(); }; - // Explicitly close and delete the inlets to ensure that they are not - // still in use when the next inlet is created. - for (int i = 0; i < n_inlets; i++) { - inlet_list.back().close_stream(); - inlet_list.pop_back(); + // Explicitly close inlets and wait for cleanup + for (auto &inlet : inlets) { + inlet->close_stream(); } + inlets.clear(); + // Give time for network cleanup between iterations + std::this_thread::sleep_for(std::chrono::milliseconds(50)); } - // Wait until all inlets are closed - // this hangs forever - // while (out.have_consumers()) { - // std::this_thread::sleep_for(std::chrono::milliseconds(1)); - // } } } From 63a9e247f9c3eced1f492361fb3c3651599e4a06 Mon Sep 17 00:00:00 2001 From: zeyus Date: Mon, 2 Jun 2025 12:13:51 +0200 Subject: [PATCH 6/7] ... --- src/sample.cpp | 23 ++++--- testing/ext/bench_pushpull.cpp | 106 +++++++++++++++++++++------------ 2 files changed, 77 insertions(+), 52 deletions(-) diff --git a/src/sample.cpp b/src/sample.cpp index a472b89f..b38f8034 100644 --- a/src/sample.cpp +++ b/src/sample.cpp @@ -6,6 +6,7 @@ #include "portable_archive/portable_oarchive.hpp" #include "util/cast.hpp" #include +#include using namespace lsl; using lslboost::endian::endian_reverse_inplace; @@ -466,19 +467,15 @@ sample *factory::pop_freelist() { } factory::~factory() { - sample *cur = tail_.load(); - while (cur) { - sample *next = cur->next_.load(); - - // Delete sample if it's not the sentinel and not in the storage area - if (cur != sentinel() && - (reinterpret_cast(cur) < storage_ || - reinterpret_cast(cur) >= storage_ + storage_size_)) { - delete cur; - } - cur = next; - } - delete[] storage_; + sample* cur = tail_.load(); + while (cur) { + sample* next = cur->next_.load(); + if (cur != sentinel()) { + delete cur; + } + cur = next; + } + delete[] storage_; } void factory::reclaim_sample(sample *s) { diff --git a/testing/ext/bench_pushpull.cpp b/testing/ext/bench_pushpull.cpp index e3e60ba3..4b546c92 100644 --- a/testing/ext/bench_pushpull.cpp +++ b/testing/ext/bench_pushpull.cpp @@ -23,56 +23,84 @@ TEMPLATE_TEST_CASE("pushpull", "[basic][throughput]", char, double, std::string) const TestType data[max_nchan * chunk_size] = {sample_value::val}; - const char *name = SampleType::fmt_string(); + const char *base_name = SampleType::fmt_string(); lsl::channel_format_t cf = (lsl::channel_format_t)SampleType::chan_fmt; for (auto nchan : param_nchan) { - // Create outlet with a unique name for each test iteration - std::string unique_name = std::string(name) + "_" + std::to_string(nchan) + "_" + - std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()); - lsl::stream_outlet out( - lsl::stream_info(unique_name, "PushPull", (int)nchan, chunk_size, cf, "streamid")); - - // Wait for outlet to be discoverable - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - auto found_stream_info(lsl::resolve_stream("name", unique_name, 1, 2.0)); - REQUIRE(!found_stream_info.empty()); - for (auto n_inlets : param_inlets) { + // Create a unique inlet name for each test combination + auto now = std::chrono::high_resolution_clock::now(); + auto timestamp = std::chrono::duration_cast(now.time_since_epoch()).count(); + std::string unique_name = std::string(base_name) + "_" + + std::to_string(nchan) + "_" + + std::to_string(n_inlets) + "_" + + std::to_string(timestamp); + + // Create outlet in its own scope + std::unique_ptr out; std::vector> inlets; - - // Create inlets - for (std::size_t i = 0; i < n_inlets; ++i) { - inlets.emplace_back(std::make_unique(found_stream_info[0], 300, false)); - inlets.back()->open_stream(.5); - } - - // Wait for consumers to connect - if (n_inlets > 0) { - out.wait_for_consumers(1.0); - } - std::string suffix(std::to_string(nchan) + "_inlets_" + std::to_string(n_inlets)); + try { + out = std::make_unique( + lsl::stream_info(unique_name, "PushPull", (int)nchan, chunk_size, cf, unique_name + "_src")); - BENCHMARK("push_sample_nchan_" + suffix) { - for (size_t s = 0; s < chunk_size; s++) out.push_sample(data); - for (auto &inlet : inlets) inlet->flush(); - }; + // Wait for outlet + std::this_thread::sleep_for(std::chrono::milliseconds(200)); - BENCHMARK("push_chunk_nchan_" + suffix) { - out.push_chunk_multiplexed(data, chunk_size); - for (auto &inlet : inlets) inlet->flush(); - }; + // Resolve the stream + auto found_stream_info = lsl::resolve_stream("name", unique_name, 1, 3.0); + if (found_stream_info.empty()) { + WARN("Could not resolve stream " << unique_name); + continue; + } - // Explicitly close inlets and wait for cleanup - for (auto &inlet : inlets) { - inlet->close_stream(); + // Create inlets + for (std::size_t i = 0; i < n_inlets; ++i) { + inlets.emplace_back(std::make_unique(found_stream_info[0], 300, 0, false)); + inlets.back()->open_stream(1.0); + } + + // Wait for all consumers to connect + if (n_inlets > 0) { + auto start_wait = std::chrono::steady_clock::now(); + while (!out->have_consumers() && + std::chrono::steady_clock::now() - start_wait < std::chrono::seconds(2)) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + std::string suffix(std::to_string(nchan) + "_inlets_" + std::to_string(n_inlets)); + + BENCHMARK("push_sample_nchan_" + suffix) { + for (size_t s = 0; s < chunk_size; s++) { + out->push_sample(data); + } + for (auto &inlet : inlets) { + inlet->flush(); + } + }; + + BENCHMARK("push_chunk_nchan_" + suffix) { + out->push_chunk_multiplexed(data, chunk_size); + for (auto &inlet : inlets) { + inlet->flush(); + } + }; + + } catch (const std::exception& e) { + WARN("Exception in benchmark: " << e.what()); + } + // Cleanup + for (auto& inlet : inlets) { + try { + inlet->close_stream(); + } catch (...) { + // Ignore cleanup errors + } } inlets.clear(); - - // Give time for network cleanup between iterations - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + out.reset(); + // Give extra time for cleanup + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } } From b06de121e1ed9c4ef49f94e255910da3d794296a Mon Sep 17 00:00:00 2001 From: zeyus Date: Mon, 2 Jun 2025 12:24:41 +0200 Subject: [PATCH 7/7] Removed blank line (test-rerun). --- testing/ext/bench_pushpull.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/testing/ext/bench_pushpull.cpp b/testing/ext/bench_pushpull.cpp index 4b546c92..03a397b8 100644 --- a/testing/ext/bench_pushpull.cpp +++ b/testing/ext/bench_pushpull.cpp @@ -78,14 +78,12 @@ TEMPLATE_TEST_CASE("pushpull", "[basic][throughput]", char, double, std::string) inlet->flush(); } }; - BENCHMARK("push_chunk_nchan_" + suffix) { out->push_chunk_multiplexed(data, chunk_size); for (auto &inlet : inlets) { inlet->flush(); } }; - } catch (const std::exception& e) { WARN("Exception in benchmark: " << e.what()); }