The design of concurrent subscriptions in RavenDB
One of the interesting features of RavenDB is Subscriptions. These allow you to define a query and then subscribe to its results. When a client opens the subscription, RavenDB will send it all the documents matching the subscription query. This is an ongoing process, so you will also get documents that were modified after your subscription started. For example, you can ask to get “All orders in the UK”, and then use that to compute tax / import rules.
Subscriptions are ideal for a number of use cases, but backend business processing is where they shine. This is because of the other property that subscriptions have: the ability to process the subscription results reliably. In other words, a failure in processing a subscription batch will not lose anything; we can simply restart the subscription. In the same manner, a server failure will simply fail over to another node and things will continue processing normally. You can shut down the client for an hour or a week, and when the subscription is started again, it will process all the matching documents that were changed while it wasn’t listening.
Subscriptions currently have one very strong requirement. There can only ever be a single subscription client open at a given point. This is done to ensure that we can process the batches reliably. A subscription client will accept a batch, process it locally and then acknowledge it to the server, which will then send the next one.
Doing things in this manner ensures that there is an obvious path of progression in how the subscription operates. However, there are scenarios where you’ll want to use concurrent clients on a single subscription. For example, if you have a lengthy computation required, and you want to have concurrent workers to parallelize the work. That is not a scenario that is currently supported, and it turns out that there are significant challenges in supporting it. I want to use this post to walk through them and consider possible options.
The first issue that we have to take into account is that the reliability of subscriptions is a hugely important feature, and we don’t want to lose that. This means that if we allow multiple concurrent clients at the same time, we have to have a way to handle a client going down. Right now, RavenDB keeps track of a single number to do so. You can think about it as the last modified date that was sent to the subscription client. This isn’t how it actually works, but it is a close enough lie that saves us the deep details.
In other words, we send a batch of documents to the client and only update our record of the “last processed” when the batch is acknowledged. This design is simple and robust, but it cannot handle the situation when we have concurrent clients that are processing batches. We have to account for a client failing to process a batch and the batch needing to be resent. It can be resent to the same client or to another one. That means that in addition to the “last processed” value, we also need to keep a record of in-flight documents that were sent in batches and haven’t been acknowledged yet.
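The single-checkpoint behavior described above can be sketched roughly as follows. This is a toy model, not RavenDB's implementation: the class name, the `changes` list and the integer change ids are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class SingleClientSubscription:
    """Toy model: changes are (change_id, doc_id) pairs, oldest first."""
    changes: list            # hypothetical change log
    last_processed: int = 0  # the single number the server keeps
    batch_size: int = 2

    def next_batch(self):
        # Always resume from the persisted checkpoint, not from what was sent.
        pending = [c for c in self.changes if c[0] > self.last_processed]
        return pending[: self.batch_size]

    def acknowledge(self, batch):
        # Only an acknowledgement moves the checkpoint forward.
        if batch:
            self.last_processed = batch[-1][0]


sub = SingleClientSubscription(
    changes=[(1, "orders/1"), (2, "orders/2"), (3, "orders/3")]
)
first = sub.next_batch()
# The client crashes before acknowledging, so the same batch is sent again.
retry = sub.next_batch()
sub.acknowledge(retry)
after_ack = sub.next_batch()
```

Because nothing is recorded until the acknowledgement arrives, a crash simply replays the batch. The same property is what breaks down once two clients hold different unacknowledged batches at the same time.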
We keep track of our clients by holding on to the TCP connection. That means that as long as the connection is open, the batch of documents that was sent will be considered to be in transit. If the client that got the batch failed, we’ll have to note that (when the TCP connection is closed) and then send the old batch to another client. There are issues with that, by the way. Different clients may have different batch sizes, for example. What do we do if the batch we need to retry has 100 documents, but the only available client takes 10 at a time?
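One possible way to deal with the batch size mismatch, offered only as a sketch, is to stop treating the failed batch as a unit: put its documents back on a retry queue and let each client pull as many as it asked for. The `redeliver` helper and the retry queue are hypothetical names, not part of RavenDB.

```python
from collections import deque


def redeliver(retry_queue, client_batch_size):
    """Pop up to client_batch_size documents from the retry queue."""
    batch = []
    while retry_queue and len(batch) < client_batch_size:
        batch.append(retry_queue.popleft())
    return batch


# A batch of 100 documents whose client went away before acknowledging.
failed_batch = [f"orders/{i}" for i in range(100)]
retry_queue = deque(failed_batch)

# The only available client takes 10 documents at a time.
batches = []
while retry_queue:
    batches.append(redeliver(retry_queue, 10))
```

The original order of the documents is preserved, the batch is just delivered in smaller pieces.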
There is another problem with this approach, however. Consider the case of a document that was sent to a client for processing. While it is being processed, it is modified again, that means that we have a problem. Do we send the document again to another client for processing? Remember that it is very likely that you’ll do something related to this document, and it can be a cause for bugs because two clients will get the same document (albeit, two different versions of it) at the same time.
In order to support concurrent clients on the same subscription, we need to handle all of these problems.
- Keep track of all the documents that were sent and haven’t been acknowledged yet.
- Keep track of all the active connections and, if a connection is broken, re-schedule the documents that weren’t acknowledged so they are sent to other clients.
- When a document is about to be sent, we need to check that it isn’t already being processed (an earlier version of it, rather) by another client. If that is the case, we have to wait until that document is acknowledged before allowing it to be processed again.
The last item is meant to avoid concurrency issues with the handling of a single document. I think that limiting the work on a per-document basis is a reasonable behavior. If your model requires coordination across multiple distinct documents, that is something that you’ll need to implement directly. Implementing the “don’t send the same document to multiple clients at the same time” rule, on the other hand, is likely to result in a better experience all around.
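To make the three requirements concrete, here is a minimal sketch of the bookkeeping they imply. Everything in it (the class, its method names, the use of plain document ids and connection ids) is an assumption for illustration, not the actual design.

```python
from collections import defaultdict, deque


class ConcurrentSubscriptionState:
    """Hypothetical server-side bookkeeping for one subscription."""

    def __init__(self):
        self.ready = deque()                   # doc ids waiting to be sent
        self.in_flight = {}                    # doc id -> connection holding it
        self.by_connection = defaultdict(set)  # connection -> doc ids sent to it
        self.deferred = set()                  # doc ids modified while in flight

    def document_changed(self, doc_id):
        if doc_id in self.in_flight:
            # An earlier version is being processed: wait for its ack.
            self.deferred.add(doc_id)
        elif doc_id not in self.ready:
            self.ready.append(doc_id)

    def send_batch(self, conn_id, size):
        batch = []
        while self.ready and len(batch) < size:
            doc_id = self.ready.popleft()
            self.in_flight[doc_id] = conn_id
            self.by_connection[conn_id].add(doc_id)
            batch.append(doc_id)
        return batch

    def acknowledge(self, conn_id):
        for doc_id in sorted(self.by_connection.pop(conn_id, set())):
            del self.in_flight[doc_id]
            if doc_id in self.deferred:
                # The newer version can now be scheduled.
                self.deferred.discard(doc_id)
                self.ready.append(doc_id)

    def connection_closed(self, conn_id):
        # Unacknowledged documents go back to the front for another client.
        lost = sorted(self.by_connection.pop(conn_id, set()), reverse=True)
        for doc_id in lost:
            del self.in_flight[doc_id]
            self.deferred.discard(doc_id)  # the resend carries the latest version
            self.ready.appendleft(doc_id)


state = ConcurrentSubscriptionState()
for doc in ("orders/1", "orders/2", "orders/3"):
    state.document_changed(doc)

batch_a = state.send_batch("conn-A", 2)  # orders/1 and orders/2
state.document_changed("orders/1")       # modified while conn-A holds it
batch_b = state.send_batch("conn-B", 2)  # only orders/3, orders/1 is held back
state.acknowledge("conn-A")              # releases the deferred orders/1
state.connection_closed("conn-B")        # orders/3 was never acknowledged
batch_c = state.send_batch("conn-A", 2)
```

Note how much state this needs compared to a single “last processed” number, and all of it has to survive failover for the subscription to stay reliable.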
This post is meant to explore the design of such a feature, and as such, I would dearly love any and all feedback.
Comments
Subscriptions in RavenDB are modelled after stream/journal processing, which by definition doesn't support 'parallelism' in the way described in the post: that concurrent model is more about queue-processing of messages with an at-least-once guarantee and strict ordering.
The only known way to scale journal/stream processing (batch) is by partitioning the stream by some partition-key that is stable across updates. In case of Raven, using the document Id is a valid partition key, once properly hashed. The number of partitions should be defined upfront when the stream (Subscription) is created and then client instances can compete in owning each given Partition; this means each client instance can receive batches from multiple partitions if he's the only client, but still on separate batches as each partition is checkpointed separately.
The number of subscriptions can be increased until we have one partition per document, which would coalesce into a sort of Revision subscription but would require a lot of checkpoints (one per document, subscription) and would totally lose the reason for batching the changes. At this point Raven would become a MessageHub/Broker with support for strict ordering, thus totally overlapping with specialized solutions and the follow-up requirements and needs.
Currently, a partitioned stream or a strict-ordered queue can be achieved using a single Subscription processor and forwarding batches to a specialized service: I'd be amazed if there were a native need in RavenDB for Partitioning to achieve the network throughput to egress events to an external Journal or MessageHub. The existing model is simple, does work, and can easily work in HA (hot-standby) processor mode.
I'd like the addition of Subscription partitioning with a native solution for automatically balancing the partitions across multiple subscriber instances. This would give processors the ability to scale to a small number of partitions (10ish) and solve the current limitation without requiring another piece of infrastructure in addition to Raven. For more than this, a specialized solution would make more sense.
I don't think it would make any sense to port this to a per-document strict-ordered sequence, as that would turn the clients into idempotent message handlers and lose the batching ability/performance: that can be better achieved using existing Subscriptions (1 subscriber) as a transactionally secure capture solution for native event-based systems like Kafka or RabbitMQ.
Andrea,
Yes, the subscription model with concurrent work is different than a single subscriber. In both cases, however, we are talking about ensuring at least once delivery.
The difference is that we no longer ensure forward progress only. Doing partitioned subscription is actually fairly easy in RavenDB. You can define 10 subscriptions, with the final number changing for each:
That gives you the partition by document id, but it turns out that this is fairly awkward to use.
Of course, the other option for implementing concurrent clients for subscribers is to do something similar, but to assign a range of internal subscriptions to the client, so a client would be subscribed to 1 - 256 subscriptions when connected. When the next one comes in, we'll split it so the first one would cover 1 - 128 and the second would cover the 128 - 256, etc. This has the advantage of being a simple extension of the previous behavior, but may result in smaller batches than would be expected and can result in jumps in the "timeline" of documents being sent.
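A rough sketch of that range-splitting idea, under a few assumptions: documents are mapped to one of 256 internal subscriptions by hashing the document id (SHA-1 is an arbitrary choice here), and the ranges are made non-overlapping, so two clients get 1 to 128 and 129 to 256. The function names are hypothetical.

```python
import hashlib

SHARDS = 256  # number of internal subscriptions, as in the example above


def shard_of(doc_id):
    """Stable shard number in 1..SHARDS for a document id."""
    digest = hashlib.sha1(doc_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % SHARDS + 1


def assign_ranges(clients):
    """Split 1..SHARDS into contiguous, near-equal ranges, one per client."""
    ranges = {}
    start = 1
    for i, client in enumerate(clients):
        extra = 1 if i < SHARDS % len(clients) else 0
        size = SHARDS // len(clients) + extra
        ranges[client] = range(start, start + size)
        start += size
    return ranges


def owner(doc_id, ranges):
    """The client whose range contains the document's shard."""
    shard = shard_of(doc_id)
    return next(c for c, r in ranges.items() if shard in r)


one_client = assign_ranges(["A"])        # A covers 1..256
two_clients = assign_ranges(["A", "B"])  # A covers 1..128, B covers 129..256
```

Since a document always hashes to the same shard, and each shard has exactly one owner at a time, two clients never see the same document concurrently. The cost is the one mentioned above: each client only sees its slice of the documents, so batches can be smaller.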
IMHO, a case of 'wrong tool for the job'... let RavenDb shine where it fits best, and let others more suited handle all this hassle.
There are systems specifically designed for this 'competing consumer' scenario, RabbitMq, Kafka, just to name some. And it is trivial to setup a 'singleton' subscriber that just posts a message to your system of choice. Problem solved; move on...
Programmer time is too scarce and too expensive to waste it trying to reinvent the wheel.
Again, IMHO...
I'm currently dealing with this myself, and I'm trying to figure out whether to tell clients that they can only have one subscriber and then it's up to them to parallelize the work on their end or change the internal design of the system to have durable subscriptions where a subscription creates a queue internally where events land. That way you can enable competing consumers since they're pulling from a queue at that point vs a stream. I believe Apache Pulsar has a similar design: https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions
Matt,
Interesting. A key issue that we have to deal with is that a lot of users don't want to have to deal with other scenarios. They just want things to... work. And have RavenDB handle that responsibility.
Matt, Looking more deeply into the design in Pulsar. We currently do Exclusive and the design in this blog post is about shared, although they don't have to handle the scenario of getting notified on the same document by different subscribers.
IMHO, this is not a case of 'wrong tool for the job' if it becomes a native part of RavenDB.
I would love to not include other services in my systems to keep the complexity down.
RavenDB can already handle most of the requirements of a larger system, and queueing would be a great addition.
@Anders
Point taken. Please, let me rephrase it: with the current RavenDb feature set it is the wrong tool for the job. Of course, if Oren takes the matter in his hands, then things might change. But my general interpretation of the post is that 'this is an extremely complicated issue with a lot of rough edges, so I don't think this is going to happen anytime soon'. Surely, I might be wrong...
My point is that jumping through hoops (in terms of code and architecture complexity, maintainability, ...) to handle a scenario RavenDb is not designed for does not look good to me. Especially given that there already are proven and respected solutions out there.
Oren - yep, I get where you’re headed - I’m in the same boat. I want my solution to just work for the consumer, which is why I’m leaning towards the queue model myself. The twist here is when you have multiple subscribers wanting to get notified on the same document then the system effectively has one internal subscriber that does the fan out into N queues that represent the durable subscriptions. Ordering becomes an issue of course but there are ways to work around that - AWS FIFO SNS and SQS for instance. Anyways, mostly thinking out loud here, would love to hear your thoughts.
Kurbein,
One of the reasons I love to blog here is that it helps me talk things out and see other options. We'll likely implement this feature, but doing internal sharding of the subscriptions would be a better option for a whole lot of reasons.
It would save us the need to manage a lot of internal state and complex matters, and boil down to being able to do handoffs of sharded subscriptions that behave the same externally.