Raven MQ – Client API Design
There are only two topics that remains in the Raven MQ server (replication & point to point messaging), but I decided to stop for a while and focus on the client API. My experience have shown that it is so much more important than anything else to gain acceptance for the project.
One thing that I want to make clear is that this is the high level API, which has very little to do with how this is actually implemented.
The first thing to be aware of is that Raven MQ is transactional. That is, all operations either complete successfully or fail as a single unit. That makes it very easy to work with it for a set of scenarios. It is not an accident that the API is very similar to the one that you get from Rhino Service Bus or NServiceBus, although Raven MQ client API is drastically more modest in what it is trying to do.
Getting started:
var raveMQEndpoint = new RavenMQEndpoint { Url = "http://localhost:8181" }; raveMQEndpoint.Start();
Subscribing (methods):
raveMQEndpoint.Subscribe("/streams/system/notifications", (ctx, untypedMsg) => { // do something with the msg }); raveMQEndpoint.Subscribe<LoginAboutToExpire>("/streams/user/1234", (ctx, msg) => { // do something with the msg }); raveMQEndpoint.Subscribe<LoginExpired>("/streams/user/1234", (ctx, msg) => { // do something with the msg });
This allows you to handle untyped messaged, or to select specific types of messages that will be handled from the stream (ignoring messages not of this type). I’ll discuss the ctx parameter at a later stage, for now, you can ignore it. What you can’t see here is that the Subscribe methods here returns an IDisposable instance, which allows you to remove the subscription. Useful for temporary subscriptions, which is something that is pretty common for the scenarios that we see Raven MQ used for.
Subscribing (classes):
raveMQEndpoint.Subscribe("/streams/user/1234", () => new LoginExpiredConsumer()); raveMQEndpoint.Subscribe("/streams/user/1234", mefContainer);
Instead of registering a single method, you can register a factory method, or a MEF container, both of which will create a consumer class for handling the messages.
Serialization:
Raven MQ doesn’t care about the serialization format, you can it messages using whatever format you like, but the client API used JSON/BSON to store the data.
Sending messages:
Remember that I talked about the ctx parameter? The RavenMQEndpoint doesn’t offer a Send() method, that is handled by the ctx paratemer, which stands for Context, obviously. The idea is quite simple, we want to make message sending transactional, so we always use a context to send them, and only if the context completed successfully can we truly consume the message and send all the messages to the server. You can think of the Context as the Raven MQ transaction.
For sending messages outside of processing an existing message, you can use:
ravenMQEndpoint.Transaction(ctx=> ctx.Send("/queues/customers/1234", updateCustomerAddress));
This gives us a very easy way of scoping multiple messages in a single transaction without awkward APIs.
Thoughts?
Comments
Any IObservable features similar to ReactiveQueue?
http://rxcontrib.codeplex.com/
Looks tough.... ;)
where can I get this? really impress me about REST style interaction!!
Cool!
I'm waiting to play with! I'm curious about the client api capabilities in a javascript environment, and I'm courios also to take a look to the code.
Just one thing, I don't really feel comfortable with the message sending, because it seems a little bit ... difficult (sorry but words that cames in mi mind in italian could be translated just with "difficult")
As far as I see, you tend to close the whole message processing in an action to have it wrapped with a transaction. Am I getting it right?
What about passed MEF container. Is is asked about all of the listeners/handlers of the specific message? If so, the DI in your handlers is saved!:D
To modify an old internet meme, "This Thread Is Useless Without Sourcecode!"
Stop teasing me and let me under the hood already!
Great to see some code snippets escaping, now on to some comments.
You are using the concept of an endpoint, but since the queues are specified after the endpoint, aren't you really creating a connection or a channel to the queue service. A multiplex channel to RavenMQ seems to make sense from a naming perspective.
On the subscribe semantics, could we combine the arguments into a pair of types, one for untyped and one for typed messages? My message handling methods could be:
void Handle(RavenMQ.Message msg)
{
}
Or for a typed message
void Handle(RavenMQ.Message <accountchanged msg)
{
}
That way additional methods could be added to the Message or Message <t interfaces, and extension methods could be used to extend/simplify the syntax for things like publishing events, responding to requests, etc.
Just some ideas...
Gah, it stripped my Message<T> tag from the second Handle method.
Chris,
Thanks, I'll probably apply both suggestions
These transactions then, you are just stating that a send is successful when you know it will be readable by other subscriptions, or are you getting into some distributed stuff?
Frank,
If you have local tx, you can easily get distributed tx.
Comment preview