“Incremental” map/reduce in MongoDB isn’t


Rafal and Ben Foster commented on my previous post with some ideas on how to deal with incremental updates to map/reduce indexes. Rafal said:

Actually, it's quite simple if you can 'reverse' the mapping operation (for given key find all documents matching that key): you just delete aggregate record with specified key and run incremental map-reduce on all matching documents. In today's example, you would delete the aggregate with key='oren' and then run map reduce with a query:

db.items.mapReduce(map, reduce, { out: { reduce: 'distinct_item_names' }, query: { name: 'oren' } });

And Ben said:

It's worth mentioning that I was able to get the MongoDB map-reduce collections updating automatically (insert/update/delete) by monitoring the MongoDB OpLog …

…and listen for new documents in the OpLog which could then be used to re-execute an incremental Map-Reduce.

And while this looks right, it can’t possibly work. I’ll start with Rafal’s suggestion. He suggests issuing the following set of commands whenever we delete something from the database:

    db.distinct_item_names.remove({ name: 'oren' });
    db.items.mapReduce(map, reduce, { out: { reduce: 'distinct_item_names' }, query: { name: 'oren' } });

And yes, that will actually work, as long as you are careful to never do this concurrently. Because if you do run this concurrently… well, the best you can hope for is no data, but the likelier scenario is data corruption.
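To make the concurrency problem concrete, here is a minimal in-memory sketch. It does not talk to MongoDB at all. The `removeAggregate` and `mapReduceInto` helpers are hypothetical stand-ins for the two commands above, and I'm assuming the reduce function simply sums counts. The important part is that `out: { reduce: ... }` merges new results into whatever is already in the output collection.

```javascript
// In-memory stand-ins for the two collections; no MongoDB required.
const items = [{ name: 'oren' }, { name: 'oren' }, { name: 'oren' }];
const distinctItemNames = new Map(); // key -> count

// Step 1 of the recipe: delete the aggregate for a key.
function removeAggregate(key) {
  distinctItemNames.delete(key);
}

// Step 2: re-run map/reduce for the key. Like out: {reduce: ...},
// the fresh result is reduced (here: summed) with any existing value.
function mapReduceInto(key) {
  const fresh = items.filter(doc => doc.name === key).length;
  const existing = distinctItemNames.get(key) || 0;
  distinctItemNames.set(key, existing + fresh);
}

// Run a sequence of steps against an empty output collection.
function runInterleaving(steps) {
  distinctItemNames.clear();
  for (const step of steps) step();
  return distinctItemNames.get('oren');
}

// Two workers, one after the other: the aggregate stays correct.
const serial = runInterleaving([
  () => removeAggregate('oren'), () => mapReduceInto('oren'),
  () => removeAggregate('oren'), () => mapReduceInto('oren'),
]);

// Two workers interleaved: both remove, then both merge their results.
const interleaved = runInterleaving([
  () => removeAggregate('oren'), () => removeAggregate('oren'),
  () => mapReduceInto('oren'), () => mapReduceInto('oren'),
]);

console.log({ serial, interleaved });
```

With three matching documents, the serial run ends with a count of 3, while the interleaved run ends with 6, because the second merge is applied on top of the first. The “no data” case is the window between a remove and its matching map/reduce, when a reader sees no aggregate at all.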

But this actually gets better. Deletes are annoying, but they are a relatively simple case to process. You have updates to deal with too. We’ll assume that we are watching the oplog to get notified when this happens. Here is a MongoDB oplog entry:

    {
      "ts": {
        "t": 1286821984000,
        "i": 1
      },
      "h": "1633487572904743924",
      "op": "u",
      "ns": "items",
      "o2": {
        "_id": "4cb35859007cc1f4f9f7f85d"
      },
      "o": {
        "$set": {
          "Name": "Eini"
        }
      }
    }

As you can see, we have an update operation (op: u) on a specific document (o2._id) with the specified update (o.$set). That is really great, and it is utterly useless for our purposes. In this case, we updated the name from Oren to Eini, so we would like to be able to run this:

    db.distinct_item_names.remove({ name: 'oren' });
    db.distinct_item_names.remove({ name: 'eini' });
    db.items.mapReduce(map, reduce, { out: { reduce: 'distinct_item_names' }, query: { name: 'oren' } });
    db.items.mapReduce(map, reduce, { out: { reduce: 'distinct_item_names' }, query: { name: 'eini' } });

Except that we don’t have any way to get the old value out from the oplog. And this still isn’t going to work concurrently.
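Here is a small sketch of what a watcher can and cannot learn from that entry. The `keysToRecompute` helper is hypothetical, something I made up for illustration, and it only looks at `$set` updates. It runs on a plain object copied from the entry above, not against a live oplog.

```javascript
// The oplog entry from above, trimmed to the fields we care about.
const entry = {
  op: 'u',
  ns: 'items',
  o2: { _id: '4cb35859007cc1f4f9f7f85d' },
  o: { $set: { Name: 'Eini' } },
};

// Hypothetical helper: which aggregate keys need recomputing for this entry?
function keysToRecompute(oplogEntry, field) {
  if (oplogEntry.op !== 'u') return null; // this sketch handles updates only
  const set = oplogEntry.o.$set || {};
  return {
    // The new value is right there in the $set clause.
    newKey: field in set ? set[field] : undefined,
    // The old value is not: the entry holds no copy of the document
    // as it was before the update.
    oldKey: undefined,
  };
}

const recompute = keysToRecompute(entry, 'Name');
console.log(recompute);
```

The new key comes back as 'Eini', but there is nothing in the entry to fill in the old key with, and by the time we see the entry, the document itself already holds the new value.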

But let us say that we decided to have a watcher process monitor the oplog somehow, and it will ensure no concurrency of those requests. Now you have to deal with fun issues like: “what happens if the watcher process recycles?” How do you keep your place in the oplog? (And remember, the oplog is capped, so entries you haven’t seen yet might be removed once the oplog grows beyond its specified size.)
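To show what “keeping your place” involves, here is a minimal sketch using an in-memory capped log. `CappedLog` and `resume` are hypothetical names, the timestamps are plain integers, and none of this is MongoDB’s API. The idea is that the watcher saves the timestamp of the last entry it processed, and on restart checks whether that entry has already been pushed out of the log.

```javascript
// Hypothetical in-memory capped log: keeps only the newest `capacity` entries.
class CappedLog {
  constructor(capacity) {
    this.capacity = capacity;
    this.entries = [];
  }
  append(ts, op) {
    this.entries.push({ ts, op });
    if (this.entries.length > this.capacity) this.entries.shift(); // drop oldest
  }
  // Entries strictly newer than the given timestamp.
  readAfter(ts) {
    return this.entries.filter(e => e.ts > ts);
  }
  oldestTs() {
    return this.entries.length ? this.entries[0].ts : null;
  }
}

// Resume from a saved checkpoint. If the checkpoint entry itself is gone,
// we cannot prove that nothing after it was dropped too, so report a gap.
function resume(log, checkpointTs) {
  const oldest = log.oldestTs();
  const gap = oldest !== null && oldest > checkpointTs;
  return { gap, pending: log.readAfter(checkpointTs) };
}

const log = new CappedLog(3);
for (let ts = 1; ts <= 5; ts++) log.append(ts, 'u'); // entries 1 and 2 fall off

const shortOutage = resume(log, 3); // checkpoint entry still in the log
const longOutage = resume(log, 1);  // checkpoint entry already dropped
console.log(shortOutage.gap, longOutage.gap);
```

After the short outage the watcher can pick up entries 4 and 5 and carry on. After the long one, entry 2 is gone for good, and the only safe option is to rebuild the aggregates from scratch.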

And… to be frank, once we have done all of that, this is still the easy part. One of the reasons that you want to do this work in the first place is to deal with large amounts of data. But you cannot assume that the data will be evenly distributed.

One bug report filed against the RavenDB map/reduce implementation involved a map/reduce index on the US Census data. That is ~300 million documents, and the index the user wanted to build was a map/reduce group by state. You have states like California, with more than 30 million people, and you realize that you don’t want to have to re-do the map/reduce over all 30+ million documents for that state every time one of them changes. In RavenDB, under this scenario, you’ll have to issue about 3,073 operations, by the way, versus the 30 million you would need for this approach.

So yeah, “incremental” map/reduce can’t handle concurrent work, can’t handle deletes, can’t handle updates, and definitely shouldn’t be used on large data sets. And that is after you went to the trouble of setting up the watcher process, monitoring the oplog, etc.

Or, you can use RavenDB and you get a true incremental map/reduce without having to worry about any of that.