A minimal actor framework
Originally posted at 4/29/2011
For one of our projects, we need the ability to asynchronously push changes through a socket. Since we actually care about the order of actions, we realized that we couldn’t really use purely async IO. For example, consider the following actions:
connection.Send("abc");
connection.Send("def");
I care that abc will be sent before def, and I care that all of abc will be sent before anything else is sent through that connection. What I don’t care about is whether anything else is sent between abc and def.
All of that can be had using:
public class Actor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Action<TState>> actions = new ConcurrentQueue<Action<TState>>();
    private Task activeTask;

    public void Act(Action<TState> action)
    {
        actions.Enqueue(action);

        if (activeTask != null)
            return;

        lock(this)
        {
            if (activeTask != null)
                return;
            activeTask = Task.Factory.StartNew(ExecuteActions);
        }
    }

    private void ExecuteActions()
    {
        Action<TState> action;
        while (actions.TryDequeue(out action))
        {
            action(State);
        }
        lock(this)
        {
            activeTask = null;
        }
    }
}
The actions will execute synchronously for each actor, and this satisfies my requirements quite nicely, even if I say so myself.
In truth, the code above isn’t really good. Can you consider ways to improve this?
Comments
It has a bug. The running task could be just about to execute "activeTask = null;" when the Act method returns early because "if (activeTask != null)" is still true. In that case no new task is started for the action that was just queued.
lock (this) is not a good idea
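One way to address the two comments above can be sketched as follows. This is only an illustration, not the author's fix: the class name FlagActor and the running field are hypothetical, and error handling is deliberately left out (an action that throws would leave the flag set). The idea is to replace the nullable activeTask and lock(this) with an integer flag, and to re-check the queue after the flag has been released, so an action enqueued during the shutdown window is never stranded.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the Actor class from the post.
public class FlagActor<TState>
{
    private readonly TState state;
    private readonly ConcurrentQueue<Action<TState>> actions =
        new ConcurrentQueue<Action<TState>>();

    // 0 = idle, 1 = a drain task is scheduled or running.
    private int running;

    public FlagActor(TState state)
    {
        this.state = state;
    }

    public void Act(Action<TState> action)
    {
        actions.Enqueue(action);
        TryStartDraining();
    }

    private void TryStartDraining()
    {
        // Only the caller that flips the flag from 0 to 1 starts a task.
        if (Interlocked.CompareExchange(ref running, 1, 0) == 0)
            Task.Factory.StartNew(ExecuteActions);
    }

    private void ExecuteActions()
    {
        Action<TState> action;
        while (actions.TryDequeue(out action))
            action(state);

        // Release the flag first, then look at the queue again: anything
        // enqueued while we were leaving the loop is picked up here.
        Interlocked.Exchange(ref running, 0);
        if (!actions.IsEmpty)
            TryStartDraining();
    }
}
```

Because only one drain task can hold the flag at a time, actions for a given actor still run one after another in queue order, while separate actors remain free to run in parallel.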
Tobi,
Great that you spotted that!
What else? Look for design issues.
State shouldn't be public. It should only be editable from the activeTask thread.
The queue is unbounded.
How about having one ConcurrentQueue per connection, wrapped in a write-only interface and passed as the Actor<TState> parameter? This meets the anti-requirement that allows one actor's messages to be interleaved with other actors'. If so, the thread would be started in the same manner (one per concurrent queue), but it would be more explicit that it is all done on one connection, one queue.
Have you taken a look at the ConcurrentExclusiveSchedulerPair included in TPL Dataflow? It provides the same synchronization of tasks but doesn't break the task/continuation pattern.
Just schedule everything on the ExclusiveScheduler and you're good to go.
Sam,
That means that everything is in sync, but I want parallelism between different actors.
You would need a SchedulerPair per "actor". Each ExclusiveScheduler can run in parallel (at least I hope so). I see it as a non-blocking form of reader/writer lock.
It doesn't ensure in-order processing though. Only that each task will run on its own.
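The "one SchedulerPair per actor" idea from this thread might look roughly like the sketch below. It assumes .NET 4.5 or later, where ConcurrentExclusiveSchedulerPair lives in System.Threading.Tasks; the class name ScheduledActor is hypothetical. As noted above, this only guarantees that an actor's tasks do not overlap, so it should not be relied on for strict ordering.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical wrapper: one scheduler pair per actor.
public class ScheduledActor<TState>
{
    private readonly TState state;
    private readonly TaskFactory factory;

    public ScheduledActor(TState state)
    {
        this.state = state;

        // Tasks queued to the ExclusiveScheduler of a pair never run at the
        // same time as each other. Different actors own different pairs, so
        // they can still run in parallel with one another.
        var pair = new ConcurrentExclusiveSchedulerPair();
        factory = new TaskFactory(pair.ExclusiveScheduler);
    }

    // Returns the Task so callers can wait on it or attach continuations.
    public Task Act(Action<TState> action)
    {
        return factory.StartNew(() => action(state));
    }
}
```

Returning the Task keeps the usual task/continuation pattern available to the caller, which is the advantage mentioned earlier in the thread.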
By synchronous, do you mean each action has to execute on the same thread? If that's the case then... well... there are problems. StartNew is not guaranteed to use the same thread twice.
Sam,
No, one after another
I see a problem: if I call the Act method and activeTask isn't null, the action that I'm adding will only execute the next time I call the Act method. Is this a problem?
Have you considered wrapping the MailboxProcessor<'a> from the F# control primitives in a C#-friendly type with the API you wish to define? I have done this in the project I am currently working on and it works well, although it adds a dependency on FSharp.Core, which is something you may wish to avoid.
The "actions.Enqueue(action);" step should be separate from the Act function, so that I can add an action to the queue at any time (while the actor is working).
You have no error handling
I'd improve it by writing it in F# as a MailboxProcessor. I guess Colin already mentioned this, but it would require hardly any code and would be perfectly suited to such a task.
Retlang is a great, simple framework to handle this kind of thing:
http://code.google.com/p/retlang/
One action can be left unexecuted, because you acquire the lock after testing whether the queue is empty in ExecuteActions().
Thread1 is doing
private void ExecuteActions()
Action<TState> action;
Then Thread 2 is doing
public void Act(Action<TState> action)
Then Thread 1 leaves one action in queue
I'd write something like this
private void ExecuteActions()
while(activeTask)
{
Action<TState> action;
if(!queue.Empty)
}
A problem with using Task.Factory.StartNew(action) is that a pool thread is blocked while an action is running. For a single Actor in a program, this is not a big problem (only one thread from the pool), but with multiple Actors running concurrently it could become a real problem if the actions block for some time.
A different approach (but a bit more complicated and restricted) would be to let the Actor handle the sequential scheduling of provided async operations:
Act(Func<TState, Task> action)
I.e. the caller queues delegates that return a Task. The Act function just ensures that each scheduled Task continues with the next. It is not possible to queue a Task directly (skipping the delegate), since that would force callers to start the Task at the time of the call to Act(). The delegate allows Act() to trigger the async operation when the previous Task is done.
The caller can implement efficient async operations that do not block threads, e.g. by using Task.Factory.FromAsync() on BeginX/EndX functions, and use the Actor to have them run in sequence. The caller can still queue inefficient blocking tasks when only sync operations are available.
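The delegate-returning-Task approach described above could be sketched like this. It is an assumption-laden illustration rather than anyone's actual implementation: the names AsyncActor, gate and tail are hypothetical, and Task.CompletedTask requires .NET 4.6 or later. Each call chains the new delegate onto the previous Task, so the delegate is only invoked once the earlier operation has finished.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical actor that sequences async operations without blocking a thread.
public class AsyncActor<TState>
{
    private readonly TState state;
    private readonly object gate = new object();

    // The most recently queued operation; new work is chained after it.
    private Task tail = Task.CompletedTask;

    public AsyncActor(TState state)
    {
        this.state = state;
    }

    public Task Act(Func<TState, Task> action)
    {
        lock (gate)
        {
            // The delegate runs only after the previous task completes,
            // whether it succeeded or faulted. Unwrap turns the resulting
            // Task<Task> into a Task that ends when the inner one ends.
            tail = tail
                .ContinueWith(_ => action(state), TaskScheduler.Default)
                .Unwrap();
            return tail;
        }
    }
}
```

The lock here only protects the short chaining step, not the operations themselves, so callers never wait on each other's IO. A failure in one operation shows up on the Task returned for that call and does not stop later operations from running.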
Skip/delete my previous comment. I failed to notice that this was already covered in your next post.
Michael,
Take a look at the next post.
Hi Ayende, every time you've posted a message lately, my Google Reader has marked ALL your previous posts as new posts as well. There might be something wrong with your blog engine. Just letting you know. :)
Johannes, yes, we didn't have GUIDs for posts, so it did that. Fixed now.