Synchronization primitives, MulticastAutoResetEvent
I have a very interesting problem within RavenDB. I have a set of worker processes that all work on top of the same storage. Whenever a change happens in the storage, they wake up and start working on it. The problem is that this change may happen while a worker is busy doing something other than waiting for work, which means that Monitor.PulseAll, which is what I was using, isn’t going to work: a pulse that arrives while nobody is waiting is simply lost.
AutoResetEvent is what you are supposed to use in order to avoid losing updates on the lock, but in my scenario I don’t have a single worker, I have a set of workers. And I really wanted to be able to use PulseAll to release all of them at once. I started looking at holding arrays of AutoResetEvents, keeping track of all changes in memory, etc. But none of it really made sense to me.
After thinking about it for a while, I realized that we are actually looking at a problem of state. And we can solve that by having the client hold the state. This led me to write something like this:
    using System;
    using System.Threading;

    public class MultiCastAutoResetEvent
    {
        private readonly object waitForWork = new object();
        private int workCounter = 0;

        public void NotifyAboutWork()
        {
            // Bump the counter before pulsing, so workers that are not waiting yet still see a change.
            Interlocked.Increment(ref workCounter);
            lock (waitForWork)
            {
                Monitor.PulseAll(waitForWork);
                // Second increment, under the lock, just to be extra safe.
                Interlocked.Increment(ref workCounter);
            }
        }

        public void WaitForWork(TimeSpan timeout, ref int workerWorkCounter)
        {
            // Fast path: something changed since the worker last looked, no need to wait.
            var currentWorkCounter = Thread.VolatileRead(ref workCounter);
            if (currentWorkCounter != workerWorkCounter)
            {
                workerWorkCounter = currentWorkCounter;
                return;
            }
            lock (waitForWork)
            {
                // Re-check under the lock, in case a notification arrived in the meantime.
                currentWorkCounter = Thread.VolatileRead(ref workCounter);
                if (currentWorkCounter != workerWorkCounter)
                {
                    workerWorkCounter = currentWorkCounter;
                    return;
                }
                Monitor.Wait(waitForWork, timeout);
            }
        }
    }
By forcing the client to pass us the most recent state it has seen, we can efficiently tell whether it still has work to do or has to wait.
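As a rough illustration of how a worker might hold that state, here is a minimal sketch. The class is repeated so the sample compiles on its own, and it uses Volatile.Read in place of Thread.VolatileRead, which is my substitution. WorkerDemo, Run and the one second timeout are hypothetical names and values chosen for the example, not part of RavenDB.

```csharp
using System;
using System.Threading;

// Same shape as the class in the post, repeated so this sketch is self-contained.
public class MultiCastAutoResetEvent
{
    private readonly object waitForWork = new object();
    private int workCounter = 0;

    public void NotifyAboutWork()
    {
        Interlocked.Increment(ref workCounter);
        lock (waitForWork)
        {
            Monitor.PulseAll(waitForWork);
            Interlocked.Increment(ref workCounter);
        }
    }

    public void WaitForWork(TimeSpan timeout, ref int workerWorkCounter)
    {
        // Volatile.Read is used here instead of Thread.VolatileRead (assumption of this sketch).
        var currentWorkCounter = Volatile.Read(ref workCounter);
        if (currentWorkCounter != workerWorkCounter)
        {
            workerWorkCounter = currentWorkCounter;
            return;
        }
        lock (waitForWork)
        {
            currentWorkCounter = Volatile.Read(ref workCounter);
            if (currentWorkCounter != workerWorkCounter)
            {
                workerWorkCounter = currentWorkCounter;
                return;
            }
            Monitor.Wait(waitForWork, timeout);
        }
    }
}

// Hypothetical driver: starts some workers, notifies once, and counts how many noticed.
public static class WorkerDemo
{
    public static int Run(int workers)
    {
        var evt = new MultiCastAutoResetEvent();
        int noticed = 0;
        var threads = new Thread[workers];
        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() =>
            {
                int lastSeen = 0; // the state held by the worker, not by the event
                // Keep asking until the counter moves away from what this worker last saw.
                while (lastSeen == 0)
                    evt.WaitForWork(TimeSpan.FromSeconds(1), ref lastSeen);
                Interlocked.Increment(ref noticed);
            });
            threads[i].Start();
        }
        // A single notification; workers that have not started waiting yet still see it.
        evt.NotifyAboutWork();
        foreach (var t in threads) t.Join();
        return noticed;
    }
}
```

Calling WorkerDemo.Run(4) should return 4 whether each worker was already waiting, was just about to wait, or had not started at all when NotifyAboutWork ran, because each one compares its own lastSeen against the shared counter.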
Comments
Am I missing something here? Where is workCounter defined? And what is doWork used for?
Samuel,
You aren't.
I forgot to add the workCounter, which is just a field.
doWork (which I removed from the post) allows me to stop doing work. It isn't important for the topic at hand, so I removed it.
Are you waking up multiple threads here just to have all but one of them go back to sleep?
Is your data manipulated by a single task in a pool and then removed, or do you have multiple threads which all do something different with the same data?
Peter,
I am waking multiple threads, so all of them can now do some work concurrently.
Within NotifyAboutWork(), inside the lock, after the PulseAll, shouldn't there be a call to Decrement instead of Increment?
Oh, it's not the "amount of work left" counter, but "number of pulses until now" counter.
Ken,
No, I don't want to decrease it.
It is okay for it to overflow, and I am just adding the second inc to be extra safe.
Monitor.Wait(waitForWork, timeout); inside a lock(waitForWork) doesn't seem right to me... How can PulseAll get called if that has its own lock(waitForWork) around it?
Configurator,
That is how the Monitor works.
From the docs:
"The thread that currently owns the lock on the specified object invokes this method in order to release the object so that another thread can access it. The caller is blocked while waiting to reacquire the lock. This method is called when the caller needs to wait for a state change that will occur as a result of another thread's operations."
Note that there is a difference between Monitor.Enter and Monitor.Wait
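To make that behaviour concrete, here is a small sketch in which one thread waits inside a lock while another thread takes the same lock in order to pulse it. WaitReleasesLockDemo and its flags are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading;

public static class WaitReleasesLockDemo
{
    public static bool Run()
    {
        var gate = new object();
        bool ready = false, signaled = false, woke = false;

        var waiter = new Thread(() =>
        {
            lock (gate)
            {
                ready = true;
                Monitor.PulseAll(gate);      // let the main thread know we are about to wait
                while (!signaled)
                    Monitor.Wait(gate);      // releases the lock on gate while blocked
                woke = true;                 // runs only after the lock has been reacquired
            }
        });
        waiter.Start();

        lock (gate)
        {
            while (!ready)
                Monitor.Wait(gate);
            // The waiter is still inside its lock block, yet we own the lock here,
            // which is only possible because Monitor.Wait gave it up.
            signaled = true;
            Monitor.PulseAll(gate);
        }

        waiter.Join();
        return woke;
    }
}
```

Run should return true: the pulsing thread is able to enter lock (gate) even though the waiter never left its own lock block.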
Can't you just check if there's more work to do in a loop and, if not, hang on an event? This way Monitor.PulseAll would be sufficient.
Rafal,
Show me the code that will release multiple threads, including those not currently waiting on the event, to do so.
But why do you need to release threads that aren't waiting on the event? I was thinking about something like this:
    while(true)
    {
        if (MoreWorkToDo)
        {
            DoTheWork();
        }
        else
        {
            Monitor.Wait();
        }
    }
Imagine that I am just before this line:
if (MoreWorkToDo)
And then the Pulse happens. Using your code, I'll miss the notification.
Ah right, of course. I had forgotten that's how wait works :\
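The lost notification in this exchange can be reproduced without any race at all: pulse first, wait afterwards. The sketch below does exactly that; LostPulseDemo and the 200 ms timeout are hypothetical choices for the illustration.

```csharp
using System;
using System.Threading;

public static class LostPulseDemo
{
    // Returns the result of Monitor.Wait: true if woken by a pulse, false on timeout.
    public static bool WaitAfterPulse()
    {
        var gate = new object();

        // The "notification" arrives while nobody is waiting, so nothing records it.
        lock (gate)
        {
            Monitor.PulseAll(gate);
        }

        // The worker starts waiting afterwards and only gives up when the timeout expires.
        lock (gate)
        {
            return Monitor.Wait(gate, TimeSpan.FromMilliseconds(200));
        }
    }
}
```

WaitAfterPulse should return false, since the earlier PulseAll left no trace. This is the gap that the counter in MultiCastAutoResetEvent is meant to close.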
Your solution may work, but wouldn't it be a simpler approach to have a work queue in each worker thread?
Then when a change is made on your storage, you could simply queue an event on each worker queue.
Each worker thread could then wait when its queue is empty, then when an event is queued a simple Monitor.Pulse would be needed because each queue is only accessible by one worker thread.
Hope that makes sense.
Jeff
Jeff,
Not really, no.
Work queues assume one item per task. In my case, I just want to wake those guys up, since they may have multiple tasks waiting for them.
It means that I don't have to do any work whatsoever for batching.
Would it not be more simple to use the threadpool and queue some jobs along with a small packet of data for them to work on?
Peter,
No, not really. Because you would have to submit each independent job.
This way, you get to batch them, which is REALLY important from the point of view of performance.
Well I'm no threading expert, but from what I recall of reading my threading book a while back the threadpool class is very well written and takes lots of things into account when balancing the load on your machine (based on number of processors, how long a job is taking to complete, etc) and is a preferable approach rather than waking up a specific number of threads at the same time.
Have you tried the ThreadPool and compared the performance? If so, how did it compare?
Peter,
The problem isn't the thread pool.
Let me see if I can explain.
We have:
task 1 - index doc a
task 2 - index doc b
If we run them via the thread pool, we would have to treat each of them as a separate operation.
But if we run them via my code, we can batch both operations together. What this means is that our IO cost will go WAY down.
I think it's the word "batch" you are losing me with. By that do you mean that the jobs are performed in parallel or serial?
Peter,
By batch I mean that both doc A and doc B will be indexed at the same time, and will be written to disk on the same IO operation.
The difference is whether you have 1 IO operation per batch or multiple.
By following this method, I can aggregate multiple changes and write them in a single action, not once per work item.
So multiple threads perform the updates in memory, and then when they are all completed the result is written to disk by a class that was waiting for the multiple threads to finish?
Peter,
In memory computation is CHEAP.
The synchronization overhead on that is too high, just do it in a single thread.
The actual costs are in the IO, and they overshadow everything else.
This allows multiple workers to capture the waiting data and process that. Each of them does something else with it (actually, each of them is a separate index), but the idea is that we can capture multiple changes at one go, reducing IO costs.
But do these individual threads write to disk, or does a single thread at the end perform a single write operation with all of the results?
Each thread writes to its own directory.
Okay, and how do you get multiple threads performing their own write operation to perform their IO as a single action?
Peter,
I don't do that. But that saves me each separate thread having to perform the IO op on the set of work that it has.
By that it sounds like you are saying that the thread doesn't have to "own the code" of doing the operation, is that correct? Are you saying that your thread code calls a method on some other class which performs the IO?
Peter,
Yes, the code to actually do the writes is in Lucene.NET
I think this takes us back to where we started :-)
You are resuming multiple threads which all perform independent writes to disk, why would that be quicker than using the threadpool?
Threadpool code would be quicker to write, simpler to read, and I suspect it will probably execute faster too. Have you compared the performance?
Peter,
Assume the following:
Doc A is written
Doc B is written
Each is an independent operation.
We have two separate indexes.
Using your approach, we will have 4 separate writes.
Using mine, we will have 2.
That sounds like an implementation issue, not a threading one.
You wouldn't start a thread from the pool for every document in a batch, you'd queue one job with the batch of documents as the data packet it needs to work on.
Peter,
That would require me to handle multiple levels of locking (one for each thread), would force me to remember all the pending state, etc.
In this fashion, the only thing I need to remember is the last handled item.
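A minimal sketch of that idea, under heavy assumptions: the shared storage is modelled as an append-only list of changed documents, each index worker keeps nothing but the position of the last item it handled, and WriteBatch stands in for the real IO. All names here (BatchingIndexWorker, ProcessPending, WriteBatch, BatchingDemo) are hypothetical, and the sketch is single-threaded on purpose, since it only illustrates the write counting.

```csharp
using System;
using System.Collections.Generic;

public class BatchingIndexWorker
{
    private int lastHandled = 0;                  // the only state the worker remembers
    public int WriteCount { get; private set; }   // how many "IO operations" were performed

    // log is a stand-in for the shared storage: an append-only list of changed documents.
    public void ProcessPending(IReadOnlyList<string> log)
    {
        if (lastHandled == log.Count)
            return;                               // nothing new since the last wake-up

        // Capture everything that arrived since the last handled item, in one go.
        var batch = new List<string>();
        for (int i = lastHandled; i < log.Count; i++)
            batch.Add(log[i]);
        lastHandled = log.Count;

        WriteBatch(batch);
    }

    private void WriteBatch(List<string> batch)
    {
        // Placeholder for a single write covering the whole batch.
        WriteCount++;
    }
}

public static class BatchingDemo
{
    public static int Run()
    {
        var log = new List<string> { "doc a", "doc b" };   // two independent changes
        var indexes = new[] { new BatchingIndexWorker(), new BatchingIndexWorker() };

        int writes = 0;
        foreach (var index in indexes)
        {
            index.ProcessPending(log);
            writes += index.WriteCount;
        }
        return writes;
    }
}
```

With two documents and two indexes, Run returns 2: one write per index, rather than one write per document per index.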
You'd only need to make the target class thread safe (the class which holds the index information, if that is what we are talking about?)
If your calling code is aware of all of the documents which need updating then you can pass them to the thread pool as a batch of jobs.
pseudo code...
    foreach(Index in indexes)
    {
        post a job to the thread pool for (index, listOfDocuments)
    }
If you have 32 jobs to do per document and you only have 4 CPU cores you are going to wake up lots of threads which cannot all execute concurrently and that will slow down your performance.
It's just that it looks like a rethink of how you call this routine could not only make it more simple to develop (and understand) the code but improve the performance too.
Peter,
Making the class thread safe would mean that I have locking, which I want to avoid.
And the calling code doesn't handle a list of documents, it handles only one doc at a time.
And yes, I'll wake up a lot of threads, but that is cheap to do, since they will wake up, do some minute amount of work, then block on IO, anyway.
Reducing the number of IO calls far outstrips any thread scheduling concerns that I can have.
You already have locking, you'll just be moving it elsewhere.
If the calling code only handles one document at a time then there is somewhere where it is added to a list of documents otherwise your own code would only be dealing with one document at a time too.
"Reducing the number of IO calls far outstrips any thread scheduling concerns that I can have"
It's a false dichotomy, it is possible to write code which both schedules threads properly and has the same number of IO calls; you don't have to choose between the two.
Peter,
I'll put up a post that shows what is actually going on in more details, that would explain that.