Ayende @ Rahien


Rhino ETL 2.0

Rhino ETL was born out of a need. I need to do a lot of ETL type operations. Those include anything from moving data from legacy databases to my database, importing files, importing data over web services, etc. For a while, I have used SSIS for those needs. It has proven... inadequate. Mostly in terms of ease of development, deployment, error handling, etc.

This is my third attempt at building an ETL tool. The third time is much shorter and clearer than both previous attempts.

The goals for the project were:

  • Developer friendly:
    • Errors
    • Development
    • Deployment
  • Performant - I need to move large amounts of data around, and I need to do it fast.
  • Easy - The hard part should be handling the required transforms, dealing with the logic for in/out, etc.
  • Unlimited - You should not be limited to what the tool gives you, and you should be able to integrate easily with other tools
  • Language agnostic - You should be able to develop solutions for this using C#/VB/Boo
  • DSL - Provide a DSL to make it even easier to work with

The overall concept is based around these two classes, and the idea of a pipeline:

[Two images, not reproduced here, showing the two classes]

Here is a simple operation, which just generates all the even numbers up to a million:

public class EvenNumberToMillion : AbstractOperation
{
	public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
	{
		for(int i = 2; i< 1000000; i += 2)
		{
			Row row = new Row();
			row["number"] = i;
			yield return row;
		}
	}
}

This is an input operation: it ignores its rows parameter and yields rows generated in some other way. As you can see, we yield a row for each iteration.

We combine operations into a pipeline using the process. If we wanted to print the numbers, we would build the following pipeline process:

public class PrintNumbersProcess : EtlProcess
{
	public override void Initialize()
	{
		Register(new EvenNumberToMillion());
		Register(new PrintNumbers());
	}
}

All the output of the first operation goes into the second operation, and so on and so forth. Using the DSL it will look like:

operation EvenNumbersToMillion:
	for i in range(2, 1000000, 2):
		yield Row(Number: i)

operation PrintNumbers:
	for row in rows:
		print row.Number

process PrintNumbersProcess:
	EvenNumbersToMillion()
	PrintNumbers()
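
To make the streaming behaviour concrete, here is a minimal standalone sketch of the same idea in plain C#. It does not use Rhino ETL at all: the `Row` alias, `PipelineSketch`, `EvenNumbers`, `DoubleNumbers` and `Run` are names made up for this illustration, and the real `EtlProcess` may well chain operations differently. The point is only that each step consumes the previous step's `IEnumerable` lazily, one row at a time.

```csharp
using System;
using System.Collections.Generic;
using Row = System.Collections.Generic.Dictionary<string, object>;

// Illustrative stand-ins only; these are not the Rhino ETL classes.
public static class PipelineSketch
{
    // Input step: ignores the incoming rows and generates even numbers below limit.
    public static Func<IEnumerable<Row>, IEnumerable<Row>> EvenNumbers(int limit)
    {
        return rows => Generate(limit);
    }

    private static IEnumerable<Row> Generate(int limit)
    {
        for (int i = 2; i < limit; i += 2)
        {
            Row row = new Row();
            row["number"] = i;
            yield return row;
        }
    }

    // Transform step: doubles each number, streaming one row at a time.
    public static IEnumerable<Row> DoubleNumbers(IEnumerable<Row> rows)
    {
        foreach (Row row in rows)
        {
            Row copy = new Row(row);
            copy["number"] = (int)row["number"] * 2;
            yield return copy;
        }
    }

    // Chains the steps so that the output of each one feeds the next.
    // Nothing is computed until the caller enumerates the result.
    public static IEnumerable<Row> Run(params Func<IEnumerable<Row>, IEnumerable<Row>>[] steps)
    {
        IEnumerable<Row> current = new List<Row>();
        foreach (var step in steps)
            current = step(current);
        return current;
    }
}
```

Calling `PipelineSketch.Run(PipelineSketch.EvenNumbers(10), PipelineSketch.DoubleNumbers)` and enumerating the result yields rows whose "number" values are 4, 8, 12 and 16.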

This is just to demonstrate the concept of the pipeline. Now we can get into the interesting operations. As you already surmised, AbstractOperation is the common base class, and you can inherit it to produce rows from any source.

Inputs

Rhino ETL offers special support for getting data from a database. This means that you can define an input as simply as:

public class ReadUsers : ConventionInputCommandOperation
{
    public ReadUsers() : base("test")
    {
        Command = "SELECT Id, Name, Email FROM Users";
    }
}

Or, using DSL:

input "test", Command = "SELECT id, name, email FROM Users"

Note the "test" here: it is the name of the connection string in app.config.

Outputs

On the output side, we have more interesting options. We can use any custom option that we want, of course, but for working with databases, we have the following options:

Standard DB commands:

public class FibonacciOutput : ConventionOutputCommandOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

You'll note that I am not specifying the parameters; those are taken implicitly from the current row.

Using DSL:

output "test", Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)"

SQL Batch operations:

public class FibonacciOutput : ConventionSqlBatchOpeartion
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

Using DSL: Haven't written that yet :-(

Sql Bulk Insert:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
    public FibonacciBulkInsert() : base("test", "Fibonacci")
    {
    }

    protected override void PrepareSchema()
    {
        Schema["id"] = typeof (int);
    }
}

Using DSL: Haven't written that yet :-(

Files

For working with files, we have the support of the excellent FileHelpers library, which makes working with files really easy.

Reading from a file is simply:

public class UserRecord
{
    public string email;
    public int id;
    public string name;
}

public class ReadUsersFromFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        using(FileEngine file = FluentFile.For<UserRecord>().From("users.txt"))
        {
            foreach (object obj in file)
            {
                yield return Row.FromObject(obj);
            }
        }
    }
}

There is a 1:1 translation to Boo here, so I'll spare you that.

Writing is very similar:

public class WriteUsersToFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        FluentFile engine = FluentFile.For<UserRecord>();
        engine.HeaderText = "Id\tName\tEmail";
        using(FileEngine file = engine.To("users.txt"))
        {
            foreach (Row row in rows)
            {
                UserRecord record = new UserRecord();
                
                // Field names match the lowercase fields declared on UserRecord above.
                record.id = (int)row["id"];
                record.name = (string)row["name"];
                record.email = (string)row["email"];

                file.Write(record);
            }
        }
        yield break;
    }
}

Joins

Joins are an interesting concept, and I have played with them quite extensively recently. Joins in Rhino ETL are implemented as sub processes. Hash joins are very simple:

public class JoinUsersAndIds : JoinOperation
{
	protected override Row MergeRows(Row leftRow, Row rightRow)
	{
		Row row = leftRow.Clone();
		row["user_id"] = rightRow["new_id"];
		return row;
	}

	protected override void SetupJoinConditions()
	{
		InnerJoin
			.Left("id")
			.Right("old_id");
	}
}

This is just the operation; you hook it up in the process using:

Register(new JoinUsersAndIds()
         	.Left(new GenerateUsers(25000))
         	.Right(new GenerateRandomIds(15000)));

Each side is capable of accepting a full blown sub process on its own.
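
For readers who have not met the term, this is roughly what a hash join does, as a standalone sketch in plain C#. It is not the Rhino ETL implementation: `HashJoinSketch`, `InnerJoin` and the `Row` alias are hypothetical names, the choice to index the right side is my own, and key values are assumed to be non-null. The column names mirror the `JoinUsersAndIds` example above.

```csharp
using System.Collections.Generic;
using Row = System.Collections.Generic.Dictionary<string, object>;

// Illustrative stand-in only; not the Rhino ETL JoinOperation.
public static class HashJoinSketch
{
    public static IEnumerable<Row> InnerJoin(
        IEnumerable<Row> left, string leftKey,
        IEnumerable<Row> right, string rightKey)
    {
        // Build phase: index every right row by its join key.
        var index = new Dictionary<object, List<Row>>();
        foreach (Row r in right)
        {
            List<Row> bucket;
            if (!index.TryGetValue(r[rightKey], out bucket))
            {
                bucket = new List<Row>();
                index[r[rightKey]] = bucket;
            }
            bucket.Add(r);
        }

        // Probe phase: stream the left rows and look up matches in the index.
        foreach (Row l in left)
        {
            List<Row> matches;
            if (!index.TryGetValue(l[leftKey], out matches))
                continue; // inner join: left rows without a match are dropped

            foreach (Row r in matches)
            {
                // Same merge as the MergeRows example: copy left, add the new id.
                Row merged = new Row(l);
                merged["user_id"] = r["new_id"];
                yield return merged;
            }
        }
    }
}
```

Each left row costs one dictionary lookup instead of a scan over the right side, which is why the hash join is preferred when the join condition is a plain equality.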

Nested loops joins are appropriate for the more complex cases:

public class FullJoinUsersToPeopleByEmail : NestedLoopsJoinOperation
{
    protected override bool MatchJoinCondition(Row leftRow, Row rightRow)
    {
        return LeftJoin(leftRow["email"], rightRow["email"]);
    }

    protected override Row MergeRows(Row leftRow, Row rightRow)
    {
        Row row = new Row();
        row.Copy(leftRow);
        row["person_id"] = rightRow["id"];
        return row;
    }
}

Using DSL it looks like this:

join get_user_roles:
	left:
		input "test", Command = "SELECT id, name, email  FROM Users"
		
	right:
		input "test", Command = "SELECT userid, roleid FROM UsersToRoles"
			
	on left.id ==  right.userid:
		row.Name = left.Name
		row.Role = right.RoleId

(A note about this syntax: it currently generates a nested loops join; I intend to make it generate an optimized version soon.)

Branching

That was quite a challenge to implement. I kept missing a key point, and that tripped me up for a while. Here is how to send a row to several branches:

BranchingOperation split = new BranchingOperation()
	.Add(Partial
		.Register(new MultiplyByThreeOperation())
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()))
	.Add(Partial
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()));
Register(split);

(Note, the implementation is not as optimized as it could be.)
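
One reason branching is awkward with lazy row streams is that every branch has to see every row, while the source should only be read once. The following standalone sketch shows one naive way to get that behaviour in plain C#. It is an assumption-laden illustration, not how `BranchingOperation` works: `BranchingSketch`, `Branch` and the `Row` alias are hypothetical, and buffering all rows in memory is a simplification.

```csharp
using System;
using System.Collections.Generic;
using Row = System.Collections.Generic.Dictionary<string, object>;

// Illustrative stand-in only; not the Rhino ETL BranchingOperation.
public static class BranchingSketch
{
    public static void Branch(
        IEnumerable<Row> rows,
        params Func<IEnumerable<Row>, IEnumerable<Row>>[] branches)
    {
        // Buffer the rows so the upstream source is enumerated only once.
        List<Row> buffered = new List<Row>(rows);

        foreach (var branch in branches)
        {
            // Give each branch its own copies, so a branch that modifies
            // rows cannot affect what the other branches see.
            List<Row> copies = new List<Row>();
            foreach (Row row in buffered)
                copies.Add(new Row(row));

            // Drain the branch's output to force its lazy steps to run.
            foreach (Row ignored in branch(copies)) { }
        }
    }
}
```

With this shape, a branch that multiplies each number by three and a branch that writes the rows unchanged both receive the original values, whichever order they were added in.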

Aggregates

Well, you got to have those, don't you? Here is a simple Row Count aggregate:

public class RowCount : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        if (aggregate["count"] == null)
            aggregate["count"] = 0;

        int count = (int)aggregate["count"];
        aggregate["count"] = count + 1;
    }
}

We are called once per row, and can accumulate all the values that we want. We can use grouping to create more interesting results:

public class CostPerProductAggregation : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if(aggregate["cost"]==null)
            aggregate["cost"] = 0;
        aggregate["cost"] = ((int) aggregate["cost"]) + ((int) row["price"]);
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new string[] {"name"};
    }
}

We can also override the FinishAggregation(Row) method to complete any calculations when all the rows are completed. Rhino ETL guarantees that we will get the same aggregate row for all the rows that match the same columns, so that is taken care of.

Using DSL for that is simply:

aggregate join_product_names:
	accumulate:
		aggregate.names = [] if aggregate.names is null
		aggregate.names.Add(row.name)
	
	terminate:
		aggregate.result = string.Join(", ", aggregate.names.ToArray(string))
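
The guarantee of "the same aggregate row for all the rows that match the same columns" can be pictured with a small standalone sketch in plain C#. This is only a model of the behaviour described above, not the Rhino ETL code: `GroupedAggregationSketch`, `Aggregate`, `AccumulateCost` and the `Row` alias are hypothetical, and building the group key by joining column values with a separator is a shortcut that assumes the values never contain that separator.

```csharp
using System;
using System.Collections.Generic;
using Row = System.Collections.Generic.Dictionary<string, object>;

// Illustrative stand-in only; not the Rhino ETL AbstractAggregationOperation.
public static class GroupedAggregationSketch
{
    public static IEnumerable<Row> Aggregate(
        IEnumerable<Row> rows, string[] groupBy, Action<Row, Row> accumulate)
    {
        // One aggregate row per distinct combination of the grouping columns.
        var groups = new Dictionary<string, Row>();
        foreach (Row row in rows)
        {
            var parts = new List<string>();
            foreach (string column in groupBy)
                parts.Add(Convert.ToString(row[column]));
            string key = string.Join("\u0001", parts.ToArray());

            Row aggregate;
            if (!groups.TryGetValue(key, out aggregate))
            {
                aggregate = new Row();
                groups[key] = aggregate;
            }
            // Called once per row, always with the aggregate row of its group.
            accumulate(row, aggregate);
        }
        return groups.Values;
    }

    // Mirrors the CostPerProductAggregation example: sum price per name.
    public static void AccumulateCost(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if (!aggregate.ContainsKey("cost"))
            aggregate["cost"] = 0;
        aggregate["cost"] = (int)aggregate["cost"] + (int)row["price"];
    }
}
```

Feeding it rows (apple, 3), (pear, 5), (apple, 4) with `groupBy` set to `{ "name" }` produces two aggregate rows: apple with a cost of 7 and pear with a cost of 5.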

That is about it for now, I think. You can get the source for Rhino ETL here:

https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/rhino-etl

I plan to have a binary release once I am confident that all the notes in this post are no longer relevant.

Comments

01/16/2008 04:03 AM by Liang

You are the MAN! That is only thing I can say :=)

01/16/2008 08:29 AM by KuRcZaK

ETL stands for Extract Transform and Load so in general it is that sort of software that enables you to move, consolidate, and do many other sophisticated operations on any type of data.

BTW: Oren maybe it is a good time to start having a small dictionary on your web site. Mostly with the abbreviations you are using in your posts. OK - it's not that hard to find them on wiki, google etc. but one has to spend some time to do it and what's more dangerous some abbreviations stand for more than one term and this makes such (google,wiki) approach misunderstanding prone :)

01/16/2008 07:14 PM by flipdoubt

I have clients with very stringent, yet technology agnostic backup requirements that keep getting me pushed in the direction of SSIS, but the clients rarely have the on-site technical moxie to maintain such a system. If I look into Rhino ETL to extract slices of data for backup (the clients don't want to bother with backing it all up), does ETL have features to let me turn off auto-incrementing identities when restoring data from the file based backup?

Thanks! I'm looking forward to using this tool.

01/16/2008 07:16 PM by Ayende Rahien

You can execute commands normally, so it would probably be something like (this is not the real command, I can't recall that off the top of my head):

Initialize()
{
    // register pipeline
    ExecuteNonQuery("MyDb", "dbcc turnoffidentity for blah");
}

PostProcess()
{
    ExecuteNonQuery("MyDb", "dbcc turnonidentity for blah");
}

01/18/2008 07:35 PM by Anders

Interesting indeed. Now I might be stupid but when I try to build the solution the file Rhino.Etl.Core\Properties\AssemblyInfo.cs is missing...

So is it incomplete or am I just not doing it right?

01/18/2008 07:41 PM by Anders

Yes, that was me being stupid :-)

01/18/2008 08:38 PM by Ayende Rahien

You need to run the command line build first
