Non-blocking Queue
It is no secret that thread synchronization in multithreaded applications consumes a lot of resources. Starting with version 3.5, .NET introduced synchronization primitives that account for the fact that calls into Windows system functions are very expensive. For example, the ReaderWriterLockSlim class is a replacement for ReaderWriterLock: it first spins in a simple check loop and only after some time falls back to calling Windows wait functions. Despite the 100% CPU core utilization during spinning, this implementation is more efficient for workloads where locks are held, and threads wait, only briefly.
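As a quick illustration (a minimal sketch; the Counter class and its members are invented for this example), ReaderWriterLockSlim is used in much the same way as ReaderWriterLock, with the spinning hidden inside the Enter/Exit calls:

```csharp
using System.Threading;

// Hypothetical shared counter guarded by ReaderWriterLockSlim.
class Counter
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private int _value;

    public int Read()
    {
        _lock.EnterReadLock(); // spins briefly before falling back to kernel waits
        try { return _value; }
        finally { _lock.ExitReadLock(); }
    }

    public void Increment()
    {
        _lock.EnterWriteLock();
        try { _value++; }
        finally { _lock.ExitWriteLock(); }
    }
}
```

When read locks are short and frequent, the spin phase usually avoids the kernel transition entirely.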
In .NET 4, thread-safe collections such as ConcurrentQueue, ConcurrentDictionary, and ConcurrentBag appeared. Some of them rely on locks internally (ConcurrentDictionary, for instance, uses fine-grained locking), which can make them inefficient under heavy contention.
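For comparison, a minimal sketch of the built-in ConcurrentQueue, which offers the same enqueue/try-dequeue shape as the queue built below:

```csharp
using System;
using System.Collections.Concurrent;

class ConcurrentQueueDemo
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>();
        queue.Enqueue(1);
        queue.Enqueue(2);

        int value;
        queue.TryDequeue(out value); // removes the oldest element
        Console.WriteLine(value);    // 1: FIFO order is preserved
    }
}
```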
When building non-blocking collections, several points must be taken into account. All changes performed by a thread must be atomic. If the changes are not atomic, there must be some atomic initiating action that allows threads, first, to distinguish their own changes from those made by other threads, and second, to compete for the right to perform an operation on the collection. Such an atomic action can be changing the value of an integer variable, for example, from 0 to a unique thread identifier. In this case, another thread, after checking the value of the variable, can decide whether to wait or to perform an operation on the collection.
while (syncVar != threadId)
{
    if (syncVar == 0)
        syncVar = threadId;
}
... // perform operation
syncVar = 0;
The problem here is that checking and setting the value must be an atomic operation. That is, if you write it directly as in the code above, there is no guarantee that the system will not preempt the thread between the check and the assignment.
Fortunately, the Intel IA-32 and AMD64 architectures provide the CMPXCHG instruction, which performs a comparison and, if it succeeds, an assignment. With the LOCK prefix it executes atomically, guaranteeing that no other core can touch the same memory location mid-operation. The .NET equivalent is the Interlocked.CompareExchange method, which on x86 targets the JIT and AOT compilers translate into that same LOCK CMPXCHG instruction. Using Interlocked.CompareExchange, the code above looks like this.
while (syncVar != threadId)
{
    // compare syncVar with 0 and, if equal, replace it with threadId
    Interlocked.CompareExchange(ref syncVar, threadId, 0);
}
... // perform operation
syncVar = 0;
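The return value of Interlocked.CompareExchange is worth pinning down, because the queue code below relies on it: the method always returns the value the destination held before the call, so a result equal to the comparand means the exchange took place (a minimal single-threaded sketch):

```csharp
using System;
using System.Threading;

class CasDemo
{
    static void Main()
    {
        int syncVar = 0;
        int threadId = 7;

        // syncVar holds 0, which equals the comparand, so the exchange
        // happens and the old value (0) is returned.
        Console.WriteLine(Interlocked.CompareExchange(ref syncVar, threadId, 0)); // 0
        Console.WriteLine(syncVar); // 7

        // syncVar now holds 7, not 0, so nothing changes and the current
        // value (7) is returned.
        Console.WriteLine(Interlocked.CompareExchange(ref syncVar, 42, 0)); // 7
        Console.WriteLine(syncVar); // still 7
    }
}
```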
Now we can proceed to building a non-blocking queue. To do this, we define a generic class representing a queue element, which stores the value and a reference to the next element.
class QueueItem<T>
{
    public T Value;
    public QueueItem<T> Next;

    public QueueItem()
    {
    }

    public QueueItem(T value)
    {
        Value = value;
    }
}
In the queue, we create one empty sentinel element so that when adding and removing elements we do not need extra null checks (when the queue is empty or becomes empty).
public class LockFreeQueue<T>
{
    private int _count;
    private QueueItem<T> _head;
    private QueueItem<T> _tail;

    public LockFreeQueue()
    {
        _head = _tail = new QueueItem<T>(); // sentinel element
    }

    public void Enqueue(T value)
    {
        QueueItem<T> item = new QueueItem<T>(value);
        // TODO
    }
}
For thread synchronization we will use _tail.Next, the reference to the element that follows the tail of the queue. In the normal state it is null, since _tail points to the last element. Each thread entering the Enqueue method creates its own local QueueItem instance, and this object can be used in place of a thread identifier. Interlocked.CompareExchange always returns the value the destination held before the call: if that value equals the comparand (null here), the exchange took place; otherwise the destination was left unchanged. A return value of null therefore means no other thread had yet entered the competition, and the current thread has won the right to append its element. This lets us write the check and the contention loop.
public void Enqueue(T value)
{
    QueueItem<T> item = new QueueItem<T>(value);
    // spin until we win the right to append: swap null -> item in _tail.Next
    while (Interlocked.CompareExchange(ref _tail.Next, item, null) != null) ;
    // TODO
}
However, this is not all. Once the loop condition becomes false, the current thread has won the contention and _tail.Next now points to item. To complete the add operation, we must move the tail of the queue to the new element and increment the counter.
public void Enqueue(T value)
{
    QueueItem<T> item = new QueueItem<T>(value);
    while (Interlocked.CompareExchange(ref _tail.Next, item, null) != null) ;
    _tail = item; // publish the new tail; _tail.Next is null again for waiting threads
    Interlocked.Increment(ref _count);
}
The operation of adding an element to the queue is now implemented.
Now for removal. Since elements are added at the end and removed from the beginning, the add operation will not interfere with the remove operation. In removal, we need to check the case when the queue is empty, that is, when _tail equals _head.
public bool Dequeue(out T value)
{
    if (_tail != _head)
    {
        // TODO
        return true;
    }
    value = default(T);
    return false;
}
We need to save the head of the queue, compare it with the tail, and advance the head to the next element. All of this must happen in a loop, because another thread may remove an element while the current thread's removal is in progress. Advancing the head must be done with Interlocked.CompareExchange, since we have to be sure the head was not changed by another thread in the meantime. This gives the final version of the removal method.
public bool Dequeue(out T value)
{
    bool success = false;
    QueueItem<T> item = _head;
    while (!success)
    {
        item = _head;
        if (item == _tail)
        {
            value = default(T);
            return false;
        }
        // advance the head only if no other thread has changed it meanwhile
        success = Interlocked.CompareExchange(ref _head, item.Next, item) == item;
    }
    Interlocked.Decrement(ref _count);
    value = item.Next.Value;
    item.Next.Value = default(T); // clear the value held by the new sentinel
    return true;
}
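For reference, the snippets above can be assembled into one complete, runnable listing with a small multi-threaded smoke test (the thread and element counts are arbitrary; only the queue itself comes from the text):

```csharp
using System;
using System.Threading;

class QueueItem<T>
{
    public T Value;
    public QueueItem<T> Next;
    public QueueItem() { }
    public QueueItem(T value) { Value = value; }
}

class LockFreeQueue<T>
{
    private int _count;
    private QueueItem<T> _head;
    private QueueItem<T> _tail;

    public LockFreeQueue() { _head = _tail = new QueueItem<T>(); }

    public void Enqueue(T value)
    {
        QueueItem<T> item = new QueueItem<T>(value);
        // spin until we win the right to append: swap null -> item in _tail.Next
        while (Interlocked.CompareExchange(ref _tail.Next, item, null) != null) ;
        _tail = item; // publish the new tail
        Interlocked.Increment(ref _count);
    }

    public bool Dequeue(out T value)
    {
        bool success = false;
        QueueItem<T> item = _head;
        while (!success)
        {
            item = _head;
            if (item == _tail)
            {
                value = default(T);
                return false;
            }
            success = Interlocked.CompareExchange(ref _head, item.Next, item) == item;
        }
        Interlocked.Decrement(ref _count);
        value = item.Next.Value;
        item.Next.Value = default(T);
        return true;
    }
}

class Program
{
    static void Main()
    {
        var queue = new LockFreeQueue<int>();
        const int producers = 4, perProducer = 10000;

        var threads = new Thread[producers];
        for (int t = 0; t < producers; t++)
        {
            threads[t] = new Thread(() =>
            {
                for (int i = 0; i < perProducer; i++)
                    queue.Enqueue(i);
            });
            threads[t].Start();
        }
        foreach (var t in threads) t.Join();

        int value, dequeued = 0;
        while (queue.Dequeue(out value)) dequeued++;
        Console.WriteLine(dequeued); // 40000 (4 producers x 10000 each)
    }
}
```

Note that the plain `_tail = item` store relies on the strong store ordering of x86; on a weaker memory model a volatile write would be the cautious choice.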
Such a queue is efficient in a multithreaded application and consumes no kernel resources. In the worst case, when N threads contend in the add and remove procedures, each thread spins through N−1 iterations, roughly 2 × (N−1) CPU instructions when adding an element and 5 × (N−1) when removing one. That is negligible compared to the cost of a WinAPI call.




