I was going through some code to simplify it and came upon a classic producer-consumer problem. Multiple threads were pushing objects into a queue, and a single thread would dequeue them to perform the actual work.
There was a lot of code: code to lock the collection to make it thread safe, and code to grab items from the queue as they became available (while releasing the thread if no work was available).
Here was the code to push data into the queue:
    lock (this.ItemQueue)
    {
        this.ItemQueue.Enqueue(item);
        Monitor.Pulse(this.ItemQueue);
    }
And here was the code in the worker thread:
    int count = 0;
    Item item = null;
    while (count == 0)
    {
        lock (this.ItemQueue)
        {
            count = this.ItemQueue.Count;
            if (count != 0)
            {
                item = this.ItemQueue.Dequeue();
                break;
            }
            Monitor.Wait(this.ItemQueue);
            count = this.ItemQueue.Count;
            if (count != 0)
                item = this.ItemQueue.Dequeue();
        }
    }
Well, the first problem to solve (thread-safe access to the queue) can be handled with the types in the System.Collections.Concurrent namespace. The natural choice was ConcurrentQueue. This simplifies the enqueuing code to:
    this.ItemQueue.Add(item); // notice that it isn't .Enqueue()... keep on reading!
With the updated queue, we could simplify the dequeuing code significantly. The next problem to solve was that we didn't want to keep trying to dequeue in a tight loop until something showed up, because that would hog the CPU. Adding Thread.Sleep() would let us relinquish the CPU, at the cost of having to wait for the sleep to finish before we can see whether there is an item to process. Going back to thread synchronization primitives (semaphores, mutexes, monitors…) complicates the code, and I don't like that.
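To make that trade-off concrete, here is a minimal sketch of the polling approach, assuming a ConcurrentQueue<string>. The helper name TryTakeByPolling and its pollMs and timeoutMs parameters are made up for illustration and were not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class PollingConsumer
{
    // Hypothetical helper: polls the queue until an item arrives
    // or roughly timeoutMs milliseconds of sleeping have passed.
    public static bool TryTakeByPolling(
        ConcurrentQueue<string> queue, int pollMs, int timeoutMs, out string item)
    {
        int waited = 0;

        // TryDequeue never blocks; it returns false when the queue is empty.
        while (!queue.TryDequeue(out item))
        {
            if (waited >= timeoutMs)
                return false;

            // Give up the CPU. An item enqueued right after this call
            // still has to wait for the sleep to finish.
            Thread.Sleep(pollMs);
            waited += pollMs;
        }

        return true;
    }
}
```

A short pollMs wakes the thread more often for nothing, and a long one adds latency to every item, which is exactly the compromise I wanted to avoid.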
Then I discovered BlockingCollection. It is an abstraction that wraps a ConcurrentQueue or ConcurrentStack (a ConcurrentQueue by default) and enables the producer-consumer pattern. Adding to the collection is easy, and taking from the collection blocks (the thread relinquishes the CPU until an item becomes available). The perfect solution.
This is the new dequeuing code:
    item = this.ItemQueue.Take();
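For a fuller picture, here is a rough sketch of how the whole thing might fit together, with several producers and one consumer. The class name BlockingCollectionDemo, the Run method, and the use of int items are assumptions made for this example; the real code used its own Item type and worker thread. It also uses GetConsumingEnumerable() and CompleteAdding(), which the original code did not need, so that the consumer loop can end cleanly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Hypothetical demo: starts producerCount producers and one consumer,
    // and returns the items in the order the consumer processed them.
    public static List<int> Run(int producerCount, int itemsPerProducer)
    {
        var processed = new List<int>();

        // With no arguments, BlockingCollection<T> is backed by a ConcurrentQueue<T>.
        using (var queue = new BlockingCollection<int>())
        {
            // Single consumer. The enumerable blocks while the queue is empty
            // and ends once CompleteAdding() was called and the queue is drained.
            Task consumer = Task.Run(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                    processed.Add(item);
            });

            var producers = new Task[producerCount];
            for (int p = 0; p < producerCount; p++)
            {
                int id = p; // copy the loop variable for the lambda
                producers[p] = Task.Run(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                        queue.Add(id * itemsPerProducer + i);
                });
            }

            Task.WaitAll(producers);
            queue.CompleteAdding(); // tell the consumer no more items are coming
            consumer.Wait();
        }

        return processed;
    }
}
```

Calling BlockingCollectionDemo.Run(3, 100) should hand back 300 distinct items. The order across producers is not deterministic, but the items from any one producer come out in the order that producer added them.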
