[Post-1.14] Replace boost::spsc_queue by a fast SPMC queue #91
Conversation
- pop_sample is now lockfree again, except if the buffer is full or empty and a timeout is used
- the queue is much less of a bottleneck now
As noted I haven't looked through all the details, but what I've seen looks like a clear win to me.
On my wishlist (apart from a circular buffer that directly stores the sample data to avoid the pointer indirections) I have only two functions to push / pull sample ranges, but those can be added later if needed.
```
@@ -31,6 +31,34 @@ extern "C" {
"Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions."
#endif

// size of a cache line
```
This block should be in consumer_queue.h
Sounds good.
Ah well, there's one thing -- we'll need that definition in `sample.h` too, for the same reason (the 2 atomics in the factory there are prone to false sharing).
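For illustration, a minimal sketch of the kind of definition under discussion (the CACHELINE_BYTES macro mirrors the diff below; the two-atomics struct is a paraphrase of the situation in the `sample.h` factory, not its actual layout):

```cpp
#include <atomic>

// size of a cache line (64 bytes on most current x86 and ARM cores)
#ifndef CACHELINE_BYTES
#define CACHELINE_BYTES 64
#endif

// Paraphrase of the factory scenario: without the alignas, two hot
// atomics would typically end up on the same cache line, so a write by
// one thread evicts the line for threads using only the other atomic
// ("false sharing").
struct factory_sketch {
	alignas(CACHELINE_BYTES) std::atomic<int> head_{0};
	alignas(CACHELINE_BYTES) std::atomic<int> tail_{0};
};
```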
```
#define CACHELINE_BYTES 64
#endif

// force-inline the given function, if possible
```
IIRC Boost has macros for both forceinline and likely/unlikely, but with more supported compilers.
Fair point - will use those. For likely/unlikely, I'd expect us to sprinkle that in along the "hot" path here and there over time. There's a bit of an issue with the boost definitions, which differ from how e.g. the Linux kernel or UE4 define them, in that they don't include the double negation. That makes them prone to accidental misuse: if I were to replace `if (myptr)` by `if (BOOST_LIKELY(myptr))`, that would expand to `if (__builtin_expect(myptr, 1))` -- i.e., it asserts that the pointer equals 1, rather than that the expression evaluates to true in a boolean context (causing the hint to fail to optimize, and in some cases it might even pessimize the code path). We could try to add that to our mental checklist when reviewing code that uses the macros, but it feels more prudent to write it into the macro, since otherwise it'd put extra burden on the devs/reviewers. One might even argue that the boost versions are misleading names for something that would be better described as BOOST_LIKELYONE and BOOST_LIKELYZERO.
But to gain the extra compilers supported by the boost versions while keeping the ease of use, we could define them here as

```cpp
#define LIKELY(x) BOOST_LIKELY(!!(x))
#define UNLIKELY(x) BOOST_UNLIKELY(!!(x))
```

For the forceinline, do you prefer using the boost version in the code, or should we alias that here similarly to a shorter macro name, as in

```cpp
#define FORCEINLINE BOOST_FORCEINLINE
```
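As a usage sketch of the wrappers proposed above (the call site is hypothetical, just to show why the `!!` matters):

```cpp
#include <boost/config.hpp> // provides BOOST_LIKELY / BOOST_UNLIKELY

#define LIKELY(x) BOOST_LIKELY(!!(x))
#define UNLIKELY(x) BOOST_UNLIKELY(!!(x))

// hypothetical call site: the !! first normalizes the pointer to 0 or 1,
// so on GCC/Clang this expands to __builtin_expect(!!(myptr), 1) and the
// hint compares against the right value
void process(const int *myptr) {
	if (LIKELY(myptr)) {
		// hot path
	}
}
```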
I checked the boost source code; BOOST_LIKELY is defined mostly for GCC/Clang and the Intel and IBM compilers. MSVC waits for C++20's `[[likely]]` attribute; its `__assume` intrinsic wants the condition in `if()` to actually be true.
So I'm for `#define LIKELY(x) BOOST_LIKELY(!!(x))` and using that everywhere. There's just one `UNLIKELY` that'd need to be changed in the code below. As for `BOOST_FORCEINLINE`, I'd just use that.
Great, will do that
#include "send_buffer.h" | ||
#include <chrono> | ||
|
||
using namespace lsl; | ||
|
||
consumer_queue::consumer_queue(std::size_t max_capacity, send_buffer_p registry) | ||
: registry_(registry), buffer_(max_capacity) { | ||
consumer_queue::consumer_queue(std::size_t size, send_buffer_p registry) |
`std::uint_fast32_t` would let the compiler choose the best int size that's at least 4 bytes wide.
Oh interesting, didn't know about that one! On that note, I've actually been wondering whether we want to keep limiting our buffers to 32-bit sizes in the long run. E.g., if I declare a stream with a 6 MHz sampling rate, and someone connects to it with the default 360 s max_buflen (e.g. LabRecorder), liblsl currently crashes with no error message, since max_buflen (being an `int`) wraps around to a negative value that then blows up later. That happened to me while testing jfrey's use case. It made me think that we might want to, in the very long run, replace some things that are currently `int` by `size_t`, and one could slowly but surely build up towards that without breaking anything by starting with some low-level containers. The external API wouldn't be affected, since the size value there is specified before it's multiplied by the potentially huge sampling rate.
Just something to think about, especially now that 6 MHz is actually achievable with a sufficiently speedy PC, and also considering that some specialized/professional use cases may find the ability to have such large buffers practical.
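To make the overflow concrete, a quick sketch of the arithmetic described above (variable names are illustrative, not the actual liblsl code):

```cpp
#include <cstdint>
#include <cstdio>

int main() {
	std::int64_t srate = 6000000;  // 6 MHz sampling rate
	std::int64_t max_buflen = 360; // default buffer length in seconds
	std::int64_t n = srate * max_buflen; // 2,160,000,000 samples
	// INT32_MAX is 2,147,483,647, so a 32-bit int can't hold this; the
	// truncating conversion below wraps it to a negative value
	std::int32_t wrapped = static_cast<std::int32_t>(n); // -2,134,967,296
	std::printf("needed: %lld, 32-bit result: %d\n", (long long)n, wrapped);
}
```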
A sample is at least 48 bytes on x64, so that'd require at least 192 GB RAM per queue. Not totally impossible, but I guess not typical for LSL users.
```
wrap_at_(std::numeric_limits<std::size_t>::max() - size - std::numeric_limits<std::size_t>::max() % size)
{
	assert(size_ > 1);
	for (std::size_t i=0; i<size_; ++i)
```
Do we need to `store()` everything in the constructor already, or could we use standard initialization (`: write_idx_(0), read_idx_(0)`, …)?
Before I looked it up, I was actually under the impression that `write_idx_(0)` would use the default memory order of `std::atomic` when it's used like a plain value, which is `memory_order_seq_cst` -- quite a bit stronger than what this data structure needs (so I'd worry a bit about compiler pessimizations, e.g., additional memory fences). However, it turns out that that type of initialization is in fact non-atomic, so the explicit store is actually necessary (since another thread can later acquire the value).
What about an `std::atomic_thread_fence(std::memory_order_release);` at the end of the constructor?
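A minimal sketch of what that suggestion could look like (the class shell is illustrative, not the actual consumer_queue; member names follow the snippets quoted in this review):

```cpp
#include <atomic>
#include <cstddef>

class queue_ctor_sketch {
	std::atomic<std::size_t> write_idx_, read_idx_;

public:
	queue_ctor_sketch() {
		// relaxed stores are fine here: no consumer can exist before the
		// constructor returns, so there is nothing to race with yet ...
		write_idx_.store(0, std::memory_order_relaxed);
		read_idx_.store(0, std::memory_order_relaxed);
		// ... and one release fence at the end publishes all of the above
		// in one go; note the consumer still has to receive the queue
		// pointer itself through a synchronizing (acquire) operation
		std::atomic_thread_fence(std::memory_order_release);
	}
};
```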
```
uint32_t n = 0;
while (buffer_.pop()) n++;
while (try_pop()) n++;
```
Quick idea without having thought it through: can we make something like `auto diff = write_idx_ - read_idx_; read_idx_ += diff; return diff;` work in a multithreaded context?
That would be pretty nice; I'll look into it. The implementation would probably come down to something similar to the range/iterator versions of push/pop that boost.lockfree has.
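For reference, a hedged sketch of what such a one-shot variant could look like (a hypothetical `flush` helper, ignoring the index-wrapping scheme for brevity; the caveat in the comments is exactly the part that needs the careful thought mentioned above):

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical bulk-clear: claim the whole [read, write) range in one CAS.
// Caveat: in the sequence-number scheme, a slot between read_idx_ and
// write_idx_ may not be fully published yet, so after claiming the range
// the consumer would still have to wait on each slot's seq_state before
// recycling the sample it holds.
std::size_t flush(std::atomic<std::size_t> &read_idx_,
	std::atomic<std::size_t> &write_idx_) {
	std::size_t r = read_idx_.load(std::memory_order_relaxed);
	for (;;) {
		std::size_t w = write_idx_.load(std::memory_order_acquire);
		if (r == w) return 0; // nothing to clear
		// claim all pending slots against other consumers at once
		if (read_idx_.compare_exchange_weak(r, w, std::memory_order_acq_rel))
			return w - r; // caller now owns slots r..w-1 (see caveat)
	}
}
```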
```
consumer_queue(const consumer_queue&) = delete;
consumer_queue(consumer_queue&&) = delete;
```
Moving a queue could come in useful, as long as we make sure that 1) nobody uses the moved-from queue and 2) no other thread accesses the queue during the move.
Right, we could add those ops (I think setting the read/write indices would need the same kinds of store ops as the constructor has). Need to make sure that the condition variable is movable though (might not be).
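A sketch of what such a move constructor could look like under the conditions discussed above (std::condition_variable is in fact neither movable nor copyable, so the new object simply default-constructs a fresh one; the class shell and members are paraphrased):

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>

class movable_queue_sketch {
	std::atomic<std::size_t> write_idx_{0}, read_idx_{0};
	std::condition_variable cv_; // not movable: each object gets its own

public:
	movable_queue_sketch() = default;

	// Only safe if no other thread touches `other` during the move
	// (condition 2 above) and `other` is not used afterwards (condition 1).
	movable_queue_sketch(movable_queue_sketch &&other) {
		// same kinds of store ops as the constructor uses
		write_idx_.store(other.write_idx_.load(std::memory_order_relaxed),
			std::memory_order_relaxed);
		read_idx_.store(other.read_idx_.load(std::memory_order_relaxed),
			std::memory_order_relaxed);
	}
};
```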
```
std::condition_variable cv_; // to allow for blocking wait by consumer

// an item stored in the queue
struct item_t {
	std::atomic<std::size_t> seq_state;
```
I haven't looked through this in detail, but I'll hopefully get to it after the 1.14 release.
Sounds good. I also hope to find some time to contribute some unit tests that stress concurrency a bit more.
I already have some benchmarks in the unit tests that stress the queue a fair bit, but more are always welcome.
```
/// threshold at which to wrap read/write indices
const std::size_t wrap_at_;
// whether we have performed a sync on the data stored by the constructor
alignas(CACHELINE_BYTES) std::atomic<bool> done_sync_;
```
Should the aligned members be consecutive? IIUC these three members will have at least 56 empty padding bytes between them, so could we reorder the members to put the space to use?
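To illustrate the suggestion, a hypothetical reordering (member names partly invented; the idea is just to tuck the small, rarely written members into the padding that the alignas forces anyway):

```cpp
#include <atomic>
#include <cstddef>

#define CACHELINE_BYTES 64

struct layout_sketch {
	// hot, consumer-written cache line
	alignas(CACHELINE_BYTES) std::atomic<std::size_t> read_idx_{0};
	// wrap_at_ is const and done_sync_ is written only once, so sharing
	// read_idx_'s line costs little and reclaims ~56 dead padding bytes
	const std::size_t wrap_at_;
	std::atomic<bool> done_sync_{false};
	// the producer-side index starts fresh on its own cache line
	alignas(CACHELINE_BYTES) std::atomic<std::size_t> write_idx_{0};

	explicit layout_sketch(std::size_t wrap_at) : wrap_at_(wrap_at) {}
};
```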
@chkothe I know you wanted to do a little more work on this. No rush. I just wanted to bring to our attention a user report of CPU utilization going up 10x, and I think it is because the outlet buffer is overflowing. Let's make sure to test performance in the outlet-buffer-full case.
Yes, planning to do some more polish & account for some of the suggested changes once I'm out of my current sprint (hopefully over Christmas). For the buffer overflow case: we should probably emit a one-time (or periodically repeated) log warning that there's a buffer overflow, since in that case the user is going to have data loss and significant transmit delay. Basically we'd want to recognize that as not a normal (steady-state) operating condition but more like a "you have a big problem with your data transmission" kind of situation.
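A minimal sketch of such a one-time warning (hypothetical helper; fprintf stands in for whatever logging facility liblsl uses, and the periodically repeated variant would reset the flag on a timer):

```cpp
#include <atomic>
#include <cstdio>

// hypothetical hook, called whenever try_push finds the buffer full
void warn_on_overflow() {
	static std::atomic<bool> warned{false};
	// exchange() returns the previous value, so only the first caller logs
	if (!warned.exchange(true, std::memory_order_relaxed))
		std::fprintf(stderr,
			"consumer_queue overflow: the consumer can't keep up; "
			"samples are being dropped and transmission is delayed\n");
}
```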
FYI, I've rebased the branch and pushed it here. I'll see to it that I finish the review and some benchmarks so we can get this merged for 1.16.
Looks good to me so far; the remaining comments aren't critical and can be addressed after it's merged. I found one optimization for the index-wrapping increment:

```cpp
#include <cstddef> // std::size_t (the original snippet relied on <cstdint> pulling it in)
#include <cstdint>

using T = std::size_t;
extern const T wrap_at_;

inline T _add_wrap(T x, T delta) {
	const T xp = x + delta;
	return xp >= wrap_at_ ? xp - wrap_at_ : xp;
}

T add1_wrap(T x) {
	return ++x == wrap_at_ ? 0 : x;
}

T add_wrap(T x) {
	return _add_wrap(x, 1);
}
```

x86_64:

```asm
add1_wrap(unsigned long): # @add1_wrap(unsigned long)
	add rdi, 1
	xor eax, eax
	cmp rdi, qword ptr [rip + wrap_at_]
	cmovne rax, rdi
	ret
add_wrap(unsigned long): # @add_wrap(unsigned long)
	mov rax, rdi
	add rax, 1
	mov rcx, qword ptr [rip + wrap_at_]
	xor edx, edx
	cmp rax, rcx
	cmovae rdx, rcx
	sub rax, rdx
	ret
```

ARM:

```asm
add1_wrap(unsigned long):
	adrp x1, wrap_at_
	add x0, x0, 1
	ldr x1, [x1, #:lo12:wrap_at_]
	cmp x1, x0
	csel x0, x0, xzr, ne
	ret
add_wrap(unsigned long):
	adrp x1, wrap_at_
	add x0, x0, 1
	ldr x1, [x1, #:lo12:wrap_at_]
	subs x1, x0, x1 # avoided in add1_wrap
	csel x0, x1, x0, cs
	ret
```
I have removed the
Prompted by jfrey-xx's recent work on the `consumer_queue`, which revealed a concurrency issue with the spsc queue at buffer overflow, and which traded lower CPU load at low sampling rates for higher load at high rates, I've spent a bit of time resolving an issue underlying that queue, so we can have the best of both worlds (high throughput and low CPU use). Which is that, in our scenario, there's an edge case where the producer becomes a second consumer, and thus we technically need an SPMC queue, not an SPSC queue.

In the current implementation this is addressed by some additional locks to protect against that scenario, but unfortunately it means that producer and consumer cannot make progress at the same time (i.e., they stall each other). This is only revealed by 2-thread benchmarks, which I believe we don't have in the benchmark suite yet (I added a benchmark script to my fork that can measure that).

I've looked around a bit to find an SPMC queue that would get the job done, and it turns out that there's a pretty frequently used ring buffer scheme for that (used among others in xenium, a queue by Rigtorp used in Frostbite, and a few other pieces of software, with an original implementation by Vyukov; all of these are in fact MPMC). It's actually faster than boost's `spsc_queue` (because it doesn't need to synchronize between reader and writer at all, unless they act on the same queue element). So I've adapted that for our consumer_queue (`try_push`/`try_pop`), in a simplified SPMC version, since we only need one producer (that gives a bit of extra performance).

I've done a bit of benchmarking, and on my machine (Ubuntu laptop) it comes out at >60% higher max end-to-end throughput between a `push_sample` and a `pull_sample` thread relative to the current lock-based variant (~4.8 MHz instead of ~2.9 MHz). With these changes, there's only one (rarely contended and thus quite fast) lock left, around the `cv.notify_one()` for use with a wait timeout, but since it's now faster than even the old lockfree variant, we should be OK for a while. (One can use optional locking to make the queue again fully lock-free & wait-free except when it's full or empty, but I'd need to do a bit more testing on that first.)

Other than that I've done a decent amount of testing, but of course we'll want to let it simmer a bit and maybe write a few more unit tests before we can pass this on to users.
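For readers unfamiliar with the scheme, here is a hedged, self-contained sketch of the Vyukov-style sequence-number ring buffer described above, simplified to a single producer (names like `item_t`/`seq_state` follow the snippets quoted in the review; everything else, including the plain modulo indexing without the `wrap_at_` trick, is illustrative rather than the actual liblsl code):

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

template <typename T> class spmc_queue_sketch {
	struct item_t {
		std::atomic<std::size_t> seq_state; // the slot's sequence number
		T value;
	};
	std::vector<item_t> buf_;
	const std::size_t size_;
	alignas(64) std::size_t write_idx_;             // touched only by the producer
	alignas(64) std::atomic<std::size_t> read_idx_; // shared by all consumers

public:
	explicit spmc_queue_sketch(std::size_t size)
		: buf_(size), size_(size), write_idx_(0) {
		// slot i starts out free, waiting for the element with index i
		for (std::size_t i = 0; i < size; ++i)
			buf_[i].seq_state.store(i, std::memory_order_relaxed);
		read_idx_.store(0, std::memory_order_release);
	}

	// single producer: no synchronization against itself needed
	bool try_push(T v) {
		item_t &it = buf_[write_idx_ % size_];
		// a slot is free iff its sequence number equals the write index
		if (it.seq_state.load(std::memory_order_acquire) != write_idx_)
			return false; // full: the oldest slot hasn't been consumed yet
		it.value = std::move(v);
		// publish: mark the slot as holding element number write_idx_
		it.seq_state.store(write_idx_ + 1, std::memory_order_release);
		++write_idx_;
		return true;
	}

	// multiple consumers: claim a slot by CAS on the shared read index
	bool try_pop(T &out) {
		std::size_t r = read_idx_.load(std::memory_order_relaxed);
		for (;;) {
			item_t &it = buf_[r % size_];
			std::size_t seq = it.seq_state.load(std::memory_order_acquire);
			if (seq != r + 1) {
				// either empty, or another consumer raced past us
				std::size_t r2 = read_idx_.load(std::memory_order_relaxed);
				if (r2 == r) return false; // genuinely empty
				r = r2;
				continue;
			}
			// claim slot r against the other consumers
			if (read_idx_.compare_exchange_weak(r, r + 1, std::memory_order_relaxed)) {
				out = std::move(it.value);
				// hand the slot back to the producer for its next lap
				it.seq_state.store(r + size_, std::memory_order_release);
				return true;
			}
		}
	}
};
```

The property claimed above falls out of the `seq_state` check: producer and consumers only ever synchronize through the sequence number of the one slot they are both interested in, never through a shared lock.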