07. December 2019
Or: How to achieve single lock-free producer, single blocking consumer communication. While writing the AAudio backend for the cubeb library I came across an interesting multithreaded problem. I’m no expert on multithreading and couldn’t find a solution on the internet - maybe because I’m just not knowing the problem’s name? - so solving this on my own was exciting.
To achieve the best audio latency, we register a callback with AAudio in which we render sounds or process input from the microphone. This callback will then be called from a high priority thread that AAudio manages. The catch: since we are working with potentially really low latencies, we don’t have a whole lot of time to process or output the data. The AAudio docs explicitly state that this callback must not block. Not even for locking a mutex. Using lock-free primitives is quite common when programming the bare bones of an audio engine and I’m already used to that but the situation we have to integrate this with cubeb is somewhat more complicated.
We forward the callback that AAudio triggers in the audio thread to the user of the cubeb library. The user-supplied callback is allowed to signal via its return value that the stream should be drained (i.e. stopped when all data that was already buffered has been output). AAudio has a matching interface: AAudioStream_requestStop
. This function outputs all remaining data for an output stream and then stops the stream. Nice, isn’t it? Almost. Cubeb furthermore states that the backend will call a user-supplied state callback as soon as the draining is complete. AAudio, on the other hand, has no mechanism to notify us when a stream is drained, i.e. stopped. We can just manually block for a state change using AAudioStream_waitForStageChange
. Additionally, AAudio states that the audio callback itself must not call AAudioStream_requestStop
but instead leave this to another thread. So, obviously: we need another worker thread that does all this for us. But we don’t want to start a thread per stream and we additionally have no way to wake up AAudioStream_waitForStageChange
. My first solution was to just check all streams over and over again for state changes or stop requests (when draining) with a sleeping time of 5 milliseconds in between. That worked well. But this is inefficient, especially if you consider that most applications don’t care much for stream draining - or even state changes at all. With that we arrive at the problem: we want to communicate from the realtime audio thread - that must not block under any circumstances - with the consumer thread - that should sleep, i.e. block as long as there isn’t anything to do or wait upon. This could be generalized to a single lock-free producer, single blocking consumer queue. Or the problem of waking up a thread reliably without the chance of blocking in the thread that initiates the wakeup.
So, spin up a mutex and take the condition variables out of your closet. We can quickly come up with a first trivial solution (leaving out details like joining logic or the actual work to be done):
mutex mutex;
condition_variable cv;
// This function is called from the AAudio-managed realtime thread.
// It can be seen as producer. It must not block.
audio_callback {
// output audio data...
// when we change the stream state:
cv.notify_one();
}
// This is the thread that should only get active when there is something
// to do. It can be seen as consumer.
consumer {
unique_lock lock(mutex):
while(true) {
cv.wait(lock);
// check for state changes or things to do
// process everything there is to do
}
}
The mutex we just need for waiting on the condition variable is somewhat useless, right? After a small crisis on my understanding of what condition variables are at all anyhow, I came across a great stackoverflow question and explanation. The main magic in a condition variable is just that it can unlock a mutex and go to sleep atomically. There can’t be anything in between. So I already suspected that not using the mutex anywhere else above was a bad sign regarding my use of the condition variable. Where’s the issue? what could go wrong? Well, the consumer thread could miss signals and go to sleep even though there is still something to do. Imagine this: cv.notify_one()
is called just as the consumer is about to go asleep. The signal will not have any effect since the consumer isn’t sleeping yet. But immediately after this, the consumer will go to sleep even though there is still work to do. And it’s not guaranteed to wake up in finite time again.
Later on, I found a stackoverflow question asking pretty much precisely for what I need. It has one answer that seriously tries to solve the given problem - from Jonathan Wakely, an libstdc++ author -
and gives more or less the solution presented above. But in the following discussion, they found its problems as well without pursuing alternative options.
I thought of various ways this could be solved - with additional atomic flags, an additional mutex on which the audio thread always just calls try_lock
or with a second condition variable. But it all boils down to the same situation: when the consumer thread is about to go asleep, there’s just nothing the realtime thread can do about it. It could wait until the consumer thread really is asleep and then signal the condition variable, but that involves waiting again (and the common way do to so would be to use the mutex).
All these failing thought paths finally lead me to the solution: what if we just spin up another thread that is used for exactly this situation? A thread that must be sleeping when the consumer thread is active and that will simply echo the condition variable notification as soon as the consuming thread is sleeping. How to achieve this? Let’s make use of the previously unused mutex! We already acquire the mutex when the consumer thread is active, let’s acquire the mutex in the new helper thread as well. I called this new helper thread notifier
:
// This is the helper thread that echoes the condition variable notification
// when that is needed.
notifier {
unique_lock(mutex);
while(true) {
cv.wait(lock);
cv.notify_one();
}
}
And that’s it. We just need to add the notifier thread and without any other modifications, the consumer can’t miss signals anymore. I initially started with a cv.notify_all()
in the realtime (producer) thread, but that’s not really needed. So, why does this work?
At any time (after the initial setup; i.e. after both threads reached cv.wait(lock)
for the first time) at least one of notifier
or consumer
must be sleeping (i.e. waiting on cv
) because when they are not sleeping, they own the locked mutex
. That’s where we finally use the real condition variable magic. What happens if the realtime (producer) thread calls cv.notify_one()
? If only the consumer thread is sleeping, it will eventually be woken up by the notification. When the consumer thread is currently active on the other hand - even if it’s about to go asleep without anything else in between - the notifier thread must be currently sleeping since the mutex is still locked. And it will wake up as soon as the consumer thread is going to sleep. And then it will wake up the state thread via the echoed notification. When both threads are waiting on the condition variable, one of them will wake up. And even if this is the notifier thread: it will just echo the notification and wake up the consumer thread with that echo.
The example we are building is quite prone to spurious wakeups - waiting on a condition variable without an external condition is usually a bad idea - but we can fix that as well:
mutex mutex;
condition_variable cv;
atomic<bool> wakeup {false};
// This function is called from the AAudio-managed realtime thread.
// It can be seen as producer. It must not block.
audio_callback {
// output audio data...
// when we change the stream state:
wakeup.store(true);
cv.notify_one();
}
// This is the thread that should only get active when there is something
// to do. It can be seen as consumer.
consumer {
unique_lock lock(mutex):
while(true) {
cv.wait(lock);
while(wakeup.load()) {
wakeup.store(false);
// check for state changes or things to do
// process everything there is to do
}
}
}
// This is the helper thread that echoes the condition variable notification
// when that is needed.
notifier {
unique_lock(mutex);
while(true) {
cv.wait(lock);
if(wakeup.load()) {
cv.notify_one();
}
}
}
This solution has some costs: you need a completely new separate thread. This might be a dealbreaker for some situations. And for some wakeups, you get the overhead of a wrong thread being woken up first. But especially in cases where wakeups should not be needed too often - as it can be expected to be the case for my original case, the AAudio cubeb backend - this is better than just checking in a loop with a fixed time spent sleeping in between.