Ayende @ Rahien

It's a girl

Rhino ETL 2.0

Rhino ETL was born out of a need. I need to do a lot of ETL type operations. Those include anything from moving data from legacy databases to my database, importing files, importing data over web services, etc. For a while, I have used SSIS for those needs. It has proven... inadequate. Mostly in terms of ease of development, deployment, error handling, etc.

This is my third attempt at building an ETL tool. The third time is much shorter and clearer than both previous attempts.

The goals for the project were:

  • Developer friendly:
    • Errors
    • Development
    • Deployment
  • Performant - I need to move large amounts of data around, and I need to do it fast.
  • Easy - The hard part should be handling the required transforms, dealing with the logic for in/out, etc.
  • Unlimited - You should not be limited to what the tool gives you, and you should be able to integrate easily with other tools
  • Language agnostic - You should be able to develop solutions for this using C#/VB/Boo
  • DSL - Provide a DSL to make it even easier to work with

The overall concept is based around these two classes, and the idea of a pipeline:

[image: class diagrams of the two core classes]

Here is a simple operation, which just generates all the even numbers up to a million:

public class EvenNumberToMillion : AbstractOperation
{
	public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
	{
		for(int i = 2; i< 1000000; i += 2)
		{
			Row row = new Row();
			row["number"] = i;
			yield return row;
		}
	}
}

This is an input operation; it ignores its rows parameter and yields rows that it generates by some other means. As you can see, we yield a row for each iteration.

We combine operations into a pipeline using a process. If we wanted to print the numbers, we would build the following pipeline process:

public class PrintNumbersProcess : EtlProcess
{
	public override void Initialize()
	{
		Register(new EvenNumberToMillion());
		Register(new PrintNumbers());
	}
}
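
The PrintNumbers operation referenced above is not shown in the post; a minimal sketch of it (assuming the "number" column that EvenNumberToMillion sets) might look like this:

public class PrintNumbers : AbstractOperation
{
	public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
	{
		foreach (Row row in rows)
		{
			// Print the column set by the previous operation and pass the row along.
			System.Console.WriteLine(row["number"]);
			yield return row;
		}
	}
}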

All the output of the first operation goes into the second operation, and so on and so forth. Using the DSL it will look like:

operation EvenNumbersToMillion:
	for i in range(1000000,2):
		yield Row(Number: i)

operation PrintNumbers:
	for row in rows:
		print row.Number

process PrintNumbersProcess:
	EvenNumbersToMillion()
	PrintNumbers()

This is just to demonstrate the concept of the pipeline. Now we can get into the interesting operations. As you already surmised, AbstractOperation is the common base class, and you can inherit it to produce rows from any source.

Inputs

Rhino ETL offers special support for getting data from a database. It means that you can define it as simply as:

public class ReadUsers : ConventionInputCommandOperation
{
    public ReadUsers() : base("test")
    {
        Command = "SELECT Id, Name,Email FROM Users";
    }
}

Or, using DSL:

input "test", Command = "SELECT id, name, email  FROM Users"

Note the "test" here, it is the name of the connection string in the app.config.

Outputs

On the output side, we have more interesting options. We can use any custom operation that we want, of course, but for working with databases, we have the following options:

Standard DB commands:

public class FibonacciOutput : ConventionOutputCommandOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

You'll note that I am not specifying the parameters; those are taken implicitly from the current row.
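
Conceptually, the convention is that each @Parameter in the command is filled from the row column with the same name. This is just a sketch of that idea in plain ADO.NET, not the actual Rhino ETL code, and it assumes the row exposes its column names:

void CopyRowIntoParameters(IDbCommand command, Row row)
{
    // For every column in the current row, create a matching @parameter on the command.
    foreach (string column in row.Columns)
    {
        IDbDataParameter parameter = command.CreateParameter();
        parameter.ParameterName = "@" + column;
        parameter.Value = row[column] ?? DBNull.Value;
        command.Parameters.Add(parameter);
    }
}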

Using DSL:

output "test", Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)"

SQL Batch operations:

public class FibonacciOutput : ConventionSqlBatchOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

Using DSL: Haven't written that yet :-(

Sql Bulk Insert:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
    public FibonacciBulkInsert() : base("test", "Fibonacci")
    {
    }

    protected override void PrepareSchema()
    {
        Schema["id"] = typeof (int);
    }
}

Using DSL: Haven't written that yet :-(

Files

For working with files, we have the support of the excellent FileHelpers library, which makes working with files really easy.

Reading from a file is simply:

public class UserRecord
{
    public string Email;
    public int Id;
    public string Name;
}

public class ReadUsersFromFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        using(FileEngine file = FluentFile.For<UserRecord>().From("users.txt"))
        {
            foreach (object obj in file)
            {
                yield return Row.FromObject(obj);
            }
        }
    }
}

There is a 1:1 translation to Boo here, so I'll spare you that.

Writing is very similar:

public class WriteUsersToFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        FluentFile engine = FluentFile.For<UserRecord>();
        engine.HeaderText = "Id\tName\tEmail";
        using(FileEngine file = engine.To("users.txt"))
        {
            foreach (Row row in rows)
            {
                UserRecord record = new UserRecord();
                
                record.Id = (int)row["id"];
                record.Name = (string)row["name"];
                record.Email = (string)row["email"];

                file.Write(record);
            }
        }
        yield break;
    }
}

Joins

Joins are an interesting concept, and I have been playing with them quite extensively recently. Joins in Rhino ETL are implemented as sub processes. Hash joins are very simple:

public class JoinUsersAndIds : JoinOperation
{
	protected override Row MergeRows(Row leftRow, Row rightRow)
	{
		Row row = leftRow.Clone();
		row["user_id"] = rightRow["new_id"];
		return row;
	}

	protected override void SetupJoinConditions()
	{
		InnerJoin
			.Left("id")
			.Right("old_id");
	}
}

This is just the operation; you hook it up in the process using:

Register(new JoinUsersAndIds()
         	.Left(new GenerateUsers(25000))
         	.Right(new GenerateRandomIds(15000)));

Each side is capable of accepting a full blown sub process on its own.

Nested loops joins are appropriate for the more complex cases:

public class FullJoinUsersToPeopleByEmail : NestedLoopsJoinOperation
{
    protected override bool MatchJoinCondition(Row leftRow, Row rightRow)
    {
        return LeftJoin(leftRow["email"], rightRow["email"]);
    }

	protected override Row MergeRows(Row leftRow, Row rightRow)
    {
        Row row = new Row();
        row.Copy(leftRow);
        row["person_id"] = rightRow["id"];
        return row;
    }
}

Using DSL it looks like this:

join get_user_roles:
	left:
		input "test", Command = "SELECT id, name, email  FROM Users"
		
	right:
		nput "test", Command = "SELECT userid, roleid FROM UsersToRoles"
			
	on left.id ==  right.userid:
		row.Name = left.Name
		row.Role = right.RoleId

(A note about this syntax: it currently generates a nested loops join; I intend to make it generate an optimized version soon.)

Branching

That was quite a challenge to implement; I kept missing a key point, and that tripped me up for a while. Here is how you send a row to several destinations:

BranchingOperation split = new BranchingOperation()
	.Add(Partial
		.Register(new MultiplyByThreeOperation())
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()))
	.Add(Partial
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()));
Register(split);

(Note, the implementation is not as optimized as it could be.)

Aggregates

Well, you got to have those, don't you? Here is a simple Row Count aggregate:

public class RowCount : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        if (aggregate["count"] == null)
            aggregate["count"] = 0;

        int count = (int)aggregate["count"];
        aggregate["count"] = count + 1;
    }
}

We are called once per row, and can accumulate all the values that we want. We can use grouping to create more interesting results:

public class CostPerProductAggregation : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if(aggregate["cost"]==null)
            aggregate["cost"] = 0;
        aggregate["cost"] = ((int) aggregate["cost"]) + ((int) row["price"]);
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new string[] {"name"};
    }
}

We can also override the FinishAggregation(Row) method to complete any calculations once all the rows have been processed. Rhino ETL guarantees that we will get the same aggregate row for all the rows that match on the same columns, so that is taken care of.
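
As an illustration (this is my sketch, not from the original post, and it assumes FinishAggregation follows the same protected override pattern as Accumulate), a grouped aggregation could use it to compute an average once the totals are in:

public class AverageCostPerProductAggregation : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if (aggregate["total"] == null)
            aggregate["total"] = 0;
        if (aggregate["count"] == null)
            aggregate["count"] = 0;
        aggregate["total"] = ((int)aggregate["total"]) + ((int)row["price"]);
        aggregate["count"] = ((int)aggregate["count"]) + 1;
    }

    protected override void FinishAggregation(Row aggregate)
    {
        // Called once per group, after all the matching rows were accumulated.
        aggregate["average"] = ((int)aggregate["total"]) / ((int)aggregate["count"]);
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new string[] { "name" };
    }
}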

Using DSL for that is simply:

aggregate join_product_names:
	accumulate:
		aggregate.names = [] if aggregate.names is null
		aggregate.names.Add(row.name)
	
	terminate:
		aggregate.result = string.Join(", ", aggregate.names.ToArray(string))

That is about it for now, I think. You can get the source for Rhino ETL here:

https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/rhino-etl

I plan to have a binary release once I am confident that all the notes in this post are no longer relevant.

SSIS vs. CSV

Another team has run into this bug, and what this basically means is that SSIS is not capable of importing CSV files. The "spec" for CSV files is about three lines long, including the special cases (of which there are two), and it hasn't changed in a decade.

It boggles the mind that not only isn't SSIS capable of doing this, but they are not going to fix it for SSIS 2008. Looks like you would need to wait for SSIS 201x for a version that can handle CSV files.

And in NHibernate Land

This is just an update. Fabio has been porting large swathes of Hibernate 3.2, Paul is dealing with the continued port of Hibernate Search while Dario has focused on porting Hibernate Validator. He has also just started to port Hibernate Shards, which is going to make things very interesting indeed.

Me? I am sitting in the corner, admiring the work and occasionally shouting hurray!

Algorithms, joins and performance

I thought about moving from hashtables to Dictionary<TKey,TValue>, and I got interesting results.

For a simple new Dictionary<string,object>(), I expected a significant improvement, but I got this:

[image: profiler results]

This is actually much worse than the result of hashtable + ignore case comparison.

When I used that, I got this horrendous result:

[image: profiler results]

I tried various other tricks, but none of them changes the fact that making 7.5 million calls is going to cost a lot of time. And I want to support more than just 2,500 x 1,500.

I changed the implementation to look like this:

rightRowsByJoinKey = {}
for rightRow in right:
	key = rightRow.CreateKey( rightJoinParams )
	rightRowsByJoinKey[ key ] = [] unless rightRowsByJoinKey[ key ]
	rightRowsByJoinKey[ key ].Add(rightRow)

for leftRow in left:
	key = leftRow.CreateKey( leftJoinParams )
	for matchingRight in rightRowsByJoinKey[ key ] :
		yield MergeRows( leftRow, matchingRight )

Now I have N + M, instead of N*M.

From a performance perspective, it means that doing a nested loops join on 2,500 x 1,500 results in 3.75 million comparisons, which is quite a bit, even for such a small set of rows. It took over 6 seconds to run on my machine.

A hash join, however, will perform a measly 5,000 operations to do the same amount of work. On my machine, 2,500 x 1,500 completes in 0.2 seconds, most of which is spent just initializing the framework.

I took it for a spin with two orders of magnitude more rows; 250,000 x 150,000 completed in 5.48 seconds, which is very encouraging.

Hash join is not applicable if you want to join over anything but equality, which is why we need the nested loops join as well.

Performance, Joins and why you should always have a profiler

I ran a heavy duty import process yesterday, and we ran into a severe performance issue with Rhino ETL joins. Five joins, with about 250,000 records on the initial left and a few tens of thousands on the right sides, took about two hours to complete.

That was unacceptable, and I decided that I had to fix this issue. I had a fairly good idea about what the issue was. Rhino ETL supports only nested loops joins at the moment, which means that the join is performed as (pseudo code):

for leftRow in left:
	for rightRow in right:
		if MatchJoinCondition(leftRow, rightRow):
			yield MergeRows(leftRow, rightRow)

Obviously the N*M was what was causing the problem, right? I quickly built a trivial join test, which joined 2,500 rows on the left with 1,500 rows on the right. Trivial stuff, and it should result in 1,500 rows returned.

It executed in 6 seconds. That was a shock.

Well, 1,500 * 2,500 = 3,750,000, but it shouldn't be that bad.

Then I ran the code under a profiler; it completed in 29 seconds, but I also saw this:

[image: profiler results]

It was not the nested loop that cost me all this money; it was the hash table lookups!

The most expensive call was GetHashCodeOfString; we have some globalization stuff here because I told the hashtable to be case insensitive. I tried removing that and ran it under the profiler again; now it dropped to 18 seconds, and we had this cost structure to deal with:

[image: profiler results]

We still spent almost all of our time just doing hash table lookups, although we dropped 10 seconds this time.

I don't think that I would have ever considered the cost of simply doing the hashtable lookups as the primary cost of the join operations.

Academia in Real World Development

Frans Bouma has decided to unsubscribe from the alt.net mailing lists. I'll miss him there, I usually disagree with him, but the debates are interesting, educated and fun.

What I want to talk today is about something that he pointed out in the post:

What surprised me to no end was the total lack of any reference/debate about computer science research, papers etc. except perhaps pre/post conditions but only in the form of spec#, not in the form of CS research. Almost all the debates focused on tools and their direct techniques, not the computer science behind them. In general asking 'Why' wasn't answered with: "research has shown that..." but with replies which were pointing at techniques, tools and patterns, not the reasoning behind these tools, techniques and patterns.

I waited to respond to his post until I could formulate a coherent answer, and I think that this quote sums it up pretty well:

“Computer science education cannot make anybody an expert programmer any more than studying brushes and pigment can make somebody an expert painter.”
(Eric Raymond)

The problem is that there is a big gap between the academia and real world development.

Finding a path in a graph? Designing a compiler? Analyzing an image? Choosing a search algorithm? Selecting an appropriate data structure for a task?

For each of those I would head straight for academia. Those are technical issues, and I want the academic proofs and experience there. I want the mathematical foundation for the solution.

Designing a maintainable system? Building a usable framework? Creating the domain model?

For those I am not going to go to academia. I am going to go to the real world practitioners, the guys (and gals) that have been burned in the field and learned from their mistakes.

Building a highly scalable system? Designing for scalability?

For those I am going to head to the papers by Amazon, EBay, etc. The people who are actually dealing with this complexity and can share how things break down at a high enough scale.

You can take a look at Java's API issues if you want to see what happens if you listen too closely to academia. Hell, just take a look at SQL Server 2005's paging feature, to see just how complex an academic solution can make life.

For most real world situations, I want real world experience, because 90% of software development is not science, it is an art, and I am not interested in discussion about the chemical composition of the pigments when I examine a masterpiece.

Article Review: N Degrees of Separation

Following Frans Bouma's recommendation, I just read N Degrees of Separation: Multi-Dimensional Separation of Concerns.

I came away decidedly unimpressed.

First things first, the article is boring. And SoC is a subject that is near & dear to my heart.

More importantly, I don't like the way they are approaching the solution, even if I agree with their overall premise. The idea that they present is that we should have a way to split functionality based not on a single dimension, but on multiple ones. To that I agree, most certainly.

Except that the example that they gave was about as real as a three dollar bill. They give an example of an AST and needing to add functionality to it later on. The problem is that this is more or less a solved problem. AST + Visitor == very easy extensibility. I can solve the problem they present by using additive changes only.

They seem to think that major modification to the code is required, although I am not sure why. I'll admit that perhaps it was a poorly chosen example.

What bothers me even more is that their chosen solution is... Partial classes.

Now, I had some small experience in building complex systems, which I need to be able to change at low cost. Here is how I would implement this "N Degrees of Separation":

  • Kernel domain
  • IValidator<TEntity>
  • IPersister<TEntity>
  • IVisitor<TEntity>
  • IUponWhichTheAnglesDance<TPin>

I think you get the idea by now.

Add this to the IoC and you have solved the issue of feature changes. You just define an interface for a specific feature, add a rule to the IoC and you are done.
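
As a sketch of what I mean (the User entity and the ResolveAll call here are illustrative stand-ins, not a specific container API):

public interface IValidator<TEntity>
{
    void Validate(TEntity entity);
}

// A concrete rule for one feature; adding a feature means adding a class like this
// and registering it in the container.
public class UserMustHaveName : IValidator<User>
{
    public void Validate(User user)
    {
        if (string.IsNullOrEmpty(user.Name))
            throw new InvalidOperationException("A user must have a name");
    }
}

// At the point where the feature applies, ask the container for all registered rules:
foreach (IValidator<User> validator in IoC.ResolveAll<IValidator<User>>())
    validator.Validate(user);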

System.Type.GUID stability

Here is an interesting issue. Can you rely on System.Type.GUID to be stable?

By stable I mean that it will generate the same value for the same type across compilations. Empirical evidence suggests that this is the case (a quick check is sketched after the list below), with the following factors determining the Guid of a type:

  • Type name (including the namespace)
  • Assembly name
  • Assembly public key
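
A quick way to run that check yourself is to print the GUID, recompile, and compare:

public class MarkerType
{
}

public class Program
{
    public static void Main()
    {
        // Recompile the assembly and run again; the value stays the same as long as
        // the type name, assembly name and public key do not change.
        System.Console.WriteLine(typeof(MarkerType).GUID);
    }
}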

Reflectoring into the system, it turns out that System.Type.GUID eventually translates into a call to System.RuntimeType.GetGUID; this is one of the scary InternalCall methods that are implemented directly in the runtime itself.

I wonder...

Service locator for optional dependencies

I just found myself doing optional dependency service location, which got on my nerves the second that I had to deal with it. I added this tiny tidbit to make it easier:

ILogger logger = IoC.TryResolve<ILogger>(new NullLogger());
logger.WarnFormat("Could not find an entities group for entity: {0}",
          typeof(TEntity).FullName);
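
The TryResolve helper itself is not shown in the post; a minimal sketch of the idea, assuming a Windsor-style container sitting behind the static IoC class, would be:

public static T TryResolve<T>(T defaultValue)
{
    // Assumes Container is the static Windsor container behind IoC.
    // If nothing was registered for the service, fall back to the supplied
    // default (typically a null object implementation).
    if (Container.Kernel.HasComponent(typeof(T)) == false)
        return defaultValue;
    return Container.Resolve<T>();
}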

Creating Partial Domain Models

Here is an interesting problem. I want to package a domain model with specific functionality, but I don't want to constrain the model that users of this functionality will use.

For example, let us take security systems and the User entity. I have yet to see any two projects that had identical User entities.

Now, the concept of a user in a security system is pretty important, I would say. But not important enough that I would want to force my idea about what a user looks like on an innocent domain model.

Problem, right?

Not really. We can use the usual OO abstractions to handle this. Check out this domain model:

[image: domain model class diagram]

The only requirement that this security system has is a User entity that implements the IUser interface, and the only requirement that imposes is to expose a single property, SecurityInfo, which describes the user. Notice the SecurityIdPropertyName there; it is there to allow us to do queries based on the security id without forcing a structure on the user entity.
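
Since the class diagram image did not survive, here is a rough sketch of the shape being described; apart from IUser and SecurityInfo, the member names are my guesses, not the actual Rhino Security contract:

public interface IUser
{
    SecurityInfo SecurityInfo { get; }
}

public class SecurityInfo
{
    // A display name for the user, the value of its security id, and the name of
    // the property on the concrete entity that holds that id, so queries can be
    // written without dictating the entity's structure.
    public string Name;
    public object SecurityId;
    public string SecurityIdPropertyName;
}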

How it actually works, however, is more interesting.

At initialization time, we perform Mapping Rewrite and tell NHibernate that the concrete implementation of IUser in all its associations is mapped to the User entity from our project.

This means that we get to have the cake (separate domain models) and eat it (rich domain models without constraints).

Active Record: Mapping Rewriting

One of the really nice things about using the NHibernate / Active Record stack is that there are so many extension points.

One of the more fun things is the ability to do runtime modifications of the mapping. Here is a simple example:

ActiveRecordStarter.ModelsCreated+=delegate(ActiveRecordModelCollection models, IConfigurationSource source)
{
    foreach (ActiveRecordModel model in models)
    {
        if(model.Type == typeof(User))
        {
            model.ActiveRecordAtt.Table = "MyUsers";
        }
    }
};

Here we are re-writing the table that the User entity will go to. We can get much more complex, since we have all the knowledge of the structure of the code and can apply things like naming convention, validation, etc. It is also a fast way to translate between database/conventions.

NHibernate has a similar concept, INamingStrategy, which can be used for some of the same purposes, but I like this approach better, since it gives me more semantic information.

Count the interfaces...

Without running this code, what does this print?

public interface IStartable : IDisposable
{
   void Start();
}

public class Pipeline : IStartable
{ 
  // ...
}

Console.WriteLine(typeof(Pipeline).GetInterfaces().Length);

Code that reads and modifies code, code that reads code and outputs XML

If you can follow that sentence, congratulations.

I spent today dealing with an interesting problem. We have a set of commands that operate in response to a message from another system. This system is configured by an XML file. One of the parameters for configuration is which fields in the message should be sent.

I consider this an abomination unto Nuggan, but this is a key performance optimization technique. Otherwise the external system may spend as much as ten seconds just gathering the data for this message.

We started out using the simplest approach of just giving us all the fields back. It doesn't work; the performance is just awful with this approach. But the other approach means that updating a command to use a new field (or to stop using a field) is a process that can take up to 5 minutes.

This is insane. And all of that because it can't really handle a message with all the fields in a reasonable time frame. I am being overgenerous here, actually. Let me put it more clearly, the "message" is a row. The fields are the columns in the row. I have no idea what it is doing there, because a SELECT * FROM MyTable WHERE id = '[id]' returns instantly.

Nevertheless, we had a problem. We needed to go over ~127 existing commands and modify the configuration to make sure that only the fields that we are using will be included. To say that this is error prone and annoying isn't the least of it.

I decided that I would take care of this task. I know how other people would handle this. I am not subjecting them to several days of tedious code reading & XML editing.

The end result was a piece of code that used NRefactory to parse the code, looked for field access in the class, and then generated an attribute that marked the required fields.

[Required(Constants.customer.name, Constants.customer.email)]

Two things were complex there. The first was handling inheritance; we have some common functionality that we abstracted into a set of base classes, and that was interesting to deal with. The second was making sure that this code would not just generate the attribute correctly, but would actually be able to edit the source code files with the information, as well as be able to do it more than once.

This allows us to just put this in as a pre build step, and we are done. Accessing a field will cause it to be automatically registered as a required field.

The second part was another piece of code, this time it took the compiled assembly and used reflection to generate the required XML config.
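
That second piece is not shown either; in spirit it is a small reflection pass over the command types, gathering the [Required] field names and writing them out. Something along these lines, where the attribute shape and the XML element names are made up for illustration:

[AttributeUsage(AttributeTargets.Class, Inherited = true)]
public class RequiredAttribute : Attribute
{
    public readonly string[] Fields;

    public RequiredAttribute(params string[] fields)
    {
        Fields = fields;
    }
}

public static class RequiredFieldsConfig
{
    public static void Write(Assembly commandsAssembly, XmlWriter writer)
    {
        writer.WriteStartElement("commands");
        foreach (Type type in commandsAssembly.GetTypes())
        {
            // Inherited attributes are picked up as well, to cover the base classes.
            object[] attributes = type.GetCustomAttributes(typeof(RequiredAttribute), true);
            if (attributes.Length == 0)
                continue;

            writer.WriteStartElement("command");
            writer.WriteAttributeString("name", type.Name);
            foreach (string field in ((RequiredAttribute)attributes[0]).Fields)
                writer.WriteElementString("field", field);
            writer.WriteEndElement();
        }
        writer.WriteEndElement();
    }
}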

We are back to one source of truth; the maintainability cost is very low, and the only problem is that replacing the XML config takes about 5 minutes, so it is probably something that we will do rarely, and we will just use the "return all" mode during development.

One Source of Truth, remember.

Task Scheduling Improvements

I took some time to see how I can improve the sample implementation of the task scheduling that I posted yesterday.

You can check out the code here. What I wanted to check is the ability to scale the solution to many tasks and to IO bound tasks.

In a stress test that I made, the library held its own while scheduling 9,864,100 tasks; at one point we had ~3,500,000 concurrently queued tasks. Memory usage hovered in the ~500MB range.

The next stage was to see what happens when we have tasks that take a long time to execute, depend on IO, etc.

I wrote this piece of code:

public class GatherAllFiles : AbstractTask
{
    private readonly string root;

    public GatherAllFiles(string root)
    {
        this.root = root;
    }

    protected override IEnumerable<Condition> Execute()
    {
        List<string> results = new List<string>();
        results.Add(root);
        
        List<Future> futures = new List<Future>();
        foreach (string directory in Directory.GetDirectories(root))
        {
            Future spawn = Spawn(new GatherAllFiles(directory));
            futures.Add(spawn);
        }
        string[] files = Directory.GetFiles(root);

        yield return Done(futures);

        foreach (Future future in futures)
        {
            results.AddRange(future.GetValue<IEnumerable<string>>());
        }
        results.AddRange(files);
        SetResult(results);
    }
}

I then ran it on my OSS directory. This directory contains 133,108 directories and 206,298 files (ask me how I know...).

The library just ate it up without even noticing. Very nice, even if I say so myself :-)

Concurrency Solutions

I am not sure why, but recently I have become interested in concurrency again. I blogged about both the pipeline and the task scheduling approaches recently, and I was repeatedly pointed to Retlang, Parallel FX and the Concurrency and Coordination Runtime.

I thought that it would be interesting to compare those approaches.

The pipeline

The pipeline can be easily parallelized, because there is an explicit handoff of objects as they pass from one stage of the pipeline to another. The only synchronization needed for this would be in terms of passing the objects between the stages.

Task Scheduling

Task scheduling is interesting, mainly because the way it operates fits the concurrent model so nicely. You execute something, you wait for it, and you don't take up any resources while you do so. To me, it looks like quite a natural way of programming, especially with the ability to wait on other tasks.

Retlang

Retlang's concurrency model is based on message passing. As such, it is ideally suited to supporting cross thread communication, event driven systems, pipeline processing, etc. Bind Retlang with the Task Scheduling solution, and you are nearly at Erlang's capabilities.

Parallel FX

I get to run things in parallel, that is about it. The ability to scale a for loop to the number of processors on the machine is nice, but I am seeing it as a low level utility more than anything else. PLinq is much more interesting.

Then again, I am currently in search of higher level concurrency solutions than what Parallel FX supplies.

Concurrency and Coordination Runtime

This looks a lot more like what I had in mind. The concurrency primitives that we have here are high level enough to be really useful, and it looks like it has implemented the same ideas that I had, both message passing and task based. I haven't played with it, just read the docs, so I am not sure how useful this really is.

State

The real problem with concurrency is state. Erlang gets away with dealing with it because it has no mutable state. In most imperative languages, we have to deal with this somehow.

The usual answer for that is don't share state. The pipeline example is a good one, you don't get two processes dealing with the same object at the same time.

In my task scheduling code, I am ensuring that a task can run on only a single thread at a time (but not that it runs on the same thread all the time). This should take care of most state issues, since as long as you keep to your own instance, you are safe. If you start passing around mutable variables, then you need to tread more carefully.

Ordering

Another issue that you have with concurrency is that you usually have some requirements about in what order you want to deal with operations. You can't really approve an order before it was created, now can you?

Different approaches to concurrency have different guarantees in regard to ordering. Retlang will always preserve ordering, which makes some things extremely simple. My task scheduling library will not preserve ordering (it will schedule the first task that is ready to go), and Parallel FX doesn't preserve ordering unless you explicitly request it (I couldn't find out about the CCR).

An interesting observation about the task scheduling library, it doesn't really need ordering, because of the way we yield in the method, we can get away with just waiting for the values to be completed.

I don't think that there is a right approach, but I strongly lean toward tasks and message passing as a default concurrency approach.

Erlang processes on the CLR

I mentioned that getting to Erlang like processes is something that I really want to get to. I started thinking about how I can make it work with the IEnumerable<T> implementation.

As it turned out, that was fairly simple, once I got the idea. Take a look at this class diagram:

[image: task scheduling class diagram]

We have tasks, and we register them in the scheduler. We can also wait for them without blocking the thread. Check out this trivial example. We have the following long running task:

public class MultiplicationTask : AbstractTask
{
    private readonly int value;

    public MultiplicationTask(int value)
    {
        this.value = value;
    }

    protected override IEnumerable<Condition> Execute()
    {
        Thread.Sleep(1000);
        SetResult(value * 2);
        yield break;
    }
}

And the code that wants to call it:

public class UseMultiplicationTask : AbstractTask
{
    protected override IEnumerable<Condition> Execute()
    {
        Future future1 = Spawn(new MultiplicationTask(15));
        Future future2 = Spawn(new MultiplicationTask(23));

        yield return delegate { return future1.HasValue && future2.HasValue; };

        System.Console.WriteLine(future1.GetValue<int>() + future2.GetValue<int>());
    }
}

We yield a condition that tells us when we can be executed again. From then on, it is just a matter of properly scheduling the different tasks.
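
To make the mechanics concrete: the Condition yielded above is presumably just a bool-returning delegate, and a (very naive) scheduler only has to resume a task's enumerator once the last condition it yielded returns true. This is a sketch of that idea, not the actual implementation:

public delegate bool Condition();

public class NaiveScheduler
{
    // Each pending task is its enumerator plus the condition it is waiting on.
    private readonly Queue<KeyValuePair<IEnumerator<Condition>, Condition>> tasks =
        new Queue<KeyValuePair<IEnumerator<Condition>, Condition>>();

    public void Schedule(IEnumerable<Condition> task)
    {
        tasks.Enqueue(new KeyValuePair<IEnumerator<Condition>, Condition>(
            task.GetEnumerator(), delegate { return true; }));
    }

    public void Run()
    {
        while (tasks.Count > 0)
        {
            KeyValuePair<IEnumerator<Condition>, Condition> pair = tasks.Dequeue();
            if (pair.Value() == false)
            {
                tasks.Enqueue(pair); // not ready yet, check again later
                continue;
            }
            if (pair.Key.MoveNext()) // run until the next yield
                tasks.Enqueue(new KeyValuePair<IEnumerator<Condition>, Condition>(
                    pair.Key, pair.Key.Current));
            // when MoveNext returns false the task has completed
        }
    }
}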

We can probably pretty it up a bit with lambdas, but this works.

Now, instead of listing all the code here, you can browse it here:

https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/SampleApplications/BlogCode/BlogCode/ErlangTasks

I really like this approach.

I Yield, I Yield

I am going over the Parallel FX code at the moment, and I ran into this interesting method.

Platform.Yield(), which turns out to be a call to SwitchToThread(), which is a method I really should have known about. It turns out that this is a way to explicitly give up the current time-slice that the thread is executing (for the current CPU only).

This led me to do some more discovery, and I found this, a managed fibers implementation (I can't find the source for this, however, and I have the SDK). This is all part of my quest to get Erlang-like support for concurrency. By that I mean that I can do things like:

public void Process()
{
   var dbFuture = Spawn( AcquireDataFromDB );
   var wsFuture = Spawn( AcquireDataFromWS );

   YieldUntil(dbFuture, wsFuture);

   // do real processing here.
}

The key point is that the call to YieldUntil does not block the current thread; instead, it frees it to start executing additional processing.

We can keep abusing the yield return approach, but that is starting to get old.

Fluent Pipelines

I am having a discussion with Jon Skeet about the merits of using Linq for pipelines and delegates/lambda instead of classes.

I kept saying that I don't really see the point, so I went ahead and implemented this:

GenerateData(10000)//enumerator from 0 .. 10000
	.Where(i => i % 3 == 0)
	.Transform(i => (i * 2).ToString() )
	.Act(i => Console.WriteLine(i))
	.Execute();

This uses the same approach as my previous pipeline, but it does it in C# 3.0, so it can use things like extension methods, which make this nicer. The same is possible in C# 2.0, but it takes a ridiculous amount of code to do.
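
The extension methods behind that chain are not shown in the post; a rough sketch of what they might look like (the names follow the snippet above, the bodies are my guess, and Where is just the standard LINQ operator):

public static class PipelineExtensions
{
    public static IEnumerable<TOutput> Transform<TInput, TOutput>(
        this IEnumerable<TInput> input, Func<TInput, TOutput> transform)
    {
        foreach (TInput item in input)
            yield return transform(item);
    }

    public static IEnumerable<T> Act<T>(this IEnumerable<T> input, Action<T> action)
    {
        foreach (T item in input)
        {
            action(item);
            yield return item;
        }
    }

    public static void Execute<T>(this IEnumerable<T> input)
    {
        // Force the lazy chain to run to completion.
        foreach (T item in input)
        {
        }
    }
}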

The fluent version is much simpler than the class-based code I have shown before, no?

Why do I use the first approach then?

Scalability.

What we are seeing here is about as trivial as it can get. What happens when we have more complex semantics?

Let us take writing to the database as an example. We need to do quite a bit there, certainly more than we would put in a lambda expression. We can extract it to a method, but then we run into another problem: we can't do method inheritance. This means that I have no easy way of abstracting the common stuff out. Well, I can use a template method, but that works if and only if I have a single place where I want to change behavior.

As an example of scaling, let us take this piece of code:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
	public FibonacciBulkInsert() : base("test", "Fibonacci")
	{
	}

	protected override void PrepareSchema()
	{
		Schema["id"] = typeof (int);
	}
}

Which uses this base class to handle the bulk of the work.

One thing that Jon mentioned that was interesting was the ability to take advantage of Linq specific improvements, such as PLinq. This is indeed a consideration, but upon some reflection on it, I realized that the two are not mutually exclusive. If I want to take advantage of any of that, all I need is to modify the pipeline to iterate using PLinq rather than foreach.

ASP.Net Ajax, Error Handling and WTF


I am facing some really unpleasant choices at the moment. And I thought it would be so easy.
I wanted to add global error handling to all the web services that we expose as [ScriptService].
For some reason, they didn't show up in the Application_Error event, and that caused the exception details to be sent to the client.
It looks like, for some strange reason, web service exceptions don't go through Application_Error, so I implemented a SoapExtension to handle that, but ScriptService doesn't go through the Soap layers, so it obviously doesn't work.
Then I went and looked at the code.
The whole thing is _hard coded_ inside, and there is no way in.
No way to globally capture exception being raised from script services.
No way in to add this ability.
Urgh! Argh!

I mean, it is not like I want something very special, it sounds to me like a reasonable request.
Yes, I know you can turn off the error in the config, but I would like, you know, to log this.

Of course, everything is internal in there, so I can't just go and override RestHandler.ExecuteWebServiceCall(); I would have to have a private build of this stuff.
Just to be able to log exceptions.

Pipes and filters: The multi threaded version

Another advantage of using the pipes and filters approach is that it is naturally thread safe. Only a single filter is handling a given item at any one time, even if the pipes are all running on multiple threads.

This means that we have a very simple way to introduce threading into the system, without any hassle whatsoever. Let us take a look at the single threaded pipeline:

public class SingleThreadedPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public SingleThreadedPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}

Very simple, except for the last line, which is what pushes the entire pipeline along. Now, what do we need in order to make this multi threaded?

Well, what do I mean when I talk about multi threaded? I mean that we will execute all the operations concurrently, so they can process different parts of the pipeline at the same time. This allows us to make better use of our computing resources, etc.

Here is the code:

public class ThreadPoolPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public ThreadPoolPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            IEnumerable<T> execute = operation.Execute(current);
            current = StartConsuming(execute);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
    private ThreadSafeEnumerator<T> StartConsuming(IEnumerable<T> enumerable)
    {
        ThreadSafeEnumerator<T> threadSafeEnumerator = new ThreadSafeEnumerator<T>();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                foreach (T t in enumerable)
                {
                    threadSafeEnumerator.AddItem(t);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                threadSafeEnumerator.MarkAsFinished();
            }
        });
        return threadSafeEnumerator;
    }
}

We are using a ThreadSafeEnumerator here, and pass a callback to the thread pool which will execute the previous part of the pipeline and push its results into the current part.

This is just an advanced version of decorators.

The implementation of ThreadSafeEnumerator is about as simple as multi threaded code can be:

public class ThreadSafeEnumerator<T> : IEnumerable<T>, IEnumerator<T>
{
    private bool active = true;
    private readonly Queue<T> cached = new Queue<T>();
    private T current;

    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }

    public T Current
    {
        get { return current; }
    }

    public void Dispose()
    {
        cached.Clear();
    }

    public bool MoveNext()
    {
        lock (cached)
        {
            while (cached.Count == 0 && active)
                Monitor.Wait(cached);

            if (active == false && cached.Count == 0)
                return false;

            current = cached.Dequeue();

            return true;
        }
    }

    public void Reset()
    {
        throw new NotSupportedException();
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    public void AddItem(T item)
    {
        lock (cached)
        {
            cached.Enqueue(item);
            Monitor.Pulse(cached);
        }
    }

    public void MarkAsFinished()
    {
        lock(cached)
        {
            active = false;
            Monitor.Pulse(cached);
        }
        
    }
}

The real magic happens in MoveNext(), with support from AddItem() and MarkAsFinished().

This is it, these two classes are all we need to make everything else multi threaded.

Note that this version assumes that you can execute all the operations concurrently, which may not be the case if you have a lot of them (over 25 per CPU, by default). At that point, we would need to implement coroutines for the ThreadSafeEnumerator, instead of just blocking the thread.

Pipes and filters: The IEnumerable approach

Pipes are very common in computing. They are a very good way to turn a complex problem into a set of small problems. You are probably familiar with the pattern, even if not explicitly.

  • The ASP.Net Http Pipeline (Begin_Request, Authorize_Request, etc.)
  • Compiler Pipelines (Parse, ProcessTypes, SaveAssembly, etc.)
  • Command Line piping (ps -ax | grep Finder)

What I wanted to talk about today was how to implement this in code. I did several implementations of pipes and filters in the past, and they were all overly complex. I took this weekend to look at the problem again, and I came up with a ridiculously simple solution.

In a nutshell, here it is:

[image: pipeline class diagram]

We have a pipeline that is composed of operations. Each operation accepts an input and returns an output. The use of IEnumerable<T> means that we can streamline the entire process without any effort whatsoever.
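
Since the diagram did not survive, here is the IOperation<T> contract implied by the implementations below:

public interface IOperation<T>
{
    // Accepts the output of the previous stage and streams its own output onward.
    IEnumerable<T> Execute(IEnumerable<T> input);
}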

Most problems that call for the pipeline approach are fairly complex, so picking a simple example means that it would be trivial to implement it otherwise. Let us go with the really trivial sample of printing all the processes whose working set is greater than 50 MB.

We have three stages in the pipeline, the first, get processes:

public class GetAllProcesses : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        return Process.GetProcesses();
    }
}

The second, limit by working set size:

public class LimitByWorkingSetSize : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        int maxSizeBytes = 50 * 1024 * 1024;
        foreach (Process process in input)
        {
            if (process.WorkingSet64 > maxSizeBytes)
                yield return process;
        }
    }
}

The third, print process name:

public class PrintProcessName : IOperation<Process>
{
    public IEnumerable<Process> Execute(IEnumerable<Process> input)
    {
        foreach (Process process in input)
        {
            System.Console.WriteLine(process.ProcessName);
        }
        yield break;
    }
}

All of those are very trivial implementations. You can see that the GetAllProcesses class doesn't care about its input; it is the source. LimitByWorkingSetSize iterates over the input and uses the "yield return" keyword to stream the results to the next step, PrintProcessName. Since this step is the final one, we use the "yield break" keyword to make it compile without returning anything. (We could return null, but that would be rude.)

It is important to note that the second stage uses the if statement to control what gets passed downstream.

Now we only have to bring them together, right?

public class TrivialProcessesPipeline : Pipeline<Process>
{
    public TrivialProcessesPipeline()
    {
        Register(new GetAllProcesses());
        Register(new LimitByWorkingSetSize());
        Register(new PrintProcessName());
    }
}

Now, executing this pipeline will execute all three steps, in a streaming fashion.

Okay, this is a lot of code that we can replace with the following snippet:

int maxSizeBytes = 50 * 1024 * 1024;
foreach (Process process in Process.GetProcesses())
{
     if (process.WorkingSet64 > maxSizeBytes)
         System.Console.WriteLine(process.ProcessName);
}

What are we getting from this?

Composability and streaming. When we execute the pipeline, we are not executing each step in turn, we are executing them all in parallel. (Well, not in parallel, but together.)

Hey, I didn't show you how the Pipeline<T> was implemented, right?

public class Pipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public Pipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}

I'll leave you to ponder that.

Multi Mocks and verifying behavior

I have the following piece of code:

protected override void DoClose()
{
    IDisposable disposable = enumerator as IDisposable;
    if (disposable != null)
        disposable.Dispose();

    disposable = enumerable as IDisposable;
    if (disposable != null)
        disposable.Dispose();
}

How do I verify that it correctly disposes both enumerator and enumerable when the method is called?

Here is the test that I wrote; it uses Rhino Mocks' Multi Mocks feature to generate a proxy that implements more than a single interface.

[Test]
public void WillDisposeInternalEnumeratorAndEnumerableWhenDisposed()
{
    MockRepository mocks = new MockRepository();
    IEnumerable<Row> enumerable = mocks.DynamicMultiMock<IEnumerable<Row>>(typeof(IDisposable));
    IEnumerator<Row> enumerator = mocks.DynamicMock<IEnumerator<Row>>();
    using(mocks.Record())
    {
        SetupResult.For(enumerable.GetEnumerator()).Return(enumerator);
        enumerator.Dispose();
        ((IDisposable)enumerable).Dispose();
    }
    using (mocks.Playback())
    {
        DictionaryEnumeratorDataReader reader =
            new DictionaryEnumeratorDataReader(new Dictionary<string, Type>(), enumerable);
        reader.Dispose();
    }
}

Simple, and to the point.

My Code Sucks

There is a point where a project goes beyond the pale, where the complexity goes so far out of line that it is simply ludicrous.

I hit such a point today. I have had enough of SSIS and decided that I want to replace it with something better. I wrote an ETL tool to handle that in a few hours.

Why is this relevant? Because I have already built an ETL tool: Rhino ETL.

It is quite telling when the author of a tool decides that he doesn't want to use it.

I was decidedly proud of Rhino ETL for a while, then the problems started to creep in. The problems were not in the code per se; the entire architecture of the code was overly complex. In order to handle this complexity, I had resorted to throwing code at the problem, and then more code, and more code yet again.

At the moment, the current code base has two "minor" problems, exception handling and threading. The bigger problem is that I don't want to have to wade into this stinking pile and try to figure out what is going on there. I tried to be clever, and it is clever, in a horrible sort of way.

I don't have the time or patience to decipher code at the best of times, and at this point, it has gotten simply too complex. The project right now is at ~9,000 lines of code, so it is not that it is big, it is simply complex.

From the architecture perspective, I have made one huge mistake: I exposed the threading model to the application code. You can say that this stands at the root of my problems. I actually re-wrote this once already, moving from a home grown threading solution to using Retlang for threading. I made the same mistake and exposed the threading model to the application itself. Can you say: big mistake!

From the point of view of the project itself, I started by defining the DSL syntax, and then built the project around that. It turns out that this has the usual problems of "let us build a whole layer at a time". It also meant that a lot of the code had deep assumptions about the way it is called, making it unusable in other ways. This is excusable if we are talking about the DSL mapping layer, but not for the core code base itself.

Anyway, I am ranting and I should stop.

I spent six to eight hours today rewriting it from scratch. It doesn't do threading, and it doesn't have a DSL interface yet, but it does pretty much everything that the old project did, in a quarter of the lines of code, and in a way that is much safer and easier to handle than what we are using currently.

It's the future now

For various reasons, I had to implement the Future pattern in two completely different settings in three different ways today.

A future is a place-holder for the undetermined result of a (concurrent) computation. Once the computation delivers a result, the associated future is eliminated by globally replacing it with the result value. That value may be a future on its own.

The last implementation was the explicit one, it was simply:

public class Future<T>
{
	private T value;
	private bool hasValue;
	private string error;

	public void SetValue(T value)
	{
		this.value = value;
		hasValue = true;
	}

	public void SetError(string error)
	{
		this.error = error;
	}

	public bool HasError
	{
		get { return error != null; }
	}

	public T Value
	{
		get { return value; }
	}

	public bool HasValue
	{
		get { return hasValue; }
	}

	public string Error
	{
		get { return error; }
	}
}

This is the simplest approach, primitive, you would say. It requires active collaboration from the code that uses it, and it requires that the method return a future value. It is a good approach for a very small API, but this is often something that you want to handle in a cross cutting fashion. First, we need a more robust future implementation. (This one is wasteful in that it doesn't lazily allocate the reset event.)

public class Future<T>
{
	private ManualResetEvent resetEvent = new ManualResetEvent(false);
	private T value;
	private bool hasValue;
	private Exception error;

	public void SetValue(T value)
	{
		this.value = value;
		hasValue = true;
		resetEvent.Set();
	}

	public void SetError(Exception error)
	{
		this.error = error;
		resetEvent.Set();
	}

	public bool HasError
	{
		get { return error != null; }
	}

	public T Value
	{
		get
		{
			resetEvent.WaitOne();
			resetEvent.Close();
			if (error != null)
				throw new FutureInvocationException(error);
			return value;
		}
	}

	public bool HasValue
	{
		get { return hasValue; }
	}

	public Exception Error
	{
		get
		{
			resetEvent.WaitOne();
			return error;
		}
	}
}

Now that we have that, we still have to deal with changing the interfaces and an explicitly concurrent programming model. There is a better way; I got the idea from watching a futures / active objects implementation in Python, by Ronnie Maor. The ability to subvert the return value there allows for really nice things.

We will start with the easiest version to grasp, we will define the code that we want to work with:

public class WorkFlowController
{
    public virtual int LongRunningTask()
    {
        Console.WriteLine("Starting long running");
        Thread.Sleep(5000);
        Console.WriteLine("Completed long running");
        return 42;
    }
}

Now, we want to make this into something that will return a future, and we want to do it in a way that wouldn't be too awkward. Here is a simple usage:

WorkFlowController controller = new WorkFlowController();
Future<int> future = InThe.Future<int>(delegate { return controller.LongRunningTask(); });
Console.WriteLine("After calling long running");
Console.WriteLine(future.Value);

Executing this code produces:

After calling long running
Starting long running
Completed long running
42

The InThe.Future part is simply:

public class InThe
{
    public delegate T Proc<T>();
    public static Future<T> Future<T>(Proc<T> futureAction)
    {
        Future<T> future = new Future<T>();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                future.SetValue(futureAction());
            }
            catch (Exception e)
            {
                future.SetError(e);
            }
        });
        return future;
    }
}

This is pretty straightforward, but I don't like the syntax that we have here (even with C# 3.0, it is still ugly). What I would like to get is this:

WorkFlowController controller = With.Future<WorkFlowController>();
Future<int> future = InThe.Future(controller.LongRunningTask());
Console.WriteLine("After calling long runnnig");
Console.WriteLine(future.Value);

Suddenly I have a saner model, and the model to support futures is explicit. The implementation is fairly trivial as well:

public class FutureInterceptor : IInterceptor
{
    public void Intercept(IInvocation invocation)
    {
        InThe.lastInvocation = invocation;
        if (invocation.Method.ReturnType.IsValueType)
            invocation.ReturnValue = Activator.CreateInstance(invocation.Method.ReturnType);
    }
}

public static class With
{
    private readonly static ProxyGenerator generator = new ProxyGenerator();

    public static T Future<T>(params object[] args)
    {
        return (T)generator.CreateClassProxy(
            typeof (T), 
            new IInterceptor[] {new FutureInterceptor()}, 
            args);
    }
}

public class InThe
{
    public static IInvocation lastInvocation;

    public static Future<T> Future<T>(T ignored)
    {
        Future<T> future = new Future<T>();
        IInvocation invocation = lastInvocation;
        lastInvocation = null;
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                invocation.Proceed();
                future.SetValue((T)invocation.ReturnValue);
            }
            catch (Exception e)
            {
                future.SetError(e);
            }
        });
        return future;
    }
}

I like this best, and it is very easy to implement and work with. Return value woes have been overcome, hooray!