Ayende @ Rahien

A minimal actor framework

Originally posted at 4/29/2011

For one of our projects, we need the ability to asynchronously push changes through a socket. Since we actually care about the order of actions, we realized that we couldn’t really use purely async IO. For example, consider the following actions:

connection.Send("abc");
connection.Send("def");

I care that abc will be sent before def, and I care that all of abc will be sent before anything else is sent through that connection. What I don’t care about is whether anything else is sent between abc and def.

All of that can be had using:

public class Actor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Action<TState>> actions = new ConcurrentQueue<Action<TState>>();
    private Task activeTask;

    public void Act(Action<TState> action)
    {
        actions.Enqueue(action);

        if (activeTask != null) 
            return;

        lock(this)
        {
            if (activeTask != null) 
                return;
            activeTask = Task.Factory.StartNew(ExecuteActions);
        }
    }

    private void ExecuteActions()
    {
        Action<TState> action;
        while (actions.TryDequeue(out action))
        {
            action(State);
        }
        lock(this)
        {
            activeTask = null;
        }
    }
}

The actions will execute synchronously for each actor, and this satisfies my requirement quite nicely, even if I say so myself :)

In truth, the code above isn’t really good. Can you consider ways to improve this?

Posted By: Ayende Rahien

Comments

tobi
05/04/2011 09:15 AM by
tobi

It has a bug. The task could be just before "activeTask = null;" and the Act method could still exit due to "if (activeTask != null)" being true. A new task would not be started.
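One possible way to close the race described above, as a minimal sketch rather than the author's fix: replace the nullable task field with an integer flag that is claimed atomically, and re-check the queue after releasing the flag so that an action enqueued at the last moment still gets a drain loop. The names SafeActor, TryStartDraining and Drain are hypothetical and do not appear in the post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the post's Actor<TState>; a sketch, not the author's code.
public class SafeActor<TState>
{
    private readonly ConcurrentQueue<Action<TState>> actions =
        new ConcurrentQueue<Action<TState>>();
    private readonly TState state;

    // 0 = no drain loop is scheduled, 1 = exactly one is scheduled or running.
    private int running;

    public SafeActor(TState state)
    {
        this.state = state;
    }

    public void Act(Action<TState> action)
    {
        actions.Enqueue(action);
        TryStartDraining();
    }

    private void TryStartDraining()
    {
        // Only the caller that flips the flag from 0 to 1 starts a task.
        if (Interlocked.CompareExchange(ref running, 1, 0) == 0)
            Task.Run(() => Drain());
    }

    private void Drain()
    {
        try
        {
            Action<TState> action;
            while (actions.TryDequeue(out action))
            {
                action(state);
            }
        }
        finally
        {
            // Release the flag first, then look at the queue again. An action
            // enqueued after the last TryDequeue but before the release would
            // otherwise be stranded. The finally block also keeps a throwing
            // action from leaving the flag stuck at 1; the exception itself is
            // not handled here and ends up on the task.
            Interlocked.Exchange(ref running, 0);
            if (!actions.IsEmpty)
                TryStartDraining();
        }
    }
}
```

The drain loop never runs another action after it has released the flag, so at most one loop touches the state at a time, and actions still run in the order they were enqueued.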

Idsa
05/04/2011 09:18 AM by
Idsa

lock (this) is not a good idea

Ayende Rahien
05/04/2011 09:22 AM by
Ayende Rahien

Tobi,

Great that you spotted that!

What else? Look for design issues.

Patrick Huizinga
05/04/2011 09:27 AM by
Patrick Huizinga

State shouldn't be public. It should only be editable from the activeTask thread.

tobi
05/04/2011 09:47 AM by
tobi

The queue is unbounded.

Scooletz
05/04/2011 10:03 AM by
Scooletz

How about having one ConcurrentQueue per connection, wrapped in a write-only interface and passed as the Actor<TState> parameter? This meets the anti-requirement that allows one actor's messages to be interleaved with other actors'. The thread would be started in the same manner (one per concurrent queue), but it would be more explicit that everything is done on one connection, one queue.

Sam Leitch
05/04/2011 10:14 AM by
Sam Leitch

Have you taken a look at the ConcurrentExclusiveSchedulerPair included in TPL Dataflow? It provides the same synchronization of tasks but doesn't break the task/continuation pattern.

Just schedule everything on the ExclusiveScheduler and you're good to go.
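As a rough sketch of that suggestion, assuming one scheduler pair per actor: the wrapper below queues every action on the pair's ExclusiveScheduler, so the actions of one actor never overlap while different actors can still run in parallel. The class name SchedulerActor is hypothetical, and the sketch uses the ConcurrentExclusiveSchedulerPair that ships in System.Threading.Tasks in later .NET versions. The documentation does not promise in-order execution, so this only illustrates the mutual-exclusion part.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical wrapper; the name and shape are assumptions, not from the post.
public class SchedulerActor<TState>
{
    private readonly TState state;
    private readonly TaskFactory exclusive;

    public SchedulerActor(TState state)
    {
        this.state = state;

        // One pair per actor: tasks on this ExclusiveScheduler run one at a
        // time, but separate actors own separate pairs and do not block each other.
        var pair = new ConcurrentExclusiveSchedulerPair();
        exclusive = new TaskFactory(pair.ExclusiveScheduler);
    }

    // Returns the task so callers can observe completion or failure.
    public Task Act(Action<TState> action)
    {
        return exclusive.StartNew(() => action(state));
    }
}
```

Because Act returns the task, callers can wait on it or attach continuations, which is the task/continuation pattern the comment refers to.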

Ayende Rahien
05/04/2011 10:18 AM by
Ayende Rahien

Sam,

That means that everything is in sync, but I want parallelism between different actors.

Sam Leitch
05/04/2011 10:24 AM by
Sam Leitch

You would need a SchedulerPair per "actor". Each ExclusiveScheduler can run in parallel (at least I hope so). I see it as a non-blocking form of reader/writer lock.

It doesn't ensure in-order processing though. Only that each task will run on its own.

Sam Leitch
05/04/2011 10:34 AM by
Sam Leitch

By synchronous, do you mean each action has to execute on the same thread? If that's the case then... well... there are problems. StartNew is not guaranteed to use the same thread twice.

Ayende Rahien
05/04/2011 10:42 AM by
Ayende Rahien

Sam,

No, one after another

Mayrc Mellow
05/04/2011 12:03 PM by
Mayrc Mellow

I see a problem: if I call the Act method and activeTask isn't null, the action that I'm adding will only execute the next time I call Act. Is this a problem?

Colin
05/04/2011 12:23 PM by
Colin

Have you considered wrapping the MailboxProcessor<'a> from the F# control primitives in a C#-friendly type with the API you wish to define? I have done this in the project I am currently working on and it works well, although it adds a dependency on FSharp.Core, which you may wish to avoid.

Oğuzhan Eren
05/04/2011 12:28 PM by
Oğuzhan Eren

"actions.Enqueue(action);" should be separate from the Act function, so that I can add an action to the queue at any time (while the actor is working).

Rafal
05/04/2011 12:56 PM by
Rafal

You have no error handling

Rabo
05/04/2011 03:47 PM by
Rabo

I'd improve it by writing it in F# as a MailboxProcessor. I guess Colin already mentioned this, but it would require hardly any code and would be perfectly suited to such a task.

x4m
05/05/2011 08:31 AM by
x4m

One action can be left unexecuted, because you acquire the lock only after testing that the queue is empty in ExecuteActions().

x4m
05/05/2011 08:42 AM by
x4m

Thread 1 is doing:

private void ExecuteActions()
{
    Action<TState> action;
    while (actions.TryDequeue(out action))
    {
        action(State);
    }

Then Thread 2 is doing:

public void Act(Action<TState> action)
{
    actions.Enqueue(action);

    if (activeTask != null)
        return;

Then Thread 1 leaves one action in the queue:

    lock(this)
    {
        activeTask = null;
    }

I'd write something like this:

private void ExecuteActions()
{
    while (activeTask != null)
    {
        Action<TState> action;
        while (actions.TryDequeue(out action))
        {
            action(State);
        }
        lock(this)
        {
            // Only stop once the queue really is empty
            if (actions.IsEmpty)
                activeTask = null;
        }
    }
}

Michael Reichenauer
05/05/2011 09:44 AM by
Michael Reichenauer

A problem with using Task.Factory.StartNew(action) is that a pool thread is blocked while an action is running. For a single Actor in a program, this is not a big problem (only one thread from the pool), but with multiple Actors running concurrently it could become a real problem if the actions block for some time.

A different approach (but a bit more complicated and restricted) would be to let the Actor handle the sequential scheduling of provided async operations:

Act(Func<TState, Task> action)

I.e. the caller queues delegates, each of which returns a Task. The Act function just ensures that each scheduled Task continues with the next. It is not possible to queue a Task directly (skipping the delegate), since that would force callers to start the task at the time of the call to Act(). The delegate allows Act() to trigger the async operation when the previous Task is done.

The caller can implement efficient async operations, which do not block threads, e.g. by using Task.Factory.FromAsync() on BeginX/EndX functions, and use the Actor to have them run in sequence. The caller can still queue inefficient blocking tasks when only sync operations are available.
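The chaining idea can be sketched roughly as follows. This is an illustration under assumptions, not the implementation from the follow-up post: the class name ChainedActor and the fields gate and tail are hypothetical. Each call appends a continuation to the tail of the chain, so a delegate is invoked only once the Task returned by the previous delegate has finished.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch of an actor that sequences asynchronous operations.
public class ChainedActor<TState>
{
    // Private lock object, so nothing outside the class can contend on it.
    private readonly object gate = new object();
    private readonly TState state;

    // The last operation queued so far; starts out already completed.
    private Task tail = Task.CompletedTask;

    public ChainedActor(TState state)
    {
        this.state = state;
    }

    public Task Act(Func<TState, Task> action)
    {
        lock (gate)
        {
            // ContinueWith runs whether or not the previous operation failed,
            // so one faulted action does not stop the actions queued after it.
            // Unwrap turns the resulting Task<Task> into a Task that completes
            // only when the inner asynchronous operation has completed.
            tail = tail
                .ContinueWith(_ => action(state), TaskScheduler.Default)
                .Unwrap();
            return tail;
        }
    }
}
```

No thread is held while an operation is awaiting IO; a pool thread is used only to invoke each delegate, and the task returned from Act reflects the outcome of that one action.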

Michael Reichenauer
05/05/2011 11:07 AM by
Michael Reichenauer

Skip/delete my previous comment, I failed to notice that this was already covered in your next post

Ayende Rahien
05/05/2011 02:12 PM by
Ayende Rahien

Michael,

Take a look at the next post

Johannes Hansen
05/13/2011 07:52 AM by
Johannes Hansen

Hi Ayende, every time you've posted a message lately, my Google Reader has marked ALL your previous posts as new posts as well. There might be something wrong with your blog engine. Just letting you know. :)

Ayende Rahien
05/13/2011 07:54 AM by
Ayende Rahien

Johannes, Yes, we didn't have guids for posts, so it did that. Fixed now
