Feature Design: ETL for Queues in RavenDB
RavenDB is rarely deployed in isolation; it typically has to integrate into an existing overall system. One of the key features that enables this is the built-in ETL support. RavenDB currently has ETL for Postgres, SQL Server, Oracle, MySQL, Elasticsearch, OLAP / Data Lake, and other RavenDB instances.
We are looking into adding ETL support for queues (RabbitMQ, Kafka, SQS, AQS, etc.) to RavenDB. That support is the topic of this blog post. I wanted to summarize my thinking about the topic and, along the way, gather some insight from you about what shape this feature should take.
When talking about ETL to queues, we have to deal with two distinct scenarios: receiving and sending. For the other ETL targets in RavenDB, we only send data, but for queues, given that there is a well-defined interface for pulling messages, it makes sense to support receiving as well. Let’s consider what it means to be able to receive messages from a queue into RavenDB…
It means that RavenDB will listen to a queue and apply a script to each message it receives. That script will be able to insert or modify documents based on the message contents. For example, let’s assume that we have the queue defined as in the image on the right. We can write the following script to process messages from it.
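Here is a sketch of what such a script might look like. The message shape and the put() / load() helpers are purely illustrative; this is a design discussion, not a final API.

```javascript
// Illustrative sketch only - the helpers and the message format are not a final API.
// 'this' stands for the incoming queue message.
if (this.Type === 'OrderCreated') {
    // Record a brand new order document from the message payload
    put('orders/' + this.OrderId, {
        Company: this.Company,
        Lines: this.Lines || []
    });
}
else if (this.Type === 'LineItemAdded') {
    // Append a line item to an existing order
    const order = load('orders/' + this.OrderId);
    order.Lines.push(this.Line);
    put('orders/' + this.OrderId, order);
}
```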
The script above handles two message types: recording a new order and adding a line item to an existing order. It will be invoked by RavenDB whenever a message arrives from the queue. In this way, you can have RavenDB build your domain model directly from the message traffic. Of course, this is a pretty simplistic scenario; there are a lot of more interesting scenarios to explore here.
The second part is when RavenDB is the one sending messages to the queues. Those messages, naturally, would be generated from the documents in the database. How would that work? We can write a script that is applied to documents as they change and outputs the messages to write to the queue. That is how ETL in general works in RavenDB. For queues, however, the situation is a bit more complex.
When we use ETL to sync data from RavenDB to a relational database, any update of a document will also update the data in the relational database. What happens when we send the data to a queue? Well, we can’t update a message in the queue; that doesn’t make any sort of sense. So we need to consider what scenario we are actually dealing with. One option would be to simply send a new message on every update of the document. Or the author of the ETL script may decide to only send it once, of course.
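As a sketch of the sending direction, the transform might look something like this. The loadToQueue() name is just a placeholder for whatever the final API ends up being:

```javascript
// Sketch only - loadToQueue() is a placeholder name, not a committed API.
// The script runs on each changed document in the Orders collection and
// shapes the message that gets pushed to the queue.
loadToQueue('OrderEvents', {
    OrderId: id(this),
    Company: this.Company,
    Total: this.Lines.reduce((sum, l) => sum + l.PricePerUnit * l.Quantity, 0)
});
```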
The scenario that I think is far more likely is to use RavenDB and ETL to Queue as part of a larger scheme. Consider the scenario where you want to use the outbox pattern. In other words, you have a transaction that needs to do a bunch of things, including sending messages on a queue. Instead of trying to create a distributed transaction or carefully coordinate things, you will use this feature. Your transaction will save a Message document alongside any other changes. That relies on RavenDB’s ACID nature to ensure that this happens in an atomic manner.
Then you will be able to utilize the ETL to Queues option to send that message over to the actual queue, in a reliable manner.
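To make the outbox idea concrete, here is a sketch using the RavenDB Node.js client. The OutboxMessage class, the collection it ends up in, and the ETL task that would forward it are assumptions for illustration; the only point is that the domain change and the message are committed in a single transaction.

```javascript
// Sketch of the outbox pattern with the RavenDB Node.js client ('ravendb' npm package).
const { DocumentStore } = require("ravendb");

// Assumed outbox document type; with default conventions it lands in an
// "OutboxMessages" collection that a (hypothetical) ETL-to-queue task would watch.
class OutboxMessage {
    constructor(type, payload) {
        this.type = type;
        this.payload = payload;
    }
}

const store = new DocumentStore(["http://localhost:8080"], "Shop");
store.initialize();

async function placeOrder(order) {
    const session = store.openSession();

    // The domain change and the outbox message are saved in the same session,
    // so a single saveChanges() call commits both atomically.
    await session.store(order); // assigns an id to the order under default conventions
    await session.store(new OutboxMessage("OrderCreated", { orderId: order.id }));

    await session.saveChanges();

    // The ETL task then forwards the message to the broker outside of this transaction.
}
```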
Those two scenarios (send & receive) are the most likely ones for this feature, but the point of this post is to get more feedback from you. What kind of use cases do you think this will enable? What would you like to be able to do?
Comments
I think there are a few scenarios relating to this:
Outgoing:
For an application that publishes events to a message bus for other systems to consume, there's a strong case here for RavenDB as the outbox. With most existing outbox implementations it is difficult, if not impossible, to achieve transactional atomicity with the entity changes, because the messaging infrastructure is separate from the DB (and distributed transactions are either too expensive or too much hassle). Updating an entity and storing an event message inside RavenDB in a single transaction is trivial and reliable. Having RavenDB then be able to forward the event message to external messaging infrastructure reliably can avoid the need for a boilerplate custom worker that subscribes to changes and forwards the message onwards.
This pattern is pretty common in our ecosystem - we have a lot of event sources as microservices that broadcast their domain events, but a lot fewer consumers. Having this ETL to Rabbit or Kafka can simplify our apps and make them more reliable at the same time (the app no longer needs a direct dependency on the queue infrastructure, we get the benefits of an actually transactional outbox pattern).
Incoming:
I think there are two patterns here: the application consumes the messages directly, or RavenDB consumes the messages.
Other considerations:
As well as RabbitMQ and Kafka, consider CloudEvents compatibility (https://cloudevents.io/)
Trev,
The outgoing scenario - yes, that is the exact thing we have in mind. The outbox pattern then becomes really natural to use. You send a message by saving a document in the transaction.
One thing to note here is that the actual ETL isn't running in the same transaction. Which means that the document may be modified again before the ETL process gets to it, so the message that goes out reflects the latest version rather than every intermediate state.
Note that if this is important, the user will likely also enable revisions, so we have a snapshot of the document at the time of the write.
As for incoming, one of the interesting scenarios we have here is when you have an event sourcing setup. You may have systems that emit all sorts of messages, some of them relevant and some ignored. You can then plug RavenDB into such a bus and start accumulating the data and using it, without having to build the integration yourself. That isn't just about aggregation, by the way.
In a microservices world, you may have a service emitting OrderCreated events and a CustomerPortal service capturing them via this technique.
As for CloudEvents, I'm using Kafka / RabbitMQ to refer to specific products. What we'll likely do is support AMQP, which should give us pretty good reach. I went over the spec, and that is interesting. Looks like using that might give us an easier time integrating, thanks.
Oren,
Re: Outbox. Got it. In terms of tx's for SaveChanges(), I'm not envisioning the ETL being driven off of changes on an entity for the outbox pattern. It's more that we'd have a dedicated, possibly transient document (expires after sending to the queue), that represents the event to be placed on the message queue. It would be saved in the same unit of work as the changes to the domain entity, so each change to the entity results in a distinct event document to be picked up by the ETL process.
Re: Incoming. For event sourcing, we'd probably want to use Kafka's replay abilities (streams) directly in the app rather than using RavenDB as the intermediary data store for the events for the application. I'm not saying it's not useful to have the option of RavenDB directly consuming the events - it fits for simpler scenarios like the data aggregation I mentioned, or if using a queue that doesn't have Kafka's abilities.
One underlying principle behind both the dedicated event document for the outbox and the app consuming Kafka instead of RavenDB is that the domain model of the consuming service is not the domain model of the events (a "customer" entity in one domain is not the same as the "customer" entity in another domain). The validation and transformation may be trivial in a lot of cases, but I prefer that to be in app code (which is subject to tests and typically benefits from continuous delivery, whereas deployment of scripts to the DB typically doesn't benefit from that as much). In other words, I prefer the boundary integration code to be in the app, not the DB, unless it's a pretty simple and static case. All that to say - the pattern for incoming you outlined is perfectly fine and nicely done, but may find limited use.
Trev,
Regarding transient documents, what I had in mind was something like this:
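A sketch only, where loadToQueue() and del() stand in for whatever the final API ends up being:

```javascript
// Sketch only - loadToQueue() and del() are placeholder names for this
// design discussion, not a final API.
loadToQueue('OrderEvents', {
    OrderId: this.OrderId,
    Payload: this.Payload
});

// Once the message has been handed off to the queue, the outbox document
// has served its purpose, so remove it immediately.
del(id(this));
```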
Basically, that means that if we successfully sent the document, we'll remove it directly.
Makes sense on the incoming part; having your own logic that you can deploy / debug / version more easily is certainly a consideration.
Maybe an option to choose between immediate deletion and a timed expiry after sending could help with the debugging experience?
Trev,
Same logic :-)
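Something like this, sketched with RavenDB's document expiration instead of an immediate delete. The loadToQueue() name is still a placeholder, and I'm assuming the script would be allowed to set the @expires metadata on the outbox document:

```javascript
// Sketch only - same flow as before, but instead of deleting the outbox
// document right away, mark it to expire via the @expires metadata that
// RavenDB's expiration feature already understands.
loadToQueue('OrderEvents', {
    OrderId: this.OrderId,
    Payload: this.Payload
});

// Keep the document around for a day for debugging, then let the
// expiration cleanup remove it.
const expiry = new Date(Date.now() + 24 * 60 * 60 * 1000);
this['@metadata']['@expires'] = expiry.toISOString();
```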
I think a few hours or a day is enough for debugging :-)
Is that user code you're showing for the ETL or the system code Raven would execute behind the ETL process?
I was assuming that the loadToQueue() and expiry parts are system code (driven by ETL configuration settings) - i.e. the L in ETL + cleanup - and that the only user code might be the E+T in ETL.
But if it's user code for the entire E+T+L+cleanup, that works too and probably allows for more complex E+T (e.g. loading related stuff). At the same time, I'd consider that too advanced for what should be the simple happy path - ETL a collection of outbox documents directly to a queue and clean them up.
Maybe consider the default happy path without any user code, but allow customization of each step in the ETL+cleanup process.
Trev,
The code here is the ETL script that you, as the user, write. You get to put whatever logic you want there. This script runs as part of the ETL process: RavenDB will load the modified documents, and your script has the chance to transform them before they are sent to the queue.
The default is going to be something like: send all documents in this collection to this queue, with no script whatsoever.