Code review: The bounded queue
The following code has just been written (never run, never tested).
Its purpose is to serve as a high-speed, lock-free transport between two threads, one of them producing information, the other consuming it, in a bounded, non-blocking manner.
Note that this was done because the default usage of BlockingCollection&lt;T&gt; here generated roughly 80% of the load, which is not ideal.
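The post's actual code is not reproduced in this page, so here is a minimal sketch of the kind of class under discussion, reconstructed from the comments below (power-of-two size, monotonically growing unsigned positions, Volatile access). The class and member names follow the identifiers mentioned in the thread; the exact details are assumptions.

```csharp
using System;
using System.Threading;

// Sketch of a single-producer single-consumer bounded circular queue,
// reconstructed from the discussion; not the post's exact code.
public sealed class SingleProducerSingleConsumerCircularQueue<T> where T : class
{
    private const uint QueueSize = 256; // must be a power of two
    private readonly T[] _data = new T[QueueSize];
    private uint _readPos;   // total items dequeued so far (consumer-owned)
    private uint _writePos;  // total items enqueued so far (producer-owned)

    private static uint PositionToArrayIndex(uint pos) => pos % QueueSize;

    // Called only from the single producer thread.
    public bool Enqueue(T item)
    {
        var readPos = Volatile.Read(ref _readPos);
        if (_writePos - readPos == QueueSize)
            return false; // queue is full

        _data[PositionToArrayIndex(_writePos)] = item;
        // Publish the item before advancing the write position.
        Volatile.Write(ref _writePos, _writePos + 1);
        return true;
    }

    // Called only from the single consumer thread.
    public bool Dequeue(out T item)
    {
        item = null;
        var writePos = Volatile.Read(ref _writePos);
        if (_readPos == writePos)
            return false; // queue is empty

        var readIndex = PositionToArrayIndex(_readPos);
        item = _data[readIndex];
        _data[readIndex] = null; // release the reference for the GC
        Volatile.Write(ref _readPos, _readPos + 1);
        return true;
    }
}
```

Note that the full/empty checks compare raw positions, not modulo'd indices; the comments below explain why that distinction matters.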
More posts in "Code review" series:
- (26 Jun 2016) The bounded queue
- (15 Apr 2011) Who Can Help Me
- (01 Apr 2011) SharpArchitecture.MultiTenant
- (28 Nov 2007) PetShop 3.0
- (28 Nov 2007) The PetShop Application
Comments
Willing to believe that C# magic makes some of these points moot, but...
Why % QueueSize and not & (QueueSize - 1) given that QueueSize is a power of 2?
What about overflow of _readPos and _writePos? Should they be unsigned so that overflow is well-defined?
Also, can you save on calls to PositionToArrayIndex by storing the indices and not the positions? I.e. doing the % QueueSize on write and not on read?
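On the first question, a quick illustrative check (my addition, not from the post): for unsigned values and a power-of-two size, the modulo and the bit mask always agree, which is why a compiler is free to rewrite one as the other.

```csharp
using System;

// For a power-of-two size, pos % QueueSize == pos & (QueueSize - 1)
// for every unsigned position value.
const uint QueueSize = 256;
for (uint pos = 0; pos < 100_000; pos++)
{
    if (pos % QueueSize != (pos & (QueueSize - 1)))
        throw new Exception("mismatch");
}
Console.WriteLine("pos % 256 == pos & 255 for all tested values");
```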
One error, the Dequeue method should have
_data[readIndex] = null;
instead of
_data[_readPos] = null;
Interesting class, I'll keep it in mind if I ever need something like this.
There is a tuned SPSC queue in corefxlab. It has been in .NET for a long time, but as an internal class.
https://github.com/dotnet/corefxlab/blob/master/src/System.Threading.Tasks.Channels/System/Collections/Concurrent/SingleProducerConsumerQueue.cs#L9
I think you have a bug in the Enqueue method: you should increment the index before attempting to write. In the current version, two threads can arrive at line 21 at the same time and write to the same index.
2 Roman Kvasov: it's called SingleProducer... for a reason -- there should be only one producing thread; there are more things broken otherwise.
Roman: that race will cause us to erroneously believe the queue is empty when there is data to be read. Inverting those writes would lead us to believe there is a value to be read when that value hasn't been written.
When you write two billion elements to the array, you will get an IndexOutOfRangeException, because _writePos % 256 is less than zero when _writePos is less than zero. Use a uint instead.
David, Pretty much because it doesn't matter. If you look at the disassembly of:
It results in:
In other words, that is such a well known optimization that the compiler will already apply it for me. No need to make the code uglier.
Good point about overflow for the pos variables, changed that.
David, The cost of that (and with a constant) is pretty low, and always doing that is going to complicate the flow of the code, I think.
Marcel, Thanks, fixed
OmariO, That is several orders of magnitude larger than the code I have, and it does quite a bit more. It also doesn't have a way to bound the number of items in the queue, which is something important for us. This does look interesting, though, thanks for pointing it out.
Roman, This class is specifically meant to be used by a single producer, so there cannot be any concurrent writers.
Since I love nitpicking: the and-0xFF trick applies only to unsigned integers directly. The signed version is a bit longer. Therefore, I know that this disassembly is not actual code but made up. Caught you! :)
So maybe you could make all ints unsigned.
Otherwise, since int.MaxValue overflows to int.MinValue, and I'm not sure what int.MinValue % 256 results in, I'd check that, or document it, or make everything unsigned. I'm sure almost nobody knows the rules for negative modulo.
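The rule in C# is that the result of % takes the sign of the left operand, so once a signed position wraps past int.MaxValue the computed index goes negative. A small demonstration (my addition):

```csharp
using System;

// In C#, % on a negative left operand yields a negative (or zero) result,
// so a signed position that has overflowed produces a negative array index.
int pos = int.MinValue + 100; // what a signed position becomes after wrapping
Console.WriteLine(pos % 256);                  // -156: would throw on array access

uint upos = unchecked((uint)pos);              // reinterpret as unsigned
Console.WriteLine(upos % 256);                 // 100: always a valid index
```

This is exactly why making the position fields uint fixes the problem: unsigned wraparound keeps the modulo result in range.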
It doesn't feel very natural to have different code for writeIndex in Enqueue and Dequeue: maybe better to call it nextWriteIndex in Enqueue.
I also would directly keep indices, not for performance reasons (very likely insignificant), but because it naturally avoids overflow, is easier to read in the debugger (though it loses the count of calls), and simplifies and shortens the code: just one call to a NextIndex() method that does the modulus in each of Enqueue and Dequeue, one when computing nextWriteIndex (which will eventually be assigned to _writeIndex if the queue is not full), and one instead of _readPos++. It would also have avoided the real error on line 36.
Mark, Following the comments in the blog, I made the ints unsigned.
Even though there is a single producer/consumer, there is still a thread safety issue. If we write to the last position and advance the write index, then on dequeue the code will think that the array is empty. Maybe use a linked list? (I know it will mean jumping around the heap and losing the array locality, but that locality is only good if the byte arrays are small.)
Tal, That won't happen. Note that we are using positions, and Dequeue will only consider the queue empty if both positions are in the same place.
At any rate there should be a thread safe flag indicating that the array is full when the enqueue results in filling the queue. That flag should be cleared in the dequeue logic.
Not true, the dequeue should check _readPos == _writePos to verify the queue is empty... not the % version.
Lost the underscore; I meant the dequeue should read the volatile private fields, not the modulo'd ones.
Tal, Assume that the size of the array is 4, now assume that the writeIndex is 16 and the readIndex is 4. The queue is empty because of the modulus.
The read position should never be more than the queue size apart from the write position; that defeats the purpose of the limited-size queue... Example: the size of the queue is 4, and positions start from 0. The reading thread just read index 7 and advanced the read position to 8, while the writing thread just wrote to position 11 and advanced the write position to 12. Now the writes will fail because the queue is full. The reads will fail because the code thinks the queue is empty, since 8 % 4 = 0 and 12 % 4 = 0, thus the read index equals the write index. The reading thread is stuck. This is a deadlock... We need to check 8 == 12, not 8 % 4 == 12 % 4.
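Replaying the scenario above with its exact numbers (size 4, read position 8, write position 12) shows the point: comparing modulo'd indices wrongly reports "empty", while comparing raw positions correctly reports "full". (As the follow-up comment notes, a correct queue never lets the positions drift more than the queue size apart, so this exact state is unreachable; the arithmetic is still instructive.)

```csharp
using System;

const uint QueueSize = 4;
uint readPos = 8, writePos = 12;

Console.WriteLine(readPos % QueueSize == writePos % QueueSize); // True:  indices collide, looks "empty"
Console.WriteLine(readPos == writePos);                         // False: the positions differ, so not empty
Console.WriteLine(writePos - readPos == QueueSize);             // True:  the raw positions say "full"
```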
Interesting idea, it reminds me of the LMAX Disruptor.
I'm curious what in BlockingCollection was using the CPU time. Was it in a spin-wait? How will the SingleProducerSingleConsumerCircularQueue consumer wait and retry if the queue is empty?
Actually I was wrong :( since the writer is never allowed to bypass the reader. The writer is taking a snapshot of the reader position, which guarantees it will never bypass it. The situation I was thinking of is unreachable in the first place...
More about the history of the Bounded Buffer and Concurrent Programming: http://oberon2005.oberoncore.ru/paper/bh2002.pdf
Brinch Hansen wrote a lot about the Bounded Buffer as an OS pioneer.
As _writePos - _readPos is the number of queued items, checking for a full queue becomes
if ( _readPos + QueueSize == _writePos ) return false; // queue full
and checking for an empty queue becomes
if ( _readPos == _writePos ) return false; // queue empty
By checking the indices rather than the positions you cannot tell full from empty. By introducing the +1 on line 16 you prevent the queue from filling completely, limiting the size to 255.
Depending on the size of your queued items, there will be false sharing between the producer and the consumer.
There is a possibility of integer overflow (if the queue were really heavily used): when, say, _readPos becomes negative, the % operation with a negative first operand will result in a negative value and boom! IndexOutOfRangeException.
Your queue might suffer from false sharing. You could try to add padding to make sure that the position fields are not on the same cache line.
Dave, A lot of the time was spent in TryTake and TryAdd methods.
Matthew, The actual queued items are pointers, and I'm fine with them taking the space. They are typically handled 16 items at a time, so that handles the cache line most of the time.
meow, Yes, we changed to uint to avoid it.
Is this the same as a ring buffer? Why not just use the LMAX Disruptor? I think there is a .NET port. The requirements seem to match the ones you have, and it is fairly widely used and tested.
The LMAX Disruptor brings in a LOT of additional code that I don't want to have in my system. Having something small and dedicated makes a lot more sense for something like this.
First, _readPos and _writePos are highly likely to reside on the same cache line, so it might impact the performance. Second, Enqueue accesses _readPos, which again can lower the performance. Read it once, store it in a field like _cachedReadPos, and read it again only if _writePos overlaps with the cached value. You can take a look at my OneToOneRingBuffer ported from Aeron: https://github.com/Scooletz/RampUp/blob/master/src/RampUp/Ring/OneToOneRingBuffer.cs
Scooletz, Thanks, I added some field padding to avoid false sharing, and we now only read the read position once per dequeue
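A sketch of the cached-read-position idea suggested above (class and field names are my own, not from the post): the producer keeps a private copy of the consumer's position and only re-reads the shared field when its cached copy says the queue looks full.

```csharp
using System;
using System.Threading;

// SPSC queue where the producer caches the consumer's position,
// cutting cross-thread reads of _readPos to roughly one per wrap.
public sealed class CachedPosQueue<T> where T : class
{
    private const uint QueueSize = 256;           // power of two
    private readonly T[] _data = new T[QueueSize];
    private uint _readPos;                        // advanced by the consumer
    private uint _writePos;                       // advanced by the producer
    private uint _cachedReadPos;                  // producer-private copy of _readPos

    public bool Enqueue(T item)
    {
        if (_writePos - _cachedReadPos == QueueSize)
        {
            // Looks full based on the stale copy; refresh it once and re-check.
            _cachedReadPos = Volatile.Read(ref _readPos);
            if (_writePos - _cachedReadPos == QueueSize)
                return false; // really full
        }
        _data[_writePos % QueueSize] = item;
        Volatile.Write(ref _writePos, _writePos + 1);
        return true;
    }

    public bool Dequeue(out T item)
    {
        item = null;
        if (_readPos == Volatile.Read(ref _writePos))
            return false; // empty
        var idx = _readPos % QueueSize;
        item = _data[idx];
        _data[idx] = null;
        Volatile.Write(ref _readPos, _readPos + 1);
        return true;
    }
}
```

The cached copy can only lag behind the real read position, so the producer may spuriously see "full" and refresh, but it can never overwrite unread data.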
Can you please share the final version of this class?
It is here: https://github.com/ayende/ravendb/blob/v4.0/src/Sparrow/Logging/SingleProducerSingleConsumerCircularQueue.cs