Ayende @ Rahien

Hi!
My name is Ayende Rahien
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969

@

Posts: 5,947 | Comments: 44,540

filter by tags archive

A minimal actor framework


Originally posted at 4/29/2011

For one of our projects, we need the ability to asynchronously push changes through a socket, since we actually care about the order of actions, we realized that we couldn’t really use purely async IO. For example, consider the following actions:

connection.Send(“abc”);
connection.Send(“def”);

I care that abc will be sent before def, and I care that all of abc will be sent before anything else is sent through that connection. What I don’t care about is whatever I have anything else sent between abc and def.

All of that can be had using:

public class Actor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Action<TState>> actions = new ConcurrentQueue<Action<TState>>();
    private Task activeTask;

    public void Act(Action<TState> action)
    {
        actions.Enqueue(action);

        if (activeTask != null) 
            return;

        lock(this)
        {
            if (activeTask != null) 
                return;
            activeTask = Task.Factory.StartNew(ExecuteActions);
        }
    }

    private void ExecuteActions()
    {
        Action<TState> action;
        while (actions.TryDequeue(out action))
        {
            action(State);
        }
        lock(this)
        {
            activeTask = null;
        }
    }
}

The actions will execute synchronously for each actor, and it satisfy my requirement for how to deal with this quite nicely, even if I say so myself Smile

In truth, the code above isn’t really good. Can you consider ways to improve this?


Comments

tobi

It has a bug. The task could be just before "activeTask = null;" and the Act method could still exit due to "if (activeTask != null)" being true. A new task would not be started.

Idsa

lock (this) is not a good idea

Ayende Rahien

Tobi,

Great that you spotted that!

What else? Look for design issues.

Patrick Huizinga

State shouldn't be public. It should only be editable from the activeTask thread.

tobi

The queue is unbounded.

Scooletz

How about having one ConcurrentuQueue per connection, wrapped in write-only-interface, being passed as a Actor <tstate parameter. This meets anty-requirement which allows to interrupt one actor messages with other actors'. If so, the thread would be started in the same manner (one per a concurrent queue), but it would be more explicit about having it all done on one connection., one queue.

Sam Leitch

Have you taken a look at the ConcurrentExclusiveSchedulerPair included in TPL Dataflow? It provides the same synchronization of tasks but doesn't break the task/continuation pattern.

Just schedule everything on the ExclusiveScheduler and you're good to go.

Ayende Rahien

Sam,

That means that everything in sync, but I want parallelism between different actors.

Sam Leitch

You would need a SchedulerPair per "actor". Each ExclusiveScheduler can run in parallel (at least I hope so). I see it as a non-blocking form of reader/writer lock.

It doesn't ensure in-order processing though. Only that each task will run on its own.

Sam Leitch

By synchronous, so you mean each action has to execute in the same thread? If that's the case then...well... there are problems. StartNew is not guaranteed to use the same thread twice.

Mayrc Mellow

I see a problem: if I call Act method and activeTask object isn't null, the task that I'm adding only will execute on next time that I call Act method again. Is this a problem?

Colin

Have you considered wrapping the MailboxProcessor<'a> from the FSharp Control primitives in a C# friendly type with the API you wish to define. I have done this in the project I am currently working on and it works well, although it puts a dependency on Fsharp.Core something you may wish to avoid

Oğuzhan Eren

"actions.Enqueue(action);" process should be separete Act function so I will be add action to Queue any time (working actor)

Rafal

You have no error handling

Rabo

I'd improve it by writing it in F# as a MailboxProcessor. I guess Colin already mentioned this, but it would require hardly any code and would be perfectly suited to such a task.

x4m
x4m

One action can hold not runned, because you acquire lock after testing queue is empty in ExecuteActions()

x4m
x4m

Thread1 is doing

private void ExecuteActions()

{

    Action

<tstate action;

    while (actions.TryDequeue(out action))

    {

        action(State);

    }

Then Thread 2 Is Doing

public void Act(Action <tstate action)

{

    actions.Enqueue(action);


    if (activeTask != null) 

        return;

Then Thread 1 leaves one action in queue

    lock(this)

    {

        activeTask = null;

    }

I'd write something like this

private void ExecuteActions()

{

while(activeTask)

{

    Action

<tstate action;

    while (actions.TryDequeue(out action))

    {

        action(State);

    }

    lock(this)

    {

if(!queue.Empty)

        activeTask = null;

    }

}

}
Michael Reichenauer

A problem with using Task.Factory.StartNew(action) is that a pool thread is blocked while an action running. For a single Actor in a program, this is not a big problem (only one thread from the pool), but if there would be multiple Actors running concurrently, it could become a real problem, if the actions might block for some time.

A different approach (but a bit more complicated and restricted) would be to let the Actor handle the sequential scheduling of provided async operations:

Act(Func <tstate,> action)

I.e. the caller queues delegates, which returns a Task. The Act function just ensures that each scheduled Task continues with the next. It is not possible to queue Task directly (skipping the delegate), since that would force callers to Start task at the time of the call to Act(). The delegate allows Act() to trigger the async operation when the previous Task is done.

The caller can implement efficient async operations, which do not block threads, e.g. by using Task.FromAsync() on BeginX/EndX functions and use the Actor to have them run in sequence. The caller can still queue inefficient blocking tasks when only sync operations are available.

Michael Reichenauer

Skip/delete my previous comment, I failed to notice that this was already covered in your next post

Ayende Rahien

Michael,

Take a look at the next post

Johannes Hansen

Hi Ayende, everytime you've posted a message lately my google reader has marked ALL your previous posts as new posts as well. There might be something wrong with your blog engine. Just letting you know. :)

Ayende Rahien

Johannes, Yes, we didn't have guids for posts, so it did that. Fixed now

Comment preview

Comments have been closed on this topic.

FUTURE POSTS

No future posts left, oh my!

RECENT SERIES

  1. RavenDB Sharding (2):
    21 May 2015 - Adding a new shard to an existing cluster, the easy way
  2. The RavenDB Comic Strip (2):
    20 May 2015 - Part II – a team in trouble!
  3. Challenge (45):
    28 Apr 2015 - What is the meaning of this change?
  4. Interview question (2):
    30 Mar 2015 - fix the index
  5. Excerpts from the RavenDB Performance team report (20):
    20 Feb 2015 - Optimizing Compare – The circle of life (a post-mortem)
View all series

RECENT COMMENTS

Syndication

Main feed Feed Stats
Comments feed   Comments Feed Stats