Ayende @ Rahien

A minimal actor framework, part 2

Originally posted at 4/29/2011

Yesterday I introduced a very minimal actor “framework”, and I noted that while it was very simple, it wasn’t a very good one. The major problems in that implementation are:

  • No considerations for errors
  • No considerations for async operations

The first one seems obvious, but what about the second one? How can we not consider async operations in an actor framework?

Well, the answer is quite simple: our actor framework assumed that we were always going to execute synchronously. That isn't going to work if there is a need to do things like async IO.

As it happened, that is precisely what I had in mind for this code, so I wrote this:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Actor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Func<TState, Task>> actions = new ConcurrentQueue<Func<TState, Task>>();
    private Task activeTask;

    // Enqueue the action, then start a worker task unless one is already active.
    public void Act(Func<TState, Task> action)
    {
        actions.Enqueue(action);

        lock(this)
        {
            if (activeTask != null) 
                return;
            activeTask = Task.Factory.StartNew(ExecuteActions);
        }
    }

    public event EventHandler<UnhandledExceptionEventArgs> OnError;

    private void InvokeOnError(UnhandledExceptionEventArgs e)
    {
        var handler = OnError;
        if (handler == null) 
            throw new InvalidOperationException("An error was raised for an actor with no error handling capabilities");
        handler(this, e);
    }

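    // Run the next queued action. If its Task completes without an exception, the
    // continuation calls ExecuteActions again; once the queue is empty, clear activeTask.
    private void ExecuteActions()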
    {
        Func<TState, Task> func;
        if (actions.TryDequeue(out func))
        {
            func(State)
                .ContinueWith(x =>
                {
                    if (x.Exception != null)
                    {
                        InvokeOnError(new UnhandledExceptionEventArgs(x.Exception, false));
                        return;
                    }
                    ExecuteActions();
                });
            return;
        }
        lock(this)
        {
            activeTask = null;
        }
    }
}

Thoughts?

Posted By: Ayende Rahien

Comments

tobi
05/05/2011 09:12 AM

The recursive task invocation is a nice trick. I like the Task-factory solution very much.

It might be a problem that, if the actor faults, the queue will fill unboundedly. Setting a fault-flag or nulling the actions field would solve this.

Dave
05/05/2011 10:02 AM

Euh, when you add two time-consuming tasks to the actor, it seems that the second task is never assigned to activeTask, as only Add (Act) sets the field.

Shouldn't the ContinueWith clause also set the field? After the first run of ExecuteActions, activeTask is set to null, and the recursive ExecuteActions doesn't set it. So if you add a task after the first task has already finished (and activeTask is null), it seems the Add method will start a parallel run.

Damien
05/05/2011 10:43 AM

Isn't there a race here?

1. ExecuteActions detects that there are no pending actions.
2. Act adds a new action.
3. Act enters the lock and observes a non-null activeTask.
4. Act exits.
5. ExecuteActions enters the lock and sets activeTask to null.

The action will sit waiting until another call to Act occurs.

x4m
05/05/2011 11:53 AM

I suppose Damien is right. And the previous version had better performance due to double-checked locking.

Rob
05/05/2011 11:59 AM

It would be nice to have the InvalidOperationException include the original exception as an inner exception.
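Below is a minimal sketch of Rob's suggestion. It is written as a stand-alone helper so that it compiles on its own. The `ActorErrors` class and its parameter list are hypothetical; in the post, this logic lives in the private `InvokeOnError` method. The only real change is passing `e.ExceptionObject` as the inner exception:

```csharp
using System;

// Hypothetical stand-alone version of the post's InvokeOnError; in the post this
// logic lives in a private method and reads the OnError event directly.
public static class ActorErrors
{
    public static void InvokeOnError(object sender,
        EventHandler<UnhandledExceptionEventArgs> handler,
        UnhandledExceptionEventArgs e)
    {
        if (handler == null)
            throw new InvalidOperationException(
                "An error was raised for an actor with no error handling capabilities",
                e.ExceptionObject as Exception); // the original failure becomes InnerException
        handler(sender, e);
    }
}
```

With this change, an unhandled actor failure still surfaces as an InvalidOperationException. The actual cause is no longer lost.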

Ryan Heath
05/05/2011 01:53 PM

Shouldn't the next action still be called when you can handle the exception through the OnError event? In other words, shouldn't the return in ContinueWith be removed?

And the race condition Damien talks about is solved with:

lock(this)
{
    if (actions.Any())
    {
        ExecuteActions();
    }
    else
    {
        activeTask = null;
    }
}

Once the lock is acquired, we should not reset activeTask if there is more work available.

// Ryan

Ryan Heath
05/05/2011 01:55 PM

Oops, the lock should be released before calling ExecuteActions again...

me ducks ...

// Ryan

Ryan Heath
05/05/2011 01:59 PM

quick fix ...

var callExecuteActions = false;
lock(this)
{
    if (actions.Any())
    {
        callExecuteActions = true;
    }
    else
    {
        activeTask = null;
    }
}
if (callExecuteActions)
    ExecuteActions();

I probably want to refactor the whole method with a while loop now ...

// Ryan
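Following Ryan's idea of a while loop, here is a minimal sketch of a race-free variant. It is not the code from the post. It swaps in C# 5 async/await, which post-dates this discussion, and uses a hypothetical `running` flag guarded by an explicit lock object. The key point is that the "queue is empty, so stop" decision is made under the same lock that `Act` checks. That means the interleaving Damien describes cannot strand an action:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Sketch only (not the post's code): a loop-based ExecuteActions using C# 5
// async/await. The "running" flag and "gate" object are hypothetical names.
public class LoopingActor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Func<TState, Task>> actions =
        new ConcurrentQueue<Func<TState, Task>>();
    private readonly object gate = new object(); // explicit lock root instead of lock(this)
    private bool running;                          // guarded by gate

    public void Act(Func<TState, Task> action)
    {
        actions.Enqueue(action);
        lock (gate)
        {
            if (running)
                return;       // the active loop will dequeue the new action
            running = true;
        }
        Task.Run(() => ExecuteActions());
    }

    private async Task ExecuteActions()
    {
        while (true)
        {
            Func<TState, Task> func;
            lock (gate)
            {
                // Checking the queue and clearing the flag happen under the same
                // lock that Act uses, so no action can be stranded in the queue.
                if (!actions.TryDequeue(out func))
                {
                    running = false;
                    return;
                }
            }
            // Error handling omitted: if func throws or its Task faults, the loop
            // ends with running still true and the actor stops processing.
            await func(State);
        }
    }
}
```

`Act` enqueues before taking the lock, so every new action is handled in one of two ways. Either the running loop dequeues it on a later iteration, or `Act` finds `running == false` and starts a new loop.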

Ayende Rahien
05/05/2011 02:29 PM

Dave,

We continue executing on the same task (or its continuation; note the ExecuteActions call there).

Andrei Volkov
05/05/2011 05:12 PM

Yes, there's a race condition here. Instead of solving it, I would suggest investing 5 seconds in an explicit lock root object. This will force you to give it a name, which in turn will force you to think about what shared resource you are really protecting with the lock. That resource is the worker thread (aka activeTask). What you're trying to do is make sure that exactly one worker thread is auto-started and auto-stopped when needed. I believe this is the wrong problem to solve. Instead, you should start the thread in the constructor and dispose of it in IDisposable.Dispose (and in the finalizer). Then you should make sure your task is not spinning when there is no work.
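Below is a minimal sketch of the design Andrei describes. It relies on assumptions that are not in the post:

- `BlockingCollection<T>` stands in for the queue.
- The worker is a single long-running task started in the constructor.
- The finalizer is omitted.

The worker blocks instead of spinning when there is no work:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Sketch of the "start in constructor, stop in Dispose" design; the class
// name and the use of BlockingCollection are assumptions, not from the post.
public sealed class DedicatedWorkerActor<TState> : IDisposable
{
    public TState State { get; set; }

    private readonly BlockingCollection<Func<TState, Task>> actions =
        new BlockingCollection<Func<TState, Task>>();
    private readonly Task worker;

    public DedicatedWorkerActor()
    {
        // LongRunning hints to the scheduler that the loop deserves its own thread.
        worker = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
    }

    public void Act(Func<TState, Task> action) => actions.Add(action);

    private void Run()
    {
        // GetConsumingEnumerable blocks (no spinning) while the queue is empty
        // and ends once CompleteAdding has been called and the queue has drained.
        foreach (var action in actions.GetConsumingEnumerable())
        {
            // One action at a time; an exception propagates and ends the worker.
            action(State).Wait();
        }
    }

    public void Dispose()
    {
        actions.CompleteAdding(); // no new actions; let the worker drain the rest
        worker.Wait();
        actions.Dispose();
    }
}
```

Blocking on `Wait()` is what keeps actions strictly sequential here. The trade-off is that each actor permanently owns a thread. The post's Task-per-batch approach avoids that cost.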

Patrick Huizinga
05/06/2011 08:16 AM

When the action / func (please make up your mind) itself throws an exception, the actor will grind to a halt. activeTask will catch the exception and save it for you, but you never try to access it.

Also, in ExecuteActions I would either invert the if, or at least use an else block for the lock statement, in which case you could also remove the return. On my first pass over the code I thought you were calling ExecuteActions recursively and setting activeTask to null.

Harry Steinhilber
05/06/2011 07:04 PM

@Patrick,

Isn't any exception saved by activeTask being accessed via the continuation here?

.ContinueWith(x =>
{
    if (x.Exception != null)  // <--- isn't x the current Task?
    {
        InvokeOnError(new UnhandledExceptionEventArgs(x.Exception, false));
        return;
    }

Patrick Huizinga
05/09/2011 08:02 AM

@Harry

There you continue on the task that gets returned by the func. But what happens if the func itself throws an exception?

Will Smith
05/09/2011 08:22 PM

Using the RX Framework (Subject class) as the basis for this is a way to get started. I have done this before. Just a suggestion.

Harry Steinhilber
05/09/2011 09:53 PM

@Patrick,

I think I understand now, but correct me if I'm wrong. If it is currently executing a recursive call to ExecuteActions and the func throws, the exception is stored in the previous func's task and its continuation will handle it. But if it is executing the first call to ExecuteActions, the one fired by Task.Factory.StartNew, and the func throws, the exception is saved by activeTask and is never handled, just silently ignored. Correct?

Patrick Huizinga
05/10/2011 08:38 AM

Well, ContinueWith returns a new task, so on recursive calls this ignored task will contain the exception. In that case when it is GCed, the task will scream bloody murder about no one handling the exception and crash the application.

In the non-recursive case, not only will the exception be silently ignored, which is 'frowned upon', but its existence will also prevent the actor from ever dequeuing the next func.
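One way to close the gap Patrick describes is to turn a synchronous throw from the func into a faulted `Task`. It then flows through the same `ContinueWith` error path as an asynchronous failure. Below is a minimal sketch; the `SafeInvoke.Run` helper is hypothetical:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: turns a synchronous throw from func into a faulted Task,
// so callers only ever have to inspect the returned Task for errors.
public static class SafeInvoke
{
    public static Task Run<TState>(Func<TState, Task> func, TState state)
    {
        try
        {
            return func(state);
        }
        catch (Exception ex)
        {
            var tcs = new TaskCompletionSource<object>();
            tcs.SetException(ex);
            return tcs.Task; // Task.FromException(ex) does the same on .NET 4.6+
        }
    }
}
```

In the post's ExecuteActions, `func(State)` would become `SafeInvoke.Run(func, State)`. That change makes sure the error reaches OnError. It does not, by itself, restart processing after a failure.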

Sam
05/12/2011 04:18 PM

What about taking it one step further and doing away with the Queue altogether? If you track the last task added, you can add a continuation to it directly, effectively creating a linked list of asynchronous tasks.

Also, what happens if a task blocks and never returns? It's kind of a problem with any actor, but you may want to consider a timeout and task cancellation.
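Sam's timeout point can be sketched as a wrapper. This assumes actions are changed to accept a `CancellationToken`; the post's `Func<TState, Task>` has no such parameter, and the `Timeouts.RunWithTimeout` helper is hypothetical. Cancellation is cooperative. An action that ignores its token keeps running, and the wrapper only stops waiting for it:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: assumes actions accept a CancellationToken, which the
// post's Func<TState, Task> signature does not.
public static class Timeouts
{
    public static async Task RunWithTimeout<TState>(
        Func<TState, CancellationToken, Task> action, TState state, TimeSpan timeout)
    {
        // Not disposed: a timed-out action may still be holding its token.
        var cts = new CancellationTokenSource();
        Task work = action(state, cts.Token);

        if (await Task.WhenAny(work, Task.Delay(timeout)) != work)
        {
            cts.Cancel(); // cooperative: the action may or may not observe this
            throw new TimeoutException("Action did not complete within " + timeout);
        }

        await work; // rethrow the action's own exception, if it faulted
    }
}
```

In an actor, the loop would await this wrapper instead of the raw action. A stuck action then surfaces as a TimeoutException rather than silently blocking every message queued behind it.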

Ayende Rahien
05/13/2011 07:13 AM

Sam, I really wanted to do that, but the problem is: how do you do it with concurrent task addition without allowing concurrent task execution?

Shane
05/13/2011 02:56 PM

I'm going to reiterate Anthony's comment in the Part 1 post. Retlang could fit your need. It has matured significantly since your initial review of it several years ago. From the site (http://code.google.com/p/retlang/):

"The library is intended for use in message based concurrency similar to event based actors in Scala.

Retlang relies upon four abstractions: IFiber, IQueue, IExecutor, and IChannel. An IFiber is an abstraction for the context of execution (in most cases a thread). An IQueue is an abstraction for the data structure that holds the actions until the IFiber is ready for more actions. The default implementation, DefaultQueue, is an unbounded storage that uses standard locking to notify when it has actions to execute. An IExecutor performs the actual execution. It is useful as an injection point to achieve fault tolerance, performance profiling, etc. The default implementation, DefaultExecutor, simply executes actions. An IChannel is an abstraction for the conduit through which two or more IFibers communicate (pass messages).

All messages to a particular IFiber are delivered sequentially. Components can easily keep state without synchronizing data access or worrying about thread races."

Using a custom IExecutor with Retlang may solve your issue of "concurrent task addition without concurrent tasks execution"

just my 2 cents...

Sam
05/13/2011 04:33 PM

This is what I was thinking. Let me know if I've missed anything.

using System;
using System.Threading.Tasks;

class MyActor1<TState>
{
    public TState State { get; set; }

    private Task tailTask = null;

    public void Act(Func<TState, Task> action)
    {
        lock (this)
        {
            if (tailTask == null)
            {
                tailTask = ExecuteAction(action);
            }
            else
            {
                tailTask = tailTask.ContinueWith(_ => ExecuteAction(action)).Unwrap();
            }
        }
    }

    public event EventHandler<UnhandledExceptionEventArgs> OnError;

    private void InvokeOnError(Task t)
    {
        if (t.Exception != null)
        {
            var handler = OnError;
            if (handler == null)
                throw new InvalidOperationException("An error was raised for an actor with no error handling capabilities");
            handler(this, new UnhandledExceptionEventArgs(t.Exception, false));
        }
    }

    private Task ExecuteAction(Func<TState, Task> action)
    {
        Task actionTask = action(State);
        return actionTask.ContinueWith(InvokeOnError);
    }
}

Shane
05/13/2011 04:34 PM

An example using Retlang:

        //setup execution context (thread pool)
        using (var fiber = new PoolFiber())
        {
            fiber.Start();

            //setup channel to get I/O messages
            var ioChannel = new Channel<string>();

            //setup subscriber and an action to take.
            ioChannel.Subscribe(fiber, messagetosend =>
                                           {
                                               //io.send(messagetosend)
                                           });

            //these calls would be executed sequentially on another thread (fiber).
            ioChannel.Publish("abc");
            ioChannel.Publish("def");

            //this could be done somewhere else, at any time...
            //ioChannel.Publish("doesn't matter where executed")
            //it wouldn't affect the execution order.  "def" would still be executed after "abc".
        }