Putting the container to work: Refactoring NServiceBus configuration

One of the most common problems I see when people build frameworks and applications that rely on a container is that they do not give the container enough to do. They use the container to create some components, but then do by hand, outside the container, a lot of work that the container could do for them.

Note: Code for this post can be found in the Scratch Pad.

NServiceBus and Mass Transit are good examples of that. I detailed some of the issues that I had with Mass Transit a while ago. Udi and I talked about this situation with NServiceBus a few days ago, and this is my attempt to figure out a better model for configuring NSB. Let us start from what we have right now.

We have XML configuration in app.config:

<MsmqTransportConfig InputQueue="messagebus" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5"
/>

<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="">
  <MessageEndpointMappings>
      <add Messages="Messages" Endpoint="messagebus" />
  </MessageEndpointMappings>
</UnicastBusConfig>

<MsmqSubscriptionStorageConfig Queue="subscriptions" />

And we have code to initialize the bus:

new ConfigMsmqSubscriptionStorage(builder);
NServiceBus.Serializers.Configure.BinarySerializer.With(builder);
new ConfigMsmqTransport(builder)
	.IsTransactional(true)
	.PurgeOnStartup(false);
new ConfigUnicastBus(builder)
	.ImpersonateSender(false)
	.SetMessageHandlersFromAssembliesInOrder(
		typeof(RequestDataMessageHandler).Assembly
		);
IBus bus = builder.Build<IBus>();
bus.Start();

The ConfigXyz objects are there to configure the bus itself inside the builder (the container used in the sample).

My first step was to move this to Windsor, with no XML config, which gave me this:

// configure bus
container.Register(
	Component.For<IBuilder>()
		.ImplementedBy<WindsorBuilderAdapter>(),
	Component.For<IMessageSerializer>()
		.ImplementedBy<BinaryMessageSerializer>(),
	Component.For<ITransport>()
		.ImplementedBy<MsmqTransport>()
		.DependsOn(new
		{
			InputQueue = "messagebus",
			NumberOfWorkerThreads = 1,
			ErrorQueue = "error",
			MaxRetries = 5,
			PurgeOnStartup = false,
			IsTransactional = true
		}),
	Component.For<ISubscriptionStorage>()
		.ImplementedBy<MsmqSubscriptionStorage>()
		.DependsOn(
		new
		{
			Queue = "subscriptions"
		}
		),
	Component.For<IBus>()
		.ImplementedBy<UnicastBus>()
		.DependsOn(new
		{
			ImpersonateSender = false,
			MessageOwners = new Hashtable
			{
				{"Messages", "messagebus"}
			},
			MessageHandlerAssemblies = new[]
			{
				typeof (RequestDataMessageHandler).Assembly
			}
		})
	);

// configure handlers
container.Register(
	// yuck, we are registering concrete type!
	Component.For<RequestDataMessageHandler>()
	);


var bus = container.Resolve<IBus>();
bus.Start();

There are a few things to notice here: we have a lot more code, the configuration is hard-coded all over the place, you either need to understand Windsor or copy/paste this, and there is no XML (yeah!).

Some of this is good (no XML); the rest needs some work. If we consider that this is a more or less standard bus configuration (a bus on top of MSMQ), we can see that there is quite a lot we can do to encapsulate the entire mess in a mechanism that is much easier to work with.

In Windsor, packaging functionality is done using facilities. A facility is an extension to the container that contains certain behavior. It can be as simple as packaging up registration for several components into a single facility or it can be as complex as proxies and runtime component selections.

Let us start with the least common denominator: a Windsor facility with XML configuration. The configuration I came up with is:

<configuration>
	<facilities>
		<facility
		   id="NServiceBusFacility"
		   type="Windsor.Infrastructure.NServiceBusFacility, Windsor.Infrastructure"
		   useBinarySerialization="true"
		   subsciptionQueue="subscriptions">
			<transport inputQueue="messagebus" errorQueue ="error"/>
			<bus impersonateSender="false">
				<message name="Messages" destination="messagebus"/>
				<handler name="Server"/>
			</bus>
		</facility>
	</facilities>

	<components>
		<component id="RequestDataMessageHandler" 
			type="Server.RequestDataMessageHandler, Server"/>
	</components>

</configuration>

And the code to make this happen is here (minus utility methods that I am not showing):

public class NServiceBusFacility : AbstractFacility
{
	public bool UseXmlSerialization { get; set; }
	public bool UseBinarySerialization { get; set; }

	protected override void Init()
	{
		UseXmlSerialization = FacilityConfig.Value("UseXmlSerialization");
		UseBinarySerialization = FacilityConfig.Value("UseBinarySerialization");

		Kernel.Register(
			Component.For<IBuilder>()
				.ImplementedBy<WindsorBuilderAdapter>()
			);


		RegisterTransport();
		RegisterSerializer();
		RegisterSubscription();
		RegisterBus();
	}

	private void RegisterBus()
	{
		var bus = FacilityConfig.Children["bus"];
		if (bus == null)
			throw new InvalidOperationException("bus is a mandatory element");
		var messageOwners = new Hashtable();
		var assemblies = new List<Assembly>();
		foreach (var element in bus.Children)
		{
			if (element.Name == "message")
			{
				AddMessageDestination(element, messageOwners);
			}
			else if (element.Name == "handler")
			{
				AddHandlerAssebmly(element, assemblies);
			}
			else
			{
				throw new InvalidOperationException("Unknown element in bus: " + element.Name);
			}
		}
		Kernel.Register(
			Component.For<IBus>()
				.ImplementedBy<UnicastBus>()
				.DependsOn(new
				{
					MessageOwners = messageOwners,
					MessageHandlerAssemblies = assemblies
				})
			);
	}

	private static void AddHandlerAssebmly(IConfiguration handler, ICollection<Assembly> assemblies)
	{
		string assemblyString = handler.Attributes["name"];
		if (string.IsNullOrEmpty(assemblyString))
			throw new InvalidOperationException("name attribute is mandatory in handler element");
		assemblies.Add(Assembly.Load(assemblyString));
	}

	private static void AddMessageDestination(IConfiguration message, IDictionary messageOwners)
	{
		string messsageName = message.Attributes["name"];
		if (string.IsNullOrEmpty(messsageName))
			throw new InvalidOperationException("message must have a name");
		string destination = message.Attributes["destination"];
		if (string.IsNullOrEmpty(destination))
			throw new InvalidOperationException("message must have a destination");

		messageOwners[messsageName] = destination;
	}

	private void RegisterSubscription()
	{
		string attribute = FacilityConfig.Attributes["subsciptionQueue"];
		if (attribute == null)
			throw new InvalidOperationException("subsciptionQueue is a mandatory attribute");
		Kernel.Register(
			Component.For<ISubscriptionStorage>()
				.ImplementedBy<MsmqSubscriptionStorage>()
				.Parameters(
				Parameter.ForKey("Queue").Eq(attribute)
				)
			);
	}

	private void RegisterTransport()
	{
		IConfiguration transport = FacilityConfig.Children["transport"];
		if (transport == null)
			throw new InvalidOperationException("transport is a mandatory element");
		Kernel.Register(
			Component.For<ITransport>()
				.ImplementedBy<MsmqTransport>()
				.Parameters(

				// mandatory
				transport.Parameter("InputQueue"),
				transport.Parameter("ErrorQueue"),

				// optional
				transport.Parameter("numberOfWorkerThreads", "1"),
				transport.Parameter("MaxRetries", "5"),
				transport.Parameter("PurgeOnStartup", "false"),
				transport.Parameter("IsTransactional", "false")

				)
			);
	}

	private void RegisterSerializer()
	{
		AssertValidSerializationSettings();

		if (UseBinarySerialization)
		{
			Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<BinaryMessageSerializer>()
				);
		}

		if (UseXmlSerialization)
		{
			Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<XmlMessageSerializer>()
				);
		}
	}

	private void AssertValidSerializationSettings()
	{
		if ((UseXmlSerialization && UseBinarySerialization) || (!UseXmlSerialization && !UseBinarySerialization))
		{
			throw new InvalidOperationException("Must define exactly one of XML or Binary serialization.");
		}
	}
}
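
The utility methods are not shown in the post. As a rough, hypothetical sketch of the kind of work helpers such as `Value` and `Parameter` would need to do, the code below reads mandatory settings, optional settings with defaults, and boolean flags. It assumes the attributes of a configuration element are available as a `NameValueCollection`; the `ConfigurationReader` class and its method names are inventions for this illustration, not part of Windsor and not the author's code.

```csharp
using System;
using System.Collections.Specialized;

// Hypothetical helpers: not the author's utility methods and not a Windsor API.
public static class ConfigurationReader
{
    // Reads a mandatory attribute; fails loudly when it is missing or empty.
    public static string Required(NameValueCollection attributes, string name)
    {
        string value = attributes[name];
        if (string.IsNullOrEmpty(value))
            throw new InvalidOperationException(name + " is a mandatory attribute");
        return value;
    }

    // Reads an optional attribute, falling back to a default value.
    public static string Optional(NameValueCollection attributes, string name, string defaultValue)
    {
        string value = attributes[name];
        return string.IsNullOrEmpty(value) ? defaultValue : value;
    }

    // Reads a boolean flag; a missing attribute counts as false.
    public static bool Flag(NameValueCollection attributes, string name)
    {
        string value = attributes[name];
        return !string.IsNullOrEmpty(value) && bool.Parse(value);
    }
}
```

Keeping the mandatory and optional cases in separate methods means the facility code states its intent at each call site, which is what the "mandatory" and "optional" comments in `RegisterTransport` are doing by hand.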

I am not happy with this yet: the facility has a lot of code and we still have XML. But we have very little configuration, and the code to use this is now:

IWindsorContainer container = new WindsorContainer("windsor.config");

var bus = container.Resolve<IBus>();
bus.Start();

Which is much better than both previous versions. It also doesn't require me to recompile in order to modify the configuration.

Hold the press, what about administrator configuration?!

One of the major points of emphasis in the NServiceBus configuration API is the explicit distinction it makes between developer-level configuration (dependencies, which transport you are using, transactions, who handles what, etc.) and administrator-level configuration (queue names, mostly).

In the configuration above we have no such separation. Problem, isn't it?

Again, we can use the container itself as a way to deal with this. First, we will define a configuration file, which will contain the following text:

<configuration>
	<properties>
		<subscriptionsQueue>subscriptions</subscriptionsQueue>
		<inputQueue>messagebus</inputQueue>
		<errorQueue>error</errorQueue>
	</properties>
</configuration>

And now, in the Windsor configuration file itself, we include this file and refer to those values:

<configuration>
	<include uri="file://Configuration.config"/>
	<facilities>
		<facility
		    id="NServiceBusFacility"
		    type="Windsor.Infrastructure.NServiceBusFacility, Windsor.Infrastructure"
		    useBinarySerialization="true"
		    subsciptionQueue="#{subscriptionsQueue}">
			<transport inputQueue="#{inputQueue}" errorQueue ="#{errorQueue}"/>
			<bus impersonateSender="false">
				<message name="Messages" destination="messagebus"/>
				<handler name="Server"/>
			</bus>
		</facility>
	</facilities>

	<components>
		<component id="RequestDataMessageHandler" 
			 type="Server.RequestDataMessageHandler, Server"/>
	</components>

</configuration>

And just like that, we got ourselves a nice dual configuration, one for the administrators and one for the developers.
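
To make the `#{...}` references concrete, here is a small illustration of the idea: each `#{name}` placeholder is replaced with the matching entry from the properties file. This is only a sketch of the concept using a regular expression. It is not how Windsor implements property substitution, and `PropertyExpander` is a made-up name.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Illustration only: mimics the effect of #{name} property references.
public static class PropertyExpander
{
    private static readonly Regex Placeholder = new Regex(@"#\{(\w+)\}");

    // Replaces every #{name} in the text with the value of the property "name".
    public static string Expand(string text, IDictionary<string, string> properties)
    {
        return Placeholder.Replace(text, match =>
        {
            string name = match.Groups[1].Value;
            string value;
            if (!properties.TryGetValue(name, out value))
                throw new InvalidOperationException("Unknown property: " + name);
            return value;
        });
    }
}
```

The point of the split is that an administrator only ever edits the properties, while the structure that refers to them stays under the developers' control.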

I am still not happy with this, because I have XML and a lot of code to deal with this XML nonsense, but we did get down to a very simple XML configuration in very little time.

Let us see what happens if we use Binsor...

Since we don't want to replicate the XML syntax (we can do much better without the limitations of XML), we will start from scratch and define a new facility.

Note: While writing this several improvements to Binsor itself occurred to me, so this is certainly something that can be improved.

The approach for building a configuration language using Binsor is fairly simple: create an object graph that represents your configuration, and just call it from the Binsor script.

We start by defining the configuration model:

public enum SerializationFormat
{
	Xml,
	Binary
}

public class Transport
{
	public Transport()
	{
		//default values for optional params
		NumberOfWorkerThreads = 1;
		MaxRetries = 5;
		PurgeOnStartup = false;
		IsTransactional = false;
	}
	public string InputQueue { get; set; }
	public string ErrorQueue { get; set; }
	public int NumberOfWorkerThreads { get; set; }
	public int MaxRetries { get; set; }
	public bool PurgeOnStartup { get; set; }
	public bool IsTransactional { get; set; }
}

public class Bus
{
	public IDictionary MessageOwners { get; set; }
	public Assembly[] MessageHandlerAssemblies { get; set; }
}

public class UnicastBus : Bus { }

As you can see, this is about as simple as it can get.

Then we define the facility itself:

public class NServiceBusFacility_Binsor : AbstractFacility
{
	public SerializationFormat SerializationFormat { get; set; }
	public string SubsciptionQueue { get; set; }
	public Transport Transport { get; set; }
	public Bus Bus { get; set; }

	public NServiceBusFacility_Binsor(
		SerializationFormat serializationFormat,
		string subsciptionQueue,
		Transport transport,
		Bus bus)
	{
		SerializationFormat = serializationFormat;
		SubsciptionQueue = subsciptionQueue;
		Transport = transport;
		Bus = bus;
	}

	protected override void Init()
	{
		Kernel.Register(
			Component.For<IBuilder>()
				.ImplementedBy<WindsorBuilderAdapter>()
			);

		Kernel.Register(
			Component.For<ITransport>()
				.ImplementedBy<MsmqTransport>()
				.DependsOn(Transport)
			);

		switch (SerializationFormat)
		{
			case SerializationFormat.Binary:
				Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<BinaryMessageSerializer>()
				);
				break;
			case SerializationFormat.Xml:
				Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<XmlMessageSerializer>()
				);
				break;
			default:
				throw new NotSupportedException("Serialization format " + SerializationFormat +
				                                " is not supported");
		}

		Kernel.Register(
			Component.For<ISubscriptionStorage>()
				.ImplementedBy<MsmqSubscriptionStorage>()
				.Parameters(
				Parameter.ForKey("Queue").Eq(SubsciptionQueue)
				)
			);

		Kernel.Register(
			Component.For<IBus>()
				.ImplementedBy<NServiceBus.Unicast.UnicastBus>()
				.DependsOn(Bus)
			);
	}
}

It is significantly simpler than the one based on XML configuration. And now we can get to the configuration itself:

import System.Reflection
import Windsor.Infrastructure
import Server

facility NServiceBusFacility_Binsor:
	serializationFormat = SerializationFormat.Binary
	transport = Transport ( 
		InputQueue: "messagebus",
		ErrorQueue: "error"
	)
	subsciptionQueue = "subscriptions"
	bus = UnicastBus (
			MessageOwners : {
				"Messages" : "messagebus"
			},
			MessageHandlerAssemblies : ( Assembly.Load("Server"), )
	)
	
component RequestDataMessageHandler
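
For readers who do not know Boo, the object graph that the script builds corresponds roughly to the C# below. The configuration classes are repeated in trimmed form so that the snippet stands on its own. `ConfigurationGraph` and its methods are hypothetical names, and the handler assembly is passed in rather than loaded by name so the sketch does not depend on a `Server` assembly being present.

```csharp
using System.Collections;
using System.Reflection;

// Trimmed copies of the configuration model shown earlier.
public class Transport
{
    public Transport()
    {
        // Default values for optional settings.
        NumberOfWorkerThreads = 1;
        MaxRetries = 5;
    }
    public string InputQueue { get; set; }
    public string ErrorQueue { get; set; }
    public int NumberOfWorkerThreads { get; set; }
    public int MaxRetries { get; set; }
}

public class Bus
{
    public IDictionary MessageOwners { get; set; }
    public Assembly[] MessageHandlerAssemblies { get; set; }
}

// Hypothetical helper that builds the values the script hands to the facility.
public static class ConfigurationGraph
{
    public static Transport BuildTransport(string inputQueue, string errorQueue)
    {
        return new Transport { InputQueue = inputQueue, ErrorQueue = errorQueue };
    }

    public static Bus BuildBus(Assembly handlers)
    {
        return new Bus
        {
            MessageOwners = new Hashtable { { "Messages", "messagebus" } },
            MessageHandlerAssemblies = new[] { handlers }
        };
    }
}
```

The Boo version says the same thing with less ceremony, which is most of the appeal: the script is just constructor calls and property assignments on plain objects.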

Wait! What about administrator configuration? Now that we are using a script to configure our application, it is even more important to separate the administrative configuration from the application configuration.

We will take the exact same approach as we did before and create a separate file for administration purposes. To do so in a way that gives the admin a nice syntax for configuration, we will define a configuration model:

public class MyConfiguration
{
	public static string InputQueue { get; set; }
	public static string ErrorQueue { get; set; }
	public static string SubscriptionsQueue { get; set; }
}

Now we can create the default AdminConfiguration.boo file:

import Windsor.Infrastructure # namespace of MyConfiguration

MyConfiguration.SubscriptionsQueue = "subscriptions"
MyConfiguration.InputQueue = "messagebus"
MyConfiguration.ErrorQueue  = "error"

And our Windsor.boo file is now:

import System.Reflection
import Windsor.Infrastructure
import Server
import file from AdminConfiguration.boo

AdminConfiguration().Run() # execute admin configuration

facility NServiceBusFacility_Binsor:
	serializationFormat = SerializationFormat.Binary
	transport = Transport ( 
		InputQueue: MyConfiguration.InputQueue,
		ErrorQueue: MyConfiguration.ErrorQueue
	)
	subsciptionQueue = MyConfiguration.SubscriptionsQueue
	bus = UnicastBus (
			MessageOwners : {
				"Messages" : "messagebus"
			},
			MessageHandlerAssemblies : ( Assembly.Load("Server"), )
	)
	
component RequestDataMessageHandler

Now we don't have any XML involved, but the format that we have is suspiciously similar to what we had with XML. So, apart from a small reduction in configuration complexity, what did we gain?

We have a full-fledged programming language for our configuration. We can now apply rules to our configuration, make logic-based decisions, etc.

As a simple example, instead of having to hard code the message owners and handlers, we can scan the application directory for matching assemblies. Want to add a new handler? Drop it into the directory, and you are done. This is a really powerful concept, and I am using this extensively in my applications.
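
A sketch of the scanning idea in C# (a Binsor script could do the same thing inline): the helper below looks in a directory for assemblies whose file names match a pattern and loads them. The `HandlerAssemblyScanner` class and the idea of a naming pattern such as `*.Handlers.dll` are assumptions made for the example, not something NServiceBus or Binsor prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reflection;

// Hypothetical helper for convention-based discovery of handler assemblies.
public static class HandlerAssemblyScanner
{
    // Returns the full paths of the files in the directory that match the pattern, sorted by name.
    public static string[] FindCandidates(string directory, string pattern)
    {
        string[] files = Directory.GetFiles(directory, pattern);
        Array.Sort(files, StringComparer.OrdinalIgnoreCase);
        return files;
    }

    // Loads every matching file that is a valid .NET assembly and skips the rest.
    public static Assembly[] LoadAll(string directory, string pattern)
    {
        var assemblies = new List<Assembly>();
        foreach (string file in FindCandidates(directory, pattern))
        {
            try
            {
                assemblies.Add(Assembly.LoadFrom(file));
            }
            catch (BadImageFormatException)
            {
                // Not a managed assembly (for example a native DLL); ignore it.
            }
        }
        return assemblies.ToArray();
    }
}
```

The array returned by `LoadAll` could then be assigned to `MessageHandlerAssemblies` in place of the explicit `Assembly.Load("Server")` call.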
