Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

time to read 3 min | 489 words

I am currently doing some work with messaging, and I am using both NServiceBus and Mass Transit, to get a good feel for which best matches what I actually need. One of the things that they both have is XML configuration. In Mass Transit's case, it looks like this:

<facility id="masstransit">

  <bus id="local" endpoint="msmq://localhost/test_servicebus">
    <subscriptionCache mode="local" />
    <subscriptionService endpoint ="msmq://localhost/mt_pubsub" />
    <managementService endpoint ="msmq://localhost/mt_dashboard" />
  </bus>

  <bus id="distributed" endpoint="msmq://localhost/test_remoteservicebus">
    <subscriptionCache mode="distributed">
      <servers>
        <server>192.168.0.1:11211</server>
        <server>192.168.0.2:11211</server>
      </servers>
    </subscriptionCache>
  </bus>

  <transports>
    <transport>MassTransit.ServiceBus.MSMQ.MsmqEndpoint, MassTransit.ServiceBus.MSMQ</transport>
    <transport>MassTransit.ServiceBus.NMS.NmsEndpoint, MassTransit.ServiceBus.NMS</transport>
  </transports>
</facility>

And in NServiceBus's case, it is:

<MsmqTransportConfig
  InputQueue="messagebus"
  ErrorQueue="error"
  NumberOfWorkerThreads="2"
  MaxRetries="5"
/>

<UnicastBusConfig DistributorControlAddress="distributorcontrolbus" DistributorDataAddress="distributordatabus">
  <MessageEndpointMappings>
  </MessageEndpointMappings>
</UnicastBusConfig>

<MsmqSubscriptionStorageConfig Queue="subscriptions" />

My first thought when I saw this was "Yuck! How can I get away from this?" Imagine my surprise when I discovered that it is actually not that easy to do.

In NServiceBus's case, it is literally not possible without replacing much of the configuration backbone. It was only when I suppressed my gag reflex and actually read the XML that I realized what is going on here. In both cases, what they are specifying are administrator level settings. I actually want to make it as hard as possible to hard code them.

time to read 3 min | 428 words

On Friday, I had an encounter with the sewers at my house. The on-call team came and fixed the issue, a plugged pipe somewhere down the street, very quickly, so no harm was done (although checking the sewers just after taking a shower is not my idea of fun).

That did, however, let me learn a bit about the way sewers are constructed.

Broadly, the sewer is based on the idea of pipes and pools (at least, those are the Hebrew terms, translated). The idea is that the sewage flows in the pipes, toward wherever it goes. But if we have the pipes, why do we need the pools as well?

The pools are there for capacity control. They help to ensure that even if there is increased demand on the sewer, it won't backfire on you (literally, yuck!) when used.

If the downstream pipe cannot accept the sewage that we have locally, it is gathered in the pool, and will trickle downstream based on whatever "bandwidth" it gets.

Usually, you have several such pools, each of which can take quite a bit of sewer water, so you tend not to have issues with the sewer unless there is a real problem. Reduced capacity in the sewer is not noticed, because the pools absorb the spike in usage and moderate it.

In my house, we had the following issue:

image

 

Three pools down from the actual drain (about 100 meters from my house, actually), we finally found the culprit, a semi plugged pipe that wasn't draining properly. The interesting thing here is that it is likely that it wasn't functioning correctly for a long time (weeks, at least).

Now, last I checked, this blog was mainly technical stuff, and while I am sure that there are Sewer Geeks somewhere, I am not part of that particular subculture. So why am I telling you this?

Because there are a lot of parallels with software. Specifically, with building software that can withstand the harsh environment called production. The idea of pools for the sewers closely matches the idea of queued messaging for communication, for example.

And I have seen applications survive any number of transient failures as a result of this type of architecture.
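
To make the parallel a bit more concrete, here is a minimal sketch of a "pool" sitting between a bursty upstream and a downstream pipe with limited bandwidth. It is a toy simulation, not real messaging code, and the SewerPool class and its Simulate method are names I made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: a "pool" (queue) between a bursty
// upstream and a downstream pipe with limited bandwidth.
public static class SewerPool
{
	// arrivals[i] is how many messages show up on tick i.
	// drainPerTick is how many the downstream can accept per tick.
	// Returns the depth of the pool after each tick.
	public static List<int> Simulate(int[] arrivals, int drainPerTick)
	{
		var pool = new Queue<int>();
		var depths = new List<int>();
		int nextId = 0;

		foreach (int arriving in arrivals)
		{
			// everything that arrives is gathered in the pool first
			for (int i = 0; i < arriving; i++)
				pool.Enqueue(nextId++);

			// and trickles downstream based on the available "bandwidth"
			for (int i = 0; i < drainPerTick && pool.Count > 0; i++)
				pool.Dequeue();

			depths.Add(pool.Count);
		}
		return depths;
	}
}
```

Calling SewerPool.Simulate(new[] { 1, 1, 6, 0, 0, 1 }, 2) shows the spike on the third tick being absorbed and drained over the following ticks, while the upstream never has to wait.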

So, this was a trip in the sewers, hope you held your noses.

time to read 12 min | 2398 words

One of the most common issues when people are building frameworks and applications that rely on a container is that they are not giving the container enough to do. Basically, they use the container to create some components, but they are doing a lot of things that the container could do for them outside of the container.

Note: Code for this post can be found in the Scratch Pad.

NServiceBus and Mass Transit are good examples of that. I detailed some of the issues that I had with Mass Transit a while ago. Udi and I talked about this situation with NServiceBus a few days ago, and this is my attempt to figure out a better model for configuring NSB. Let us start from what we have right now.

We have XML configuration in app.config:

<MsmqTransportConfig InputQueue="messagebus" ErrorQueue="error" NumberOfWorkerThreads="1" MaxRetries="5"
/>

<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="">
  <MessageEndpointMappings>
      <add Messages="Messages" Endpoint="messagebus" />
  </MessageEndpointMappings>
</UnicastBusConfig>

<MsmqSubscriptionStorageConfig Queue="subscriptions" />

And we have code to initialize the bus:

new ConfigMsmqSubscriptionStorage(builder);
NServiceBus.Serializers.Configure.BinarySerializer.With(builder);
new ConfigMsmqTransport(builder)
	.IsTransactional(true)
	.PurgeOnStartup(false);
new ConfigUnicastBus(builder)
	.ImpersonateSender(false)
	.SetMessageHandlersFromAssembliesInOrder(
		typeof(RequestDataMessageHandler).Assembly
		);
IBus bus = builder.Build<IBus>();
bus.Start();

The ConfigXyz objects are there to configure the bus itself inside the builder (the container used in the sample).

My first step was to take this and move it to Windsor, and with no XML config. Which gave me this:

// configure bus
container.Register(
	Component.For<IBuilder>()
		.ImplementedBy<WindsorBuilderAdapter>(),
	Component.For<IMessageSerializer>()
		.ImplementedBy<BinaryMessageSerializer>(),
	Component.For<ITransport>()
		.ImplementedBy<MsmqTransport>()
		.DependsOn(new
		{
			InputQueue = "messagebus",
			NumberOfWorkerThreads = 1,
			ErrorQueue = "error",
			MaxRetries = 5,
			PurgeOnStartup = true,
			IsTransactional = false
		}),
	Component.For<ISubscriptionStorage>()
		.ImplementedBy<MsmqSubscriptionStorage>()
		.DependsOn(
		new
		{
			Queue = "subscriptions"
		}
		),
	Component.For<IBus>()
		.ImplementedBy<UnicastBus>()
		.DependsOn(new
		{
			ImpersonateSender = false,
			MessageOwners = new Hashtable
			{
				{"Messages", "messagebus"}
			},
			MessageHandlerAssemblies = new[]
			{
				typeof (RequestDataMessageHandler).Assembly
			}
		})
	);

// configure handlers
container.Register(
	// yuck, we are registering concrete type!
	Component.For<RequestDataMessageHandler>()
	);


var bus = container.Resolve<IBus>();
bus.Start();

There are a few things that you will notice here. We have a lot more code, we hard code the configuration all over the place, you either need to understand Windsor or you have to copy/paste this, and there is no XML (yeah!).

Some of this is good (no XML), the rest... needs some work. If we consider the fact that this is more or less the standard bus configuration (a bus on top of MSMQ), we will see that there is quite a lot we can do here to encapsulate the entire mess into a mechanism that is much easier to work with.

In Windsor, packaging functionality is done using facilities. A facility is an extension to the container that contains certain behavior. It can be as simple as packaging up registration for several components into a single facility or it can be as complex as proxies and runtime component selections.

Let us start with the lowest common denominator: a Windsor facility with XML configuration. The configuration I came up with is:

<configuration>
	<facilities>
		<facility
		   id="NServiceBusFacility"
		   type="Windsor.Infrastructure.NServiceBusFacility, Windsor.Infrastructure"
		   useBinarySerialization="true"
		   subsciptionQueue="subscriptions">
			<transport inputQueue="messagebus" errorQueue ="error"/>
			<bus impersonateSender="false">
				<message name="Messages" destination="messagebus"/>
				<handler name="Server"/>
			</bus>
		</facility>
	</facilities>

	<components>
		<component id="RequestDataMessageHandler" 
			type="Server.RequestDataMessageHandler, Server"/>
	</components>

</configuration>

And the code to make this happen is here (minus utility methods that I am not showing):

public class NServiceBusFacility : AbstractFacility
{
	public bool UseXmlSerialization { get; set; }
	public bool UseBinarySerialization { get; set; }

	protected override void Init()
	{
		UseXmlSerialization = FacilityConfig.Value("UseXmlSerialization");
		UseBinarySerialization = FacilityConfig.Value("UseBinarySerialization");

		Kernel.Register(
			Component.For<IBuilder>()
				.ImplementedBy<WindsorBuilderAdapter>()
			);


		RegisterTransport();
		RegisterSerializer();
		RegisterSubscription();
		RegisterBus();
	}

	private void RegisterBus()
	{
		var bus = FacilityConfig.Children["bus"];
		if (bus == null)
			throw new InvalidOperationException("bus is a mandatory element");
		var messageOwners = new Hashtable();
		var assemblies = new List<Assembly>();
		foreach (var element in bus.Children)
		{
			if (element.Name == "message")
			{
				AddMessageDestination(element, messageOwners);
			}
			else if (element.Name == "handler")
			{
				AddHandlerAssebmly(element, assemblies);
			}
			else
			{
				throw new InvalidOperationException("Unknown element in bus: " + element.Name);
			}
		}
		Kernel.Register(
			Component.For<IBus>()
				.ImplementedBy<UnicastBus>()
				.DependsOn(new
				{
					MessageOwners = messageOwners,
					MessageHandlerAssemblies = assemblies
				})
			);
	}

	private static void AddHandlerAssebmly(IConfiguration handler, ICollection<Assembly> assemblies)
	{
		string assemblyString = handler.Attributes["name"];
		if (string.IsNullOrEmpty(assemblyString))
			throw new InvalidOperationException("name attribute is mandatory in handler element");
		assemblies.Add(Assembly.Load(assemblyString));
	}

	private static void AddMessageDestination(IConfiguration message, IDictionary messageOwners)
	{
		string messsageName = message.Attributes["name"];
		if (string.IsNullOrEmpty(messsageName))
			throw new InvalidOperationException("message must have a name");
		string destination = message.Attributes["destination"];
		if (string.IsNullOrEmpty(destination))
			throw new InvalidOperationException("message must have a destination");

		messageOwners[messsageName] = destination;
	}

	private void RegisterSubscription()
	{
		string attribute = FacilityConfig.Attributes["subsciptionQueue"];
		if (attribute == null)
			throw new InvalidOperationException("subsciptionQueue is a mandatory attribute");
		Kernel.Register(
			Component.For<ISubscriptionStorage>()
				.ImplementedBy<MsmqSubscriptionStorage>()
				.Parameters(
				Parameter.ForKey("Queue").Eq(attribute)
				)
			);
	}

	private void RegisterTransport()
	{
		IConfiguration transport = FacilityConfig.Children["transport"];
		if (transport == null)
			throw new InvalidOperationException("transport is a mandatory element");
		Kernel.Register(
			Component.For<ITransport>()
				.ImplementedBy<MsmqTransport>()
				.Parameters(

				// mandatory
				transport.Parameter("InputQueue"),
				transport.Parameter("ErrorQueue"),

				// optional
				transport.Parameter("numberOfWorkerThreads", "1"),
				transport.Parameter("MaxRetries", "5"),
				transport.Parameter("PurgeOnStartup", "false"),
				transport.Parameter("IsTransactional", "false")

				)
			);
	}

	private void RegisterSerializer()
	{
		AssertValidSerializationSettings();

		if (UseBinarySerialization)
		{
			Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<BinaryMessageSerializer>()
				);
		}

		if (UseXmlSerialization)
		{
			Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<XmlMessageSerializer>()
				);
		}
	}

	private void AssertValidSerializationSettings()
	{
		if ((UseXmlSerialization && UseBinarySerialization) || (!UseXmlSerialization && !UseBinarySerialization))
		{
			throw new InvalidOperationException("Must define exactly one of XML or Binary serialization.");
		}
	}
}

I am not happy with this yet. The facility has a lot of code, and we still have XML, but we have very little configuration, and the code to use this is now:

IWindsorContainer container = new WindsorContainer("windsor.config");

var bus = container.Resolve<IBus>();
bus.Start();

Which is much better than both previous versions. It also doesn't require me to recompile to modify the configuration.

Hold the press, what about administrator configuration?!

One of the major points of emphasis in the NServiceBus configuration API is the explicit distinction it makes between developer level configuration (dependencies, which transport you are using, transactions, who handles what, etc) and administrator level configuration (queue names, mostly).

In the configuration above we have no such separation. Problem, isn't it?

Again, we can use the container itself as a way to deal with this. First, we will define a configuration file, which will contain the following text:

<configuration>
	<properties>
		<subscriptionsQueue>subscriptions</subscriptionsQueue>
		<inputQueue>messagebus</inputQueue>
		<errorQueue>error</errorQueue>
	</properties>
</configuration>

And now in the configuration file itself we will include this configuration file, and refer to those values:

<configuration>
	<include uri="file://Configuration.config"/>
	<facilities>
		<facility
		    id="NServiceBusFacility"
		    type="Windsor.Infrastructure.NServiceBusFacility, Windsor.Infrastructure"
		    useBinarySerialization="true"
		    subsciptionQueue="#{subscriptionsQueue}">
			<transport inputQueue="#{inputQueue}" errorQueue ="#{errorQueue}"/>
			<bus impersonateSender="false">
				<message name="Messages" destination="messagebus"/>
				<handler name="Server"/>
			</bus>
		</facility>
	</facilities>

	<components>
		<component id="RequestDataMessageHandler" 
			 type="Server.RequestDataMessageHandler, Server"/>
	</components>

</configuration>

And just like that, we got ourselves a nice dual configuration, one for the administrators and one for the developers.
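
Conceptually, the #{name} references are just placeholders that get replaced with values from the properties section. The sketch below is not how Windsor implements this. It is a hypothetical PropertyResolver helper (my own name) that shows the idea, including failing loudly when the administrator forgot to define a value.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper, not part of Windsor: expands #{name} placeholders
// using values that came from the administrator's file.
public static class PropertyResolver
{
	private static readonly Regex Placeholder = new Regex(@"#\{(\w+)\}");

	public static string Expand(string text, IDictionary<string, string> properties)
	{
		return Placeholder.Replace(text, match =>
		{
			string name = match.Groups[1].Value;
			string value;
			if (!properties.TryGetValue(name, out value))
				throw new InvalidOperationException("Property '" + name + "' is not defined");
			return value;
		});
	}
}
```

The developer's file only ever mentions the property names, so an administrator can rename a queue without touching (or understanding) the component registrations.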

I am still not happy with this, because I have XML and a lot of code to deal with this XML nonsense, but we did get down to a very simple XML configuration in very little time.

Let us see what is going to happen if we use Binsor...

Since we don't want to replicate the XML syntax (we can do much better without the limitations of XML), we will start from scratch, and define a new facility.

Note: While writing this several improvements to Binsor itself occurred to me, so this is certainly something that can be improved.

The approach for building a configuration language using Binsor is fairly simple. Create an object graph that represents your configuration, and just call it from the Binsor script.

We start by defining the configuration model:

public enum SerializationFormat
{
	Xml,
	Binary
}

public class Transport
{
	public Transport()
	{
		//default values for optional params
		NumberOfWorkerThreads = 1;
		MaxRetries = 5;
		PurgeOnStartup = false;
		IsTransactional = false;
	}
	public string InputQueue { get; set; }
	public string ErrorQueue { get; set; }
	public int NumberOfWorkerThreads { get; set; }
	public int MaxRetries { get; set; }
	public bool PurgeOnStartup { get; set; }
	public bool IsTransactional { get; set; }
}

public class Bus
{
	public IDictionary MessageOwners { get; set; }
	public Assembly[] MessageHandlerAssemblies { get; set; }
}

public class UnicastBus : Bus { }

As you can see, this is about as simple as it can get.

Then we define the facility itself:

public class NServiceBusFacility_Binsor : AbstractFacility
{
	public SerializationFormat SerializationFormat { get; set; }
	public string SubsciptionQueue { get; set; }
	public Transport Transport { get; set; }
	public Bus Bus { get; set; }

	public NServiceBusFacility_Binsor(
		SerializationFormat serializationFormat,
		string subsciptionQueue,
		Transport transport,
		Bus bus)
	{
		SerializationFormat = serializationFormat;
		SubsciptionQueue = subsciptionQueue;
		Transport = transport;
		Bus = bus;
	}

	protected override void Init()
	{
		Kernel.Register(
			Component.For<IBuilder>()
				.ImplementedBy<WindsorBuilderAdapter>()
			);

		Kernel.Register(
			Component.For<ITransport>()
				.ImplementedBy<MsmqTransport>()
				.DependsOn(Transport)
			);

		switch (SerializationFormat)
		{
			case SerializationFormat.Binary:
				Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<BinaryMessageSerializer>()
				);
				break;
			case SerializationFormat.Xml:
				Kernel.Register(
				Component.For<IMessageSerializer>()
					.ImplementedBy<XmlMessageSerializer>()
				);
				break;
			default:
				throw new NotSupportedException("Serialization format " + SerializationFormat +
					" is not supported");
		}

		Kernel.Register(
			Component.For<ISubscriptionStorage>()
				.ImplementedBy<MsmqSubscriptionStorage>()
				.Parameters(
					Parameter.ForKey("Queue").Eq(SubsciptionQueue)
				)
			);

		Kernel.Register(
			Component.For<IBus>()
				.ImplementedBy<NServiceBus.Unicast.UnicastBus>()
				.DependsOn(Bus)
			);
	}
}

It is significantly simpler than the XML configuration based one. And now we can get to the configuration itself:

import System.Reflection
import Windsor.Infrastructure
import Server

facility NServiceBusFacility_Binsor:
	serializationFormat = SerializationFormat.Binary
	transport = Transport ( 
		InputQueue: "messagebus",
		ErrorQueue: "error"
	)
	subsciptionQueue = "subscriptions"
	bus = UnicastBus (
			MessageOwners : {
				"Messages" : "messagebus"
			},
			MessageHandlerAssemblies : ( Assembly.Load("Server"), )
	)
	
component RequestDataMessageHandler

Wait! What about administrator configuration? Now that we are using a script to configure our application, it is even more important to separate the administrative configuration from the application configuration.

We will take the exact same approach as we did before, creating a separate file for administration purposes. In order to do so in a way that gives the admin a nice syntax for configuration, we will define a configuration model:

public class MyConfiguration
{
	public static string InputQueue { get; set; }
	public static string ErrorQueue { get; set; }
	public static string SubscriptionsQueue { get; set; }
}

Now we can create the default AdminConfiguration.boo file:

import Windsor.Infrastructure # namespace of MyConfiguration

MyConfiguration.SubscriptionsQueue = "subscriptions"
MyConfiguration.InputQueue = "messagebus"
MyConfiguration.ErrorQueue  = "error"

And our Windsor.boo file is now:

import System.Reflection
import Windsor.Infrastructure
import Server
import file from AdminConfiguration.boo

AdminConfiguration().Run() # execute admin configuration

facility NServiceBusFacility_Binsor:
	serializationFormat = SerializationFormat.Binary
	transport = Transport ( 
		InputQueue: MyConfiguration.InputQueue,
		ErrorQueue: MyConfiguration.ErrorQueue
	)
	subsciptionQueue = MyConfiguration.SubscriptionsQueue
	bus = UnicastBus (
			MessageOwners : {
				"Messages" : "messagebus"
			},
			MessageHandlerAssemblies : ( Assembly.Load("Server"), )
	)
	
component RequestDataMessageHandler

Now we don't have any XML involved, but the format that we have is suspiciously similar to the way we worked when we had XML. So, apart from a small reduction in the configuration complexity, what did we gain?

We have a full-fledged programming language for our configuration purposes. We can now apply rules to our configuration, make logic based decisions, etc.

As a simple example, instead of having to hard code the message owners and handlers, we can scan the application directory for matching assemblies. Want to add a new handler, drop it into the directory, done. This is a really powerful concept, and I am using this extensively in my applications.
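
As a rough idea of what such a scan could look like on the C# side, here is a small sketch. The IHandleMessages<T> interface and the HandlerScanner class are stand-ins that I made up for the example, they are not the NServiceBus types. The reflection calls (Directory.GetFiles, Assembly.LoadFrom, Type.GetInterfaces) are the standard .NET ones.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;

// Hypothetical marker interface, standing in for whatever handler
// interface the bus actually uses.
public interface IHandleMessages<TMessage>
{
	void Handle(TMessage message);
}

public static class HandlerScanner
{
	// Load every assembly in the directory whose file name matches the pattern.
	public static List<Assembly> LoadAssemblies(string directory, string pattern)
	{
		return Directory.GetFiles(directory, pattern)
			.Select(path => Assembly.LoadFrom(path))
			.ToList();
	}

	// Find the concrete types in an assembly that implement IHandleMessages<T>.
	public static List<Type> FindHandlers(Assembly assembly)
	{
		return assembly.GetTypes()
			.Where(type => type.IsClass && !type.IsAbstract)
			.Where(type => type.GetInterfaces().Any(IsHandlerInterface))
			.ToList();
	}

	private static bool IsHandlerInterface(Type type)
	{
		return type.IsGenericType &&
			type.GetGenericTypeDefinition() == typeof(IHandleMessages<>);
	}
}

// Sample message and handler so the scanner has something to find.
public class Ping { }

public class PingHandler : IHandleMessages<Ping>
{
	public void Handle(Ping message) { }
}
```

The configuration script would then register whatever FindHandlers returns for each assembly returned from LoadAssemblies, instead of listing the handlers by hand.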

Note: Code for this post can be found in the Scratch Pad.

time to read 1 min | 156 words

Several months ago I wrote about how I think you should design your services. The key criterion was that they have to have a batch mode enabled.

Afterward, I put together a screencast that goes through all the stages that led me to that design.

Davy Brion has taken this approach a bit further and posted all the infrastructure bits that are required to make this work as well as what you need to actually make the API almost as nice to use as the non batched version.

The service API is here and the client API is here.

About the only thing that I would strive to improve there would be the need to explicitly register requests & replies. I would try to get something convention based there. Maybe something like Request<TResponse>, and then have IHandler<TRequest> and route the whole thing through the container again.
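
Here is a rough sketch of what I have in mind, under a few assumptions. None of these types come from Davy's code: Request<TResponse>, IHandler<TRequest> and RequestDispatcher are hypothetical, and a plain dictionary stands in for the container.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shapes for the convention: the request type names its response.
public abstract class Request<TResponse> { }

public interface IHandler<TRequest>
{
	object Handle(TRequest request);
}

// A dictionary stands in for the container in this sketch.
public class RequestDispatcher
{
	private readonly Dictionary<Type, Func<object, object>> handlers =
		new Dictionary<Type, Func<object, object>>();

	// The only registration needed is the handler itself; the request
	// type is picked up from the generic argument.
	public void Register<TRequest>(IHandler<TRequest> handler)
	{
		handlers[typeof(TRequest)] = request => handler.Handle((TRequest)request);
	}

	// The response type is inferred from the request, so callers
	// never have to spell out the request / reply pairing.
	public TResponse Send<TResponse>(Request<TResponse> request)
	{
		Func<object, object> handle;
		if (!handlers.TryGetValue(request.GetType(), out handle))
			throw new InvalidOperationException("No handler for " + request.GetType().Name);
		return (TResponse)handle(request);
	}
}

// Sample request and handler.
public class GetCustomerName : Request<string>
{
	public int Id { get; set; }
}

public class GetCustomerNameHandler : IHandler<GetCustomerName>
{
	public object Handle(GetCustomerName request)
	{
		return "Customer #" + request.Id;
	}
}
```

With this in place, dispatcher.Send(new GetCustomerName { Id = 7 }) is typed as string without any explicit request / reply registration.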

time to read 4 min | 655 words

Yesterday I reviewed Mass Transit itself. Unfortunately, I missed the samples folder, which means that I didn't get critical information about how to use Mass Transit. I am going to do the same review for the samples, trying to get a feeling for how using Mass Transit feels.

Audit

This seems to be only partially done; it apparently should show how we can fire off events using Mass Transit. Again, I am impressed by how simple Consumes<TMsg>.All makes the code, but there is nothing there as of now.

Heavy Load

Load testing, not something that I am particularly interested in right at this moment.

Publish Subscribe

This example more or less shows how you can use Pub / Sub. Nothing really interesting there, but it has cleared my mind about a few things. Specifically, I was bothered by the tie of the service bus to a specific end point. What I missed was that this was the entry point end point, and that communication can (and does) flow to other end points. Once I grasped that, it became much easier to understand what was going on there.

What really bothers me, however, is this:

<component id="serviceBus"
		   lifestyle="singleton"
		   service="MassTransit.ServiceBus.IServiceBus, MassTransit.ServiceBus"
		   type="MassTransit.ServiceBus.ServiceBus, MassTransit.ServiceBus">
	<parameters>
		<endpointToListenOn>${serverEndpoint}</endpointToListenOn>
		<!-- Setter Injection -->
		<SubscriptionCache>${subscriptionCache}</SubscriptionCache>
	</parameters>
</component>
<component id="serverEndpoint"
		   lifestyle="singleton"
		   service="MassTransit.ServiceBus.IEndpoint, MassTransit.ServiceBus"
		   type="MassTransit.ServiceBus.MSMQ.MsmqEndpoint, MassTransit.ServiceBus.MSMQ">
	<parameters>
		<uriString>msmq://localhost/test_server</uriString>
	</parameters>
</component>

That sound you just heard was me, fleeing in terror. The idea that I would need to write so much XML is abhorrent to me. My ideal syntax would be something like this:

facility MassTransitFacility, startable = true:
	standardCustomersServiceBus = "msmq://localhost/standard_customers"
	enterpriseCustomersServiceBus = "msmq://localhost/enterprise_customers"
extend OrdersController:
	standardCustomerPublisher = @standardCustomersServiceBus
	enterpriseCustomersPublisher = @enterpriseCustomersServiceBus

And this will allow us to define both service bus and the end points in the container, and start them as soon as they are ready. No need to do anything in the actual application code beyond just creating the container.

Web Request Reply

This is very interesting, because it shows how you can handle request reply scenarios, which are very common in data handling scenarios. In this case, the code sample shows an async request / reply, using MonoRail's async actions:

public IAsyncResult BeginAsync(string requestText)
{
	_request = _bus.Request()
		.From(this)
		.WithCallback(ControllerContext.Async.Callback, ControllerContext.Async.State)
		.Send(new RequestMessage(CorrelationId, requestText));

	return _request;
}

public void EndAsync()
{
	PropertyBag.Add("responseText", msg.Text + " (and my response)");
	RenderView("Default");
}

public void Consume(ResponseMessage message)
{
	msg = message;
	_request.Complete();
}

I find it very nice. I did wonder at first how the standard EndSend() is involved here, but then I realized that it is not. It doesn't make sense for message passing semantics. When you pass the this pointer to the From() method, it will register that for accepting messages from the bus. I do wonder about such things as timeouts and memory leaks. From my observation, a failure in handling the response of a message would leak the handler instance.
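
One way to deal with the timeout and leak concern, sketched under my own assumptions, is to keep the waiting callbacks in a registry with a deadline and sweep it periodically. This is not Mass Transit's API. PendingRequests and its methods are hypothetical names for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch, not Mass Transit's API: tracks callbacks waiting for
// a response and drops the ones whose deadline has passed.
public class PendingRequests
{
	private class Entry
	{
		public Action<object> Callback;
		public DateTime Deadline;
	}

	private readonly Dictionary<Guid, Entry> pending = new Dictionary<Guid, Entry>();
	private readonly object sync = new object();

	public int Count
	{
		get { lock (sync) return pending.Count; }
	}

	public void Register(Guid correlationId, Action<object> callback, DateTime deadline)
	{
		lock (sync)
			pending[correlationId] = new Entry { Callback = callback, Deadline = deadline };
	}

	// Called when a response arrives. Returns false if nobody is waiting
	// (already completed, or expired and swept).
	public bool Complete(Guid correlationId, object response)
	{
		Entry entry;
		lock (sync)
		{
			if (!pending.TryGetValue(correlationId, out entry))
				return false;
			pending.Remove(correlationId);
		}
		entry.Callback(response);
		return true;
	}

	// Called periodically (from a timer, say). Removes expired entries so
	// the callbacks, and whatever they reference, can be collected.
	public int Sweep(DateTime now)
	{
		lock (sync)
		{
			var expired = pending
				.Where(p => p.Value.Deadline <= now)
				.Select(p => p.Key)
				.ToList();
			foreach (Guid id in expired)
				pending.Remove(id);
			return expired.Count;
		}
	}
}
```

The registry only holds a reference to the handler until the response arrives or the deadline passes, whichever comes first, so a lost response no longer keeps the handler alive forever.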

To conclude, after reviewing the samples, I feel that I have a much better understanding of Mass Transit, which is good. It hasn't shifted my thinking, the way reading NServiceBus did, but that is probably because I have already read NServiceBus code.

time to read 10 min | 1914 words

A while ago I took a look at NServiceBus and its Distributor. After catching this post talking about Mass Transit, I decided that I really need to take an additional look at this project. Mass Transit is very similar in purpose to NServiceBus, and it uses a very similar approach. However, it has a radically different style. A lot of the concepts are shared between both projects.

As usual, I am going to post here my thoughts as I read through the code.

Opening the project is a bit overwhelming, although not as much as with NServiceBus. We have 22 projects (NServiceBus had 47 when I checked that). I tried to think how to express my initial impression, and finally settled on using the old adage about a picture being worth a thousand words.

image

Looks like there are a lot of interesting concepts there. One thing that really stands out is that 8 of those projects are test projects. That makes me feel much better about the project.

Build

The build projects handle creating the appropriate queues. There is a single class in all three projects, so I will just ignore them.

Dashboard

The dashboard application is using MonoRail, Windsor & Brail. This makes me happy, even though the dashboard is obviously in its initial stages. One point that I dislike is the use of XML configuration for Windsor.

Deferment Service

I had to look in the dictionary to figure out what deferment means, but a single look at the actual code made it obvious:

public interface IDefermentService
{
    int Defer(object msg, TimeSpan amountOfTimeToDefer);
}

There is no usable implementation for this, and there aren't enough tests to make it possible to understand how this is supposed to be used, either.

So far, nothing really interesting, but I am used to getting mixed results when following the Top to Bottom Review style that I like.

Host

The host is a Windows service that can host Mass Transit. In essence, it takes a configurator, which is an object that can return hosted services, and uses that to start itself up, spinning up those services. A hosted service is a way to register and unregister for incoming messages.

There are a lot of things there that I am vague about. There is a lot of code to deal with arguments, argument maps, etc. I am not sure what it is doing there, and what it is trying to do. But I haven't read the tests yet.

The tests contain this sample, which I am not sure that I like. It looks like the service is dealing with too many things, from subscribing to messages to hosting the bus itself through message handling. This also causes my thread safety spider sense to tingle quite heavily.

I am not really interested in the configuration. From a very cursory glance, it seems to me that I would move all of that to the container and deal with it there.

Patterns

I am always suspicious when I see patterns as a topic in itself. It reminds me of the application that reaches GoF-complete status by implementing all of the GoF patterns. That said, I really should look inside before talking.

After reading the code, I can say that there isn't a lot of useful code there. It seems to be intended as a repository of implementations of the Enterprise Integration Patterns. Something that I find very interesting is this piece of code:

public class HeartbeatMonitor :
	Consumes<Heartbeat>.All
{
	private IServiceBus _bus;

	public HeartbeatMonitor(IServiceBus bus)
	{
		_bus = bus;
	}

	public void Consume(Heartbeat message)
	{
		// do something with the heartbeat
	}
}

The Consumes<Heartbeat>.All is a really nice tagging idea. It wouldn't have occurred to me, but it is a very natural way to specify what is going on there.

Subscriptions

Subscriptions in service buses play a critical role in ensuring that messages reach their destination as they should.

What appears to be a default implementation is a subscription service that is backed by a database to store those subscriptions. This looks like a really simple way of handling that, but it is not really interesting.

What is interesting is the distributed subscription cache, which uses Memcached to store the information. There are some issues there that bother me, however. There are minor issues with multi-threaded access that may cause trouble, mainly with the dictionaries used to store state. What really bothers me, however, is that a cache is not a good place to store information. By definition, you can't rely on it staying there.

I haven't seen so far how the persistent storage gets to update the cache. I think that I am missing something here. I assume that there are some messages flying around here that I am not seeing.

Yes, there are, and they are in the SubscriptionService, which I haven't looked deeply at yet.
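
To illustrate both concerns, here is a minimal sketch of a lookup that treats the cache purely as an optimization and falls back to the durable store on a miss. It is not Mass Transit code. SubscriptionLookup is a hypothetical name, the durable store is represented by a delegate, and I am assuming ConcurrentDictionary (from System.Collections.Concurrent) is available to take care of the multi-threaded access.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch, not Mass Transit code: the cache is only an
// optimization, the durable store remains the source of truth.
public class SubscriptionLookup
{
	private readonly ConcurrentDictionary<string, string[]> cache =
		new ConcurrentDictionary<string, string[]>();
	private readonly Func<string, string[]> loadFromStore;

	public SubscriptionLookup(Func<string, string[]> loadFromStore)
	{
		this.loadFromStore = loadFromStore;
	}

	// Safe to call from several threads. On a miss we go back to the store.
	// Note that under contention the loader may run more than once for the
	// same key, so it should be free of side effects.
	public string[] GetSubscribers(string messageName)
	{
		return cache.GetOrAdd(messageName, loadFromStore);
	}

	// Simulates the cache losing an entry (eviction, restart, etc).
	public void Evict(string messageName)
	{
		string[] ignored;
		cache.TryRemove(messageName, out ignored);
	}
}
```

Losing a cache entry costs one extra trip to the store, but never the subscription itself.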

I really like the way the tests are organized, by the way.

Transports

Mass Transit supports MSMQ and is in the process of adding support for ActiveMQ. It is interesting to compare the two, but before I do that, I want to take a look at the MSMQ implementation. There isn't anything interesting with the Send implementation, but the Receive is interesting.

It is using GetMessageEnumerator2() to go through the messages and decide if there is any message that is worth dispatching. NServiceBus uses a Peek & Read approach instead, and then handles the message internally. There is a potential resource leak there, because the enumerator isn't wrapped in a using statement that would free it in the case of an exception.

The ActiveMQ implementation has just been started, from the looks of things, but just from the short code sample that is there I can tell that it smells heavily of Java. This is natural, since ActiveMQ is written in Java, but it is amusing to be able to recognize the style.
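
The fix I have in mind looks roughly like the sketch below. To keep it self-contained it does not touch System.Messaging at all. FakeMessageEnumerator is a made-up stand-in that only records whether Dispose was called, and Receiver.DispatchMatching is a hypothetical name for the scan loop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a message enumerator that holds a native
// resource; it only records whether Dispose was called.
public class FakeMessageEnumerator : IDisposable
{
	private readonly Queue<string> messages;

	public bool Disposed { get; private set; }
	public string Current { get; private set; }

	public FakeMessageEnumerator(IEnumerable<string> messages)
	{
		this.messages = new Queue<string>(messages);
	}

	public bool MoveNext()
	{
		if (messages.Count == 0)
			return false;
		Current = messages.Dequeue();
		return true;
	}

	public void Dispose()
	{
		Disposed = true;
	}
}

public static class Receiver
{
	// The using block guarantees Dispose runs even when the predicate
	// or the dispatch throws halfway through the scan.
	public static int DispatchMatching(FakeMessageEnumerator enumerator,
		Func<string, bool> accept, Action<string> dispatch)
	{
		int dispatched = 0;
		using (enumerator)
		{
			while (enumerator.MoveNext())
			{
				if (!accept(enumerator.Current))
					continue;
				dispatch(enumerator.Current);
				dispatched++;
			}
		}
		return dispatched;
	}
}
```

The exception still propagates to the caller, but the enumerator is released on the way out instead of waiting for the finalizer.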

Service Bus

Finally! There are things to be said against reading top to bottom when the last project contains all the interesting meat. So far, I have only dealt with various infrastructure stuff, not really interesting, to tell you the truth. Let us dive into the code...

There are formatters, which serialize a message instance to the wire. I was amused to find out that Mass Transit supports binary, XML and, of all things, JSON. I am not sure who is going to consume that, but I hope it is not running in a browser.

It looks like the Health Monitoring that appeared in the patterns section has been merged to the main line. Now we start to see how this is used. Here is an example of a service that just sends a healthy heartbeat:

public class HealthClient : IHostedService
{
	readonly IServiceBus _bus;
	readonly Timer _timer = new Timer(3000);

	public HealthClient(IServiceBus bus)
	{
		_bus = bus;
		_timer.Elapsed += delegate
		{
			_bus.Publish(new Heartbeat(3, _bus.Endpoint.Uri));
		};
	}

	public void Start()
	{
		_timer.Start();
	}

	public void Stop()
	{
		_timer.Stop();
	}

	public void Dispose()
	{
		_timer.Dispose();
	}
}

I like this. There isn't a single line that I would consider wasted.

However, there are a few things that concern me about this sample. First and foremost, the bus seems to be tied directly to a specific endpoint. I am pretty sure that I don't like this. The problem here is that I now need to configure a bus per endpoint, and I find myself reluctant to do that. But let us deal with this issue later; in the meantime, I want to read the rest of the health monitoring part.

Having done that, I am not sure that I like what I am seeing. Let us take a look at the other side of the health monitoring:

public class HealthService : IHostedService
{
    private readonly IServiceBus _bus;

    public HealthService(IServiceBus bus)
    {
        _bus = bus;
    }

    public void Start()
    {
        _bus.AddComponent<HeartbeatMonitor>();
        _bus.AddComponent<Investigator>();
        _bus.AddComponent<Reporter>();
    }

    public void Stop()
    {
        _bus.RemoveComponent<Reporter>();
        _bus.RemoveComponent<Investigator>();
        _bus.RemoveComponent<HeartbeatMonitor>();
    }
}

This is the heart of what is bothering me. Why do we have to register and unregister those components? A component is a term from IoC containers, not from service buses. Using this approach gives you much more control over what is going on (you can shut down services very easily this way), but I dislike it. I would route message handlers through the container and be done with it. If I wanted dynamic start / shutdown of services, it is easy enough to do without this.

The way Mass Transit handles subscriptions is interesting, and I strongly suggest taking a look. When the service starts working, it asks for all the current subscriptions and holds them locally. There are provisions in place to handle subscription updates, but they haven't been implemented yet.

Moving on from subscriptions, probably the most beautiful part of Mass Transit is this:

public class Consumes<TMessage> where TMessage : class
{
	public interface All
	{
		void Consume(TMessage message);
	}

	public interface For<TCorrelationId> : All, CorrelatedBy<TCorrelationId>
	{
	}

	public interface Selected : All
	{
		bool Accept(TMessage message);
	}
}

Comparing the syntax that this enables vs. the syntax that NServiceBus needs... I really like this. And I am very annoyed that I have not thought about this myself.

And now it is 05:38 AM my time, and while the Mass Transit codebase is interesting, it isn't quite that much of a page turner.
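To get a feel for the syntax, here is a minimal sketch of two consumers written against those interfaces. The Consumes class is copied from the snippet above, minus For<TCorrelationId>, so the sketch stays self-contained. OrderPlaced, AuditLog, BigOrderAlert and DispatchSketch are hypothetical names, and the Deliver method is only my assumption about how Accept would be honored, not Mass Transit's actual dispatch code.

```csharp
using System;
using System.Collections.Generic;

// Copied from the post, without For<TCorrelationId>.
public class Consumes<TMessage> where TMessage : class
{
    public interface All
    {
        void Consume(TMessage message);
    }

    public interface Selected : All
    {
        bool Accept(TMessage message);
    }
}

// Hypothetical message type.
public class OrderPlaced
{
    public decimal Total { get; set; }
}

// Receives every OrderPlaced message.
public class AuditLog : Consumes<OrderPlaced>.All
{
    public readonly List<decimal> Seen = new List<decimal>();

    public void Consume(OrderPlaced message) { Seen.Add(message.Total); }
}

// Receives only the messages it accepts.
public class BigOrderAlert : Consumes<OrderPlaced>.Selected
{
    public readonly List<decimal> Seen = new List<decimal>();

    public bool Accept(OrderPlaced message) { return message.Total >= 1000m; }

    public void Consume(OrderPlaced message) { Seen.Add(message.Total); }
}

public static class DispatchSketch
{
    // Hypothetical dispatch: asks Accept first when the consumer is selective.
    public static void Deliver(Consumes<OrderPlaced>.All consumer, OrderPlaced message)
    {
        var selective = consumer as Consumes<OrderPlaced>.Selected;
        if (selective != null && !selective.Accept(message)) return;
        consumer.Consume(message);
    }
}
```

The class declaration reads almost like a sentence: BigOrderAlert consumes OrderPlaced, selected.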

I took a look at the Service Bus implementation, and found more or less what I expected. Especially frustrating is that I can see that there are a lot of features that are not used in Mass Transit itself, they are there to be exposed to users of the library.

The problem with that is that there is no reference for seeing how they are used. A sample application or two to show off what it is doing and how it is doing it would be most welcome, especially since it would give a better sense of how this works as a whole.

Update: I am an idiot, there are samples, I just missed them.

I am still unclear on the ServiceBus / Endpoint one to one mapping. I don't like this as it stands, but I am missing something. Mass Transit is 0.1, and it shows. It looks promising, and there are some interesting ideas there that I would like to see followed up.

I would like to see a service bus that doesn't try to be a container as well (NServiceBus & Mass Transit both share this fault to some extent), but rather builds on top of the container and just provides the pub / sub services.

Sigh, I really don't want to have to write one.

time to read 4 min | 624 words

Okay, the NServiceBus Distributor deserves its own post. Broadly, the way it works is fairly simple.

Here is a simplified diagram, showing the workers and their queues, as well as the distributor and its queue. Here they are shown running on separate machines, but they could also be in different processes.

image

Now, let us zoom into the distributor a bit, shall we?

image

Hm, we actually have two queues for the distributor. One is for workers reporting for work (on the left); the second is for application messages, which need to be processed.

The reason that I think this is beautiful is quite simple: you submit work to the distributor queue, which forwards it to one of the workers. So far, pretty standard stuff. The fun part starts when you talk about managing the workers.

On startup, each of the workers will send T notifications to the distributor, where T is the number of threads it is configured to use (yes, workers are threads in a machine, not a machine). When the distributor sends a message to a worker, it also takes that worker off the available list. When the worker is done, it tells the distributor that it is ready again, at which point it becomes available for more work.

Very elegant solution. It is even more elegant when you look at the code to handle that:

private void messageBusTransport_TransportMessageReceived(
        object sender,
        TransportMessageReceivedEventArgs e)
{
    if (disabled)
        this.Rollback();

    string destination = this.workerManager.PopAvailableWorker();

    if (destination == null)
        this.Rollback();
    else
    {
        logger.Debug("Sending message to: " + destination);
        this.messageBusTransport.Send(e.Message, destination);
    }
}

Notice what happens if we don't have an available worker. We simply rollback our current action and move on. We will try again in a short while, and hopefully then we will have a worker to dispatch to.
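The bookkeeping behind PopAvailableWorker can be sketched in a few lines. This is a minimal sketch under my own assumptions, not the NServiceBus worker manager: WorkerAvailabilitySketch and WorkerReady are hypothetical names, and I am assuming a simple first in, first out list of worker addresses.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the availability list; not the NServiceBus worker manager.
public class WorkerAvailabilitySketch
{
    private readonly Queue<string> _available = new Queue<string>();
    private readonly object _lock = new object();

    // Called once per worker thread on startup, and again each time
    // a thread reports that it has finished a message.
    public void WorkerReady(string workerAddress)
    {
        lock (_lock)
        {
            _available.Enqueue(workerAddress);
        }
    }

    // Returns the next free worker and takes it off the list,
    // or returns null when every worker is busy.
    public string PopAvailableWorker()
    {
        lock (_lock)
        {
            return _available.Count == 0 ? null : _available.Dequeue();
        }
    }
}
```

A worker configured with two threads would call WorkerReady twice on startup, so the distributor can hand it two messages before PopAvailableWorker starts returning null and the handler rolls back.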

What about failure scenarios?

Well, at most a worker can "lose" a single message, since if it crashed, it will not report itself as available. If a machine crashes, then we might lose a bunch of messages (all the messages currently worked on by the workers on that machine), but it doesn't hurt the overall system stability. When that machine comes back online, it will immediately start to process those messages again.

Hm, there is actually an issue here with this scenario, since the workers will start working on their existing messages, but at the same time will report that they are ready for work. This just means that they will have work already queued by the time they are finished (and then they would report they are available again, of course). In general, I am okay with that.

What about the distributor itself? Again, crashing the distributor is generally not an issue. We are talking about using a durable transport here, so unprocessed messages will be saved and received when the distributor comes back again.

At the moment, I am not sure what happens if the distributor goes down for a lengthy period of time.

time to read 6 min | 1190 words

I have been planning to check this out for a while now, but I seem to never get the time. Today I made time for this. As usual in such cases, this post is a stream of consciousness. You get my thoughts as I am thinking them.

Before trying to understand anything about NServiceBus, you probably need to understand a bit about distributed systems and messaging. NServiceBus is structured around a bus (into which you can put messages) and message handlers (which handle messages for you). On top of this very simple idea, everything else is built. This forces you to think in a way that makes a lot of sense in distributed, high scalability scenarios. I got a lot of the concepts behind NServiceBus because I read Programming Erlang, in which the same concepts are explained beautifully.

Just to note, I am probably not going to make a lot of sense to people who are trying to understand NServiceBus. I am going to go over the code here, which is how I learn new stuff.

Let us get to the review. The first thing that I noticed was this:

image

AllProjects is really all projects. That includes samples, tests, extras, etc.

Looking into the NServiceBus project itself made me much calmer, I can handle this:

image 

In fact, most of the projects contain very few files (one or two in many cases). This is probably done to allow you to pick & choose your dependencies, without having to deal with unrelated stuff.

BaseMessageHandler is a simple base class that just defines the Bus property.

This, however, is annoying.

image

I don't like to have camelCase exposed in my classes. Yes, it is just an issue of style, I know... The bracing style in NServiceBus has a strange feel to it, but I have R#, and it can fix that ;-)

This is where the fun really starts:

image

Publish and Subscribe look remarkably like their equivalents in Retlang. Same model, after all.

I am not really sure about the difference between Publish and Send yet. And SendLocal doesn't really make any sort of sense yet.

I am not happy with Return(errorCode) either. I don't like using codes for errors; it reminds me of when I had to interface to a mainframe, and 12 was corrupted file and 14 corrupted header, etc. Reading further into the code reveals that the typical usage of this is with an enum.

HandleCurrentMessageLater is interesting, and I have seen some samples on Udi's blog of where it is used (handle the message after another transaction has finished running). Not sure what the purpose of DoNotContinueDispatchingCurrentMessageToHandlers is yet.

ICallback interface is interesting:

image

Need to check it out in more depth, when I figure out what you are registering for and the usage for that.

IMessage is a marker interface, nothing more.

There are a couple of interesting attributes as well:

  • Recoverable - this is a durable message, and it should survive transient failures en route.
  • TimeToBeReceived - what is the lifetime of this message. I am not really sure that this is a good way to specify this. I would assume that things like that are much more dynamic than the code. As a simple example, a burst scenario may cause high latency for processing work. I have a post dedicated to this issue here.

ReadyMessage (in the Messages.cs file) is interesting. I followed the references, but I don't think that I understand what is going on when it is handled.

Moving on, NServiceBus.Saga:

image

A saga is a way to handle a series of messages that are related to one another.

image

A good example may be submitting an order. It needs a manager's authorization before it can be done. So you send a message for it to be approved, and you will only continue processing the order after you get the authorization message. Udi has a sample here.

The reason for the large number of projects, and the reason many of them contain only a single class or two, becomes clear when you track down the classes that implement this bit:

image

I don't like either of the two implementers that are provided (not my style, basically), but it is going to be trivial to supply a new implementation that I would be happy with. I really like that ability. For that matter, the Persister.Deserialize() method is really fun to read. Really cool way to solve the problem.

Overall, I like the idea of Saga very much. It seems to me like there aren't really better alternatives to Erlang's receive keyword in C#.

Speaking of replacing the building blocks, NServiceBus uses the following interface to abstract the container. You can usually guess what the inspiration for a container abstraction is. In this case, it is Spring.

image

For using Windsor, we would need to write a simple adapter. (Basically mapping Build() calls to Resolve() calls).

The only really interesting bit is with BuildAndDispatch. This is how NServiceBus deals with making calls to generic interfaces without knowing up front what the generic parameter would be. I don't like it, but I don't see any other choice.
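The underlying problem is easy to show in isolation. Here is a minimal sketch of calling a generic handler interface when the message type is only known at runtime. It is my own illustration using plain reflection, not the NServiceBus BuildAndDispatch implementation; IMessageHandler<T>, Ping, PingHandler and GenericDispatchSketch are hypothetical names.

```csharp
using System;

// Hypothetical generic handler interface.
public interface IMessageHandler<T>
{
    void Handle(T message);
}

// Hypothetical message and handler.
public class Ping
{
    public string Text { get; set; }
}

public class PingHandler : IMessageHandler<Ping>
{
    public string LastText { get; private set; }

    public void Handle(Ping message) { LastText = message.Text; }
}

public static class GenericDispatchSketch
{
    // Closes IMessageHandler<> over the runtime type of the message,
    // then invokes Handle through reflection.
    public static void Dispatch(object handler, object message)
    {
        Type handlerType = typeof(IMessageHandler<>).MakeGenericType(message.GetType());
        if (!handlerType.IsInstanceOfType(handler))
        {
            throw new ArgumentException("Handler does not handle " + message.GetType().Name);
        }

        handlerType.GetMethod("Handle").Invoke(handler, new[] { message });
    }
}
```

The compiler cannot help here, since the generic parameter is not known until the message arrives, which is why some form of late-bound call is hard to avoid.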

NServiceBus.Testing.

image

Not much to say about it, except that it is a great example of how much of a difference building an explicit interface for testing can make.

TimeoutMessageHandler made my head hurt. Go on, take a look. Now figure out what it does... The code is deceptively simple, because you have to think in terms of reentrancy.

I figured out what the ICallback is for. It is the result of the Send():

bus.Send(command).Register(cb, extraData);

Okay, that just about finishes with the core services that NServiceBus has to offer. Next, the infrastructure components are Transport and Bus. There is also the concept of a distributor, but I am not sure what this is; it looks like the manager in a grid.

Transport is how NServiceBus abstracts the details of the network communication.

Bus manages the messages, correlation, translation between transport messages and application messages, dispatching to message handlers, etc.

There is also a subscription manager and subscription storage, which I understand conceptually, but not in detail.

The distributor section of NServiceBus is a thing of beauty, which I'll describe in the next post.

}