A while ago I took a look at NServiceBus and its Distributor, after catching this post talking about Mass Transit, I decided that I really need to take an additional look at this project. Mass Transit is very similar in purpose to NServiceBus, and it uses a very similar approach. However, it has a radically different style. A lot of the concepts are shared between both projects.
As usual, I am going to post here my thoughts as I read through the code.
Opening the project is a bit overwhelming, although not as much as with NServiceBus. We have 22 projects (NServiceBus had 47 when I checked that). I tried to think how to express my initial impression, and finally settled on using the old adage about a picture worth a thousand words.
Looks like there is a lot of interesting concepts there. One thing that really stand out is that 8 of those projects are test projects. That make me feel much better about the project.
Build
The build projects handle creating the appropriate queues, there is a single class in all three projects, so I am will just ignore them.
Dashboard
The dashboard application is using MonoRail, Windsor & Brail. This make me happy, even though the dashboard is obviously in its initial stages. One point that I dislike is the use of XML configuration for Windsor.
Deferment Service
I had to look up in the dictionary to figure out what deferment means, but take a single look at the actual code made it obvious:
public interface IDefermentService
{
int Defer(object msg, TimeSpan amountOfTimeToDefer);
}
There is no usable implementation for this, and there aren't enough tests to make it possible to understand how this is supposed to be used, either.
So far, nothing really interesting, but I am used to getting mixed results when following the Top to Bottom Review style that I like.
Host
The host is a window service that can host Mass Transit. In essence, it takes a configurator, which is an object that can return hosted services, and use that to start itself up, spinning up those services. A hosted service is a way to register and unregister to incoming messages.
There are a lot of things there that I am vague about. There is a lot of code to deal with arguments, argument maps, etc. I am not sure what it is doing there, and what it is trying to do. But I haven't read the tests yet.
The tests contains this sample, which I am not sure that I like. It looks like the service is dealing with too many things, from subscribing to messages to hosting the bus itself through message handling. This also cause my thread safety spider sense to tingle quite heavily.
I am not really interested in the configuration, from a very cursory glance, it seems to me that I would move all of that to the container and deal with it there.
Patterns
I am always suspicious when I am seeing patterns as a topic of itself. It reminds me of the application that reach GoF-complete status by implementing all of the GoF patterns. That said, I really should look inside before talking.
After reading the code, I can say that there isn't a lot of useful code there. It seems to be intended to be a repository for implementing patterns for Enterprise Integration Patterns. Something that I find very interesting is this piece of code:
public class HeartbeatMonitor :
Consumes<Heartbeat>.All
{
private IServiceBus _bus;
public HeartbeatMonitor(IServiceBus bus)
{
_bus = bus;
}
public void Consume(Heartbeat message)
{
// do something with the heartbeat
}
}
The Consumes<Heartbeat>.All is a really nice tagging idea. It wouldn't have occur to me, but it is a very natural way to specify what is going on there.
Subscriptions
Subscriptions in service buses play a critical role in ensuring that message would reach their destination as they should.
What appears to be a default implementation is a subscription service that is backed by a database to store those subscriptions. This looks like a really simple way of handling that, but it is not really interesting.
What is interesting is the distributed subscription cache, it is using Memcached in order to store the information. There are some issues there that bother me, however. There is minor issues with multi threaded access to that may cause issue, mainly with dictionaries used to store state. What really bothers me, however, is that a cache is not a good place to store information. By definition you can't rely on it staying there.
I haven't seen so far how the persistent storage get to update the cache. I think that I am missing something here. I assume that there are some messages flying around here that I am not seeing.
Yes, there is, and it is in the SubscriptionService, which I haven't looked deeply at yet.
I really like the way the tests are organized, by the way.
Transports
Mass Transit supports MSMQ and in the process of adding support to ActiveMQ. It is interesting to compare the two, but before I do that, I want to take a look at the MSMQ implementation. There isn't anything interesting with the Send implementation, but the Receive is interesting.
It is using GetMessageEnumerator2() to go through the messages and decide if there is any message that is worth dispatching. NServiceBus use a Peek & Read approach, instead, and then handle the message internally. There is a potential resource leak there, because the enumerator isn't using a using statement to free the enumerator in the case of exception.
The ActiveMQ implementation has just been started, from the looks of things, but just from the short code sample that there is there I can tell that it is smells heavily of Java. This is natural, since ActiveMQ is written in Java, but it is amusing to be able to recognize the style.
Service Bus
Finally! There are things to be said against reading top to bottom when the last project contains all the interesting meat. So far, I have only dealt with various infrastructure stuff, not really interesting, to tell you the truth. Let us dive into the code...
There are formatters, which serialize an message instance to the wire. I was amused to find out that Mass Transit supports binary, XML and, of all things, JSON. I am not sure who is going to consume that, but I hope it is not running in a browser.
It looks like the Health Monitoring that appeared in the patterns section has been merged to the main line. Now we start to see how this is used. Here is an example of a service that just send a healthy heartbeat:
public class HealthClient : IHostedService
{
readonly IServiceBus _bus;
readonly Timer _timer = new Timer(3000);
public HealthClient(IServiceBus bus)
{
_bus = bus;
_timer.Elapsed += delegate
{
_bus.Publish(new Heartbeat(3, _bus.Endpoint.Uri));
};
}
public void Start()
{
_timer.Start();
}
public void Stop()
{
_timer.Stop();
}
public void Dispose()
{
_timer.Dispose();
}
}
I like this. There isn't a single line that I would consider wasted.
However, there are a few things that concerns me about this sample. First and foremost, the bus seems to be tied directly to a specific end point. I am pretty sure that I don't like this. The problem here is that this means that I now need to configure a bus per end point, and I find myself reluctant to do that. But let us deal with this issue later, in the mean time, I want to read the rest of the health monitoring part.
Having done that, I am not sure that I like what I am seeing. Let us take a look at the other side of the health monitoring:
public class HealthService : IHostedService
{
private readonly IServiceBus _bus;
public HealthService(IServiceBus bus)
{
_bus = bus;
}
public void Start()
{
_bus.AddComponent<HeartbeatMonitor>();
_bus.AddComponent<Investigator>();
_bus.AddComponent<Reporter>();
}
public void Stop()
{
_bus.RemoveComponent<Reporter>();
_bus.RemoveComponent<Investigator>();
_bus.RemoveComponent<HeartbeatMonitor>();
}
}
This is the heart of what is bothering me. Why do we have to register and unregister those components? A component is a term from IoC containers, not from service buses. Using this approach give you much more control over what is going on (you can shut down services very easily this way), but I dislike it. I would route message handlers through the container and be done with it. If I wanted to dynamic start / shutdown of services, it is easy enough to do without this.
The way Mass Transit handles subscriptions is interesting. And I strongly suggest taking a look. When the service start working, it ask to get all the current subscriptions, and hold it locally. There are provisions in place to handle subscription updates, but they haven't been implemented yet.
Moving on from subscriptions, probably the most beautiful part of Mass Transit is this:
public class Consumes<TMessage> where TMessage : class
{
public interface All
{
void Consume(TMessage message);
}
public interface For<TCorrelationId> : All, CorrelatedBy<TCorrelationId>
{
}
public interface Selected : All
{
bool Accept(TMessage message);
}
}
Comparing the syntax that this enable vs. the syntax that NServiceBus need... I really like this. And I am very annoyed that I have not thought about this myself.
And not it is 05:38 AM my time, and while the Mass Transit codebase is interesting, is isn't quite that much of a page turner.
I took a look at the Service Bus implementation, and found more or less what I expected. Especially frustrating is that I can see that there are a lot of features that are not used in Mass Transit itself, they are there to be exposed to users of the library.
The problem with that is that there is no reference for seeing how they are used. A sample application or two to show off what it is doing and how it is doing it would be most welcome. Especially since it would give better sense for how this works as a whole.
Update: I am an idiot, there are samples, I just missed them.
I am still unclear on the ServiceBus / Endpoint one to one mapping. I don't like this as it stand, but I am missing something. Mass Transit is 0.1, and it shows. It looks promising, and there are some interesting ideas there that I would like to see followed up.
I would like to see a service bus that doesn't try to be a container as well (NServiceBus & Mass Transit both share this fault to some extent), but rather build on top of the container and just provided the pub / sub services.
Sigh, I really don't want to have to write one.