Rhino ETL 2.0

Rhino ETL was born out of a need. I do a lot of ETL-type operations: moving data from legacy databases into my database, importing files, importing data over web services, and so on. For a while, I used SSIS for those needs. It has proven... inadequate, mostly in terms of ease of development, deployment, and error handling.

This is my third attempt at building an ETL tool, and it is much shorter and clearer than both previous attempts.

The goals for the project were:

  • Developer friendly, in terms of:
    • Error handling
    • Development
    • Deployment
  • Performant - I need to move large amounts of data around, and I need to do it fast.
  • Easy - The hard part should be handling the required transforms, dealing with the logic for in/out, etc.
  • Unlimited - You should not be limited to what the tool gives you, and you should be able to integrate easily with other tools
  • Language agnostic - You should be able to develop solutions for this using C#/VB/Boo
  • DSL - Provide a DSL to make it even easier to work with

The overall concept is based around two classes, AbstractOperation and EtlProcess, and the idea of a pipeline.

Here is a simple operation, which just generates all the even numbers up to a million:

public class EvenNumberToMillion : AbstractOperation
{
	public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
	{
		for (int i = 2; i < 1000000; i += 2)
		{
			Row row = new Row();
			row["number"] = i;
			yield return row;
		}
	}
}

This is an input operation: it ignores its rows parameter and yields rows that it generates itself. As you can see, we yield a row for each iteration.

We combine operations into a pipeline using a process. If we wanted to print the numbers, we would build the following pipeline process:

public class PrintNumbersProcess : EtlProcess
{
	public override void Initialize()
	{
		Register(new EvenNumberToMillion());
		Register(new PrintNumbers());
	}
}
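
The PrintNumbers operation registered above isn't shown in C# in this post (only its Boo version below); a minimal sketch of it might look like this:

public class PrintNumbers : AbstractOperation
{
	public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
	{
		foreach (Row row in rows)
		{
			Console.WriteLine(row["number"]);
			yield return row; // pass the row on, in case another operation follows
		}
	}
}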

All the output of the first operation goes into the second operation, and so on. Using the DSL, it looks like this:

operation EvenNumbersToMillion:
	for i in range(2, 1000000, 2):
		yield Row(Number: i)

operation PrintNumbers:
	for row in rows:
		print row.Number

process PrintNumbersProcess:
	EvenNumbersToMillion()
	PrintNumbers()

This is just to demonstrate the concept of the pipeline. Now we can get to the interesting operations. As you have already surmised, AbstractOperation is the common base class, and you can inherit from it to produce rows from any source.
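
To actually run a pipeline, you instantiate the process and execute it. A minimal sketch, assuming the Execute() entry point on EtlProcess:

PrintNumbersProcess process = new PrintNumbersProcess();
process.Execute(); // pulls all the rows through the registered operations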

Inputs

Rhino ETL offers special support for getting data from a database, which means that you can define an input as simply as:

public class ReadUsers : ConventionInputCommandOperation
{
    public ReadUsers() : base("test")
    {
        Command = "SELECT Id, Name,Email FROM Users";
    }
}

Or, using DSL:

input "test", Command = "SELECT id, name, email  FROM Users"

Note the "test" here, it is the name of the connection string in the app.config.

Outputs

On the output side, we have more interesting options. We can write any custom operation that we want, of course, but for working with databases, we have the following options:

Standard DB commands:

public class FibonacciOutput : ConventionOutputCommandOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

You'll note that I am not specifying the parameters; they are taken implicitly from the current row, so @Id is bound from the row's Id column.

Using DSL:

output "test", Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)"

SQL Batch operations:

public class FibonacciOutput : ConventionSqlBatchOperation
{
    public FibonacciOutput() : base("test")
    {
        Command = "INSERT INTO Fibonacci (Id) VALUES(@Id)";
    }
}

Using DSL: Haven't written that yet :-(

SQL Bulk Insert:

public class FibonacciBulkInsert : SqlBulkInsertOperation
{
    public FibonacciBulkInsert() : base("test", "Fibonacci")
    {
    }

    protected override void PrepareSchema()
    {
        Schema["id"] = typeof(int);
    }
}

Using DSL: Haven't written that yet :-(
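
To use the bulk insert, you register it in a process like any other operation. A sketch, where GenerateFibonacci is a hypothetical operation that produces rows with an "id" column:

public class LoadFibonacciProcess : EtlProcess
{
    public override void Initialize()
    {
        Register(new GenerateFibonacci()); // hypothetical input operation
        Register(new FibonacciBulkInsert());
    }
}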

Files

For working with files, we have the support of the excellent FileHelpers library, which makes the job really easy.

Reading from a file is as simple as:

[DelimitedRecord("\t")] // assumed tab-delimited; FileHelpers needs a record attribute, and maps fields in declaration order
public class UserRecord
{
    public int Id;
    public string Name;
    public string Email;
}

public class ReadUsersFromFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        using(FileEngine file = FluentFile.For<UserRecord>().From("users.txt"))
        {
            foreach (object obj in file)
            {
                yield return Row.FromObject(obj);
            }
        }
    }
}

There is a 1:1 translation to Boo here, so I'll spare you that.

Writing is very similar:

public class WriteUsersToFile : AbstractOperation
{
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        FluentFile engine = FluentFile.For<UserRecord>();
        engine.HeaderText = "Id\tName\tEmail";
        using(FileEngine file = engine.To("users.txt"))
        {
            foreach (Row row in rows)
            {
                UserRecord record = new UserRecord();
                
                record.Id = (int)row["id"];
                record.Name = (string)row["name"];
                record.Email = (string)row["email"];

                file.Write(record);
            }
        }
        yield break;
    }
}
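
Note the yield break at the end: this operation consumes rows without producing any, so it should be the last operation in its pipeline.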

Joins

Joins are an interesting concept, and I have been playing with them quite extensively recently. Joins in Rhino ETL are implemented as sub-processes. Hash joins are very simple:

public class JoinUsersAndIds : JoinOperation
{
	protected override Row MergeRows(Row leftRow, Row rightRow)
	{
		Row row = leftRow.Clone();
		row["user_id"] = rightRow["new_id"];
		return row;
	}

	protected override void SetupJoinConditions()
	{
		InnerJoin
			.Left("id")
			.Right("old_id");
	}
}

This is just the operation; you hook it up in the process using:

Register(new JoinUsersAndIds()
         	.Left(new GenerateUsers(25000))
         	.Right(new GenerateRandomIds(15000)));

Each side is capable of accepting a full-blown sub-process on its own.
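
For example, here is a sketch where the left side is a small pipeline of its own (NormalizeUserNames is a hypothetical transform operation):

Register(new JoinUsersAndIds()
         	.Left(Partial
         		.Register(new GenerateUsers(25000))
         		.Register(new NormalizeUserNames()))
         	.Right(new GenerateRandomIds(15000)));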

Nested loop joins are appropriate for the more complex cases:

public class FullJoinUsersToPeopleByEmail : NestedLoopsJoinOperation
{
    protected override bool MatchJoinCondition(Row leftRow, Row rightRow)
    {
        return LeftJoin(leftRow["email"], rightRow["email"]);
    }

    protected override Row MergeRows(Row leftRow, Row rightRow)
    {
        Row row = new Row();
        row.Copy(leftRow);
        row["person_id"] = rightRow["id"];
        return row;
    }
}

Using DSL it looks like this:

join get_user_roles:
	left:
		input "test", Command = "SELECT id, name, email  FROM Users"
		
	right:
		nput "test", Command = "SELECT userid, roleid FROM UsersToRoles"
			
	on left.id ==  right.userid:
		row.Name = left.Name
		row.Role = right.RoleId

(A note about this syntax: it currently generates a nested loop join; I intend to make it generate an optimized version soon.)

Branching

That was quite a challenge to implement; I kept missing a key point, and that tripped me up for a while. Here is how you send a row to several destinations:

BranchingOperation split = new BranchingOperation()
	.Add(Partial
		.Register(new MultiplyByThreeOperation())
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()))
	.Add(Partial
		.Register(new Fibonacci.Bulk.FibonacciBulkInsert()));
Register(split);

Each row is sent down every branch: one branch here multiplies the value by three before bulk inserting it, the other bulk inserts it as-is. (Note: the implementation is not as optimized as it could be.)

Aggregates

Well, you have got to have those, don't you? Here is a simple row count aggregate:

public class RowCount : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        if (aggregate["count"] == null)
            aggregate["count"] = 0;

        int count = (int)aggregate["count"];
        aggregate["count"] = count + 1;
    }
}

We are called once per row, and can accumulate all the values that we want. We can use grouping to create more interesting results:

public class CostPerProductAggregation : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        aggregate["name"] = row["name"];
        if (aggregate["cost"] == null)
            aggregate["cost"] = 0;
        aggregate["cost"] = ((int)aggregate["cost"]) + ((int)row["price"]);
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new string[] {"name"};
    }
}

We can also override the FinishAggregation(Row) method to complete any calculations once all the rows have been processed. Rhino ETL guarantees that we get the same aggregate row for all the rows that share the same values in the grouped columns, so that is taken care of.
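
For instance, here is a sketch of an aggregation that uses FinishAggregation to derive a value once all the rows in a group have been accumulated (the column names are illustrative):

public class AverageCostAggregation : AbstractAggregationOperation
{
    protected override void Accumulate(Row row, Row aggregate)
    {
        if (aggregate["sum"] == null)
        {
            aggregate["sum"] = 0;
            aggregate["rows"] = 0;
        }
        aggregate["sum"] = ((int)aggregate["sum"]) + ((int)row["price"]);
        aggregate["rows"] = ((int)aggregate["rows"]) + 1;
    }

    protected override void FinishAggregation(Row aggregate)
    {
        // runs once per group, after all its rows have been accumulated
        aggregate["average"] = ((int)aggregate["sum"]) / (double)(int)aggregate["rows"];
    }

    protected override string[] GetColumnsToGroupBy()
    {
        return new string[] { "name" };
    }
}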

Using DSL for that is simply:

aggregate join_product_names:
	accumulate:
		aggregate.names = [] if aggregate.names is null
		aggregate.names.Add(row.name)
	
	terminate:
		aggregate.result = string.Join(", ", aggregate.names.ToArray(string))

That is about it for now, I think. You can get the source for Rhino ETL here:

https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/trunk/rhino-etl

I plan to have a binary release once I am confident that all the notes in this post are no longer relevant.