MongoDB Change Streams - More event processing, less queue infrastructure.

What are Change Streams?

Change streams are a new way to tap into all of the data being written (or deleted) in mongo. Using change streams, you can do nifty things like triggering any reaction you want in response to very specific document changes.

For example, you have a user that registers to your website. You want to send them an email, or maybe put them on an on-boarding campaign. Maybe both? Using change streams you can hook into the live event itself - insert of a document to the Users collection - and react to that by immediately pinging your remarketing system with the details of the new user.

The flow looks something like this:

Simple Integration

Your website code only worries about registering the new user. Want to also send the new user a gift? No problem: have the listener also ping the warehouse. Want to add the user to your CRM system? You guessed it: just add a hookup.

Now you may wonder “Why this is news? Didn’t mongo always have tailable cursors? Can’t we just tail the oplog and do the same?” Sure we can. But when you tail the oplog, every mutation is going to be returned to you. Every. Single. Write.

Consider a moderate website, say 1000 users sign ups a day. That’s roughly 1 every minute or two. Not too bad. But what about the social media documents? Every tweet? Every page view you logged? Every catalog item visited? That can get pretty big. Each of these will be sent to you from the oplog, because each one is gets written into the oplog, regardless of if you need it or not. Extra work, extra load. Extra data you’re not interested in. Filtering though this is like putting a butterfly net over a firehose nozzle.

Change streams allow you to do this more efficiently. With change stream you get to customize several aspects of the stream that is returned to you:

  1. The specific collection you are interested in.
  2. The type of change you are interested in (inserts and updates only, for example).
  3. The specific parts of the changed document (if any) you want back.

These options give you great control over the source, size, and nature of the changes your listener app wants to handle. Rather than returning all writes to you and, and making you filter them out in application memory, the filtering is done at the server level. This reduces wire traffic, reduces your memory and CPU usage on the client. Further, it can save another round trip per document: if the mutation touched only one field - say, last login date - tailing the oplog would have only yielded you the content of the command mutation - the date value itself. If you needed the user’s email and name - which were not part of the command- they are not part of the oplog. Tailing the oplog would then not be enough, and you’d have to turn around and shoot another query (Joy! More I/O!) to get the extra details. With change streams, you get to include the extra fields in the returned event data by just configuring the stream to return them.

How to Consume Change Streams

Subscribing to change events depends on your particular driver support. Early beta had Node and Java driver support. I’ll focus here on Node.

There are 2 modes of consuming change streams: event driven, and aggregation pipeline. The aggregation pipeline way, is to just issue a $aggregate on a collection, with a mandatory new $changeStream pipeline operator as the first pipeline stage. You can introduce other pipeline stages after that stage, but it must be the first one. The resulting cursor would then return change items as the occur.

var coll = client.db('demo').collection('peeps');
var cursor = coll.aggregate([
{ $changeStream: {} }
]);
// .. now consume cursor as you would any

In the example above, were just saying that any write to the peeps collection in the demo database would be returned back in the cursor we opened. The cursor would remain open (baring errors or network issues or collection disappearance) so we can just process the events coming back sequentially.

Speaking of sequentially the event stream guarantees events returned would be in the order they were executed by MongoDB. The change stream system uses logical ordering that ensures you get the events in the same order mongo would have serialized them internally.

To include the full document affected, you can add the fullDocument option and a value of updateLookup. The other option is default which indicates you don’t need any extra data. Omitting the fullDocument option is equivalent to specifying the value default.

With the updateLookup value set, an event will include the queried value at some point after applying the operation that prompted this event. The subtlety here is that if there’s a high rate of change, your event may return a further future value of a document. For example, if you update a pageview.clickCount field twice at high rate, the event resulting from the first update may reflect the result of the second update. You will get 2 change notifications because 2 writes happened, and they will be reflected in order. But the lookup that brings in the current state of the document is not guaranteed to contain the first change because it is, in fact, a lookup performed after the event is queued for discovery by your change stream and not a point in time capture of the document.

var cursor = coll.aggregate([
{ $changeStream: { fullDocument: 'updateLookup' } }
]);

In the example above, the event returned from the cursor will have a field named fullDocument containing the document looked up at the server, without an additional round trip.

Speaking of event data, what else is returned?

{
"_id":{"clusterTime":{"ts":"..."},"uuid":"...","documentKey": {"_id":"..."}},
"operationType":"update",
"fullDocument":{"_id":"waldo","at":"2017-10-01","seen":"beach"}, // this is my document!
"ns":{ "db":"changeStreamDemo","coll":"peeps"},
"documentKey":{"_id":"waldo"},
"updateDescription":{
"updatedFields":{"at":"2017-10-17"},
"removedFields":[]
}
}

The change event includes a bunch of useful information:

  • An _id for the event. In case you wanted to stop and start again, save this one and ask only for changes that occurred since that id.
  • The operationType - discussed shortly.
  • The full namespace ns, including the database name and collection names.
  • documentKey pointing to the document _id subject of this event
  • fullDocument containing the document as it was just after write occurred. The word full may be a misnomer though: it depends on further projection you may add to retrieved a subset or a modified document. More about that in a bit. This field will not be present unless lookupDocument option was specified.
  • An update description, which contains the exact field value changes the values assigned in the updatedFields array, and the fields removed (think $unset) in the removedFields array.

Thus far, we just asked for any change. You can narrow down the change nature, using the various operation types during the stream setup. The options are:

  • insert
  • update
  • delete
  • replace
  • invalidate

The first 4 are self explanatory. invalidate occurs when a collection level change occurred which makes the document not available, other than delete. Think collection.drop().

To get notified only on a subset of the change types, you can request the change stream, and add a $match stage, like so:

var coll = client.db('demo').collection('peeps');
var cursor = coll.aggregate([
{ $changeStream: { fullDocument: 'updateLookup' } },
{ $match: { operationType: { $in: ['update', 'replace'] } } }
]);

Now only updates and document-replacements would be emitted back, filtered at the source.

This pipeline can also include transformations, so the aforementioned fullDocument field is really what you want to make of it.

var coll = client.db('demo').collection('peeps');
var cursor = coll.aggregate([
{ $changeStream: { fullDocument: 'updateLookup' } },
{ $match: { operationType: { $in: ['update', 'replace'] } } },
{ $project: {secret:0}}
]);

The code above will omit the secret field from the emitted event.

The $match stage can also apply other arbitrary conditions. When the updateLookup option is set, you can pretty much filter based on any document field or set of fields you want, using familiar aggregation syntax.

That was a quick rundown of consuming change streams using aggregation. But I actually like another option in the node driver: subscribing to change stream events as asynchronous events!

Here’s how that plays out:

// a filter describing the things i'm interested in
var filter = {
$match: {
$or: [{ operationType: 'update' },{ operationType: 'replace' }],
'fullDocument._id': 'waldo'
}
};
// the option to return the full document
var options = { fullDocument: 'updateLookup' };
// creating a change stream definition
var changeStream = coll.watch([filter], options);
// subscribing to events
changeStream.on('change', c => {
console.log('Waldo alert!', c)
});
changeStream.on('error', err => console.log('Oh snap!', err));

Using the code above, any update or replace to the document with _id ‘waldo’ in the collection coll will emit a change event.

My preference for this syntax is that it fits better into the modular async coding model. It also makes it easy to hook up the change function but define the handler elsewhere. This makes testing and reuse easy too. But that’s me - you do you.

Resume / Retry

In the face of network error, the driver to re-connect once automatically once. If it fails again, you can use the last _id of the change event (which will also be in the exception thrown), after you reconnect to your cluster. The responsibility for remembering where you were is on you (after the one-shot recovery the driver does for you). But both syntaxes allow you to add a resumeAfter option when requesting the stream, setting the value to the last change stream _id you successfully processed.

var lastProcessedChangeId = ...;
var options = { resumeAfter: lastProcessedChangeId, fullDocument: 'updateLookup' };

Due to the nature of large interconnected system, it is useful to design the change stream events in your application as ‘at least once’, and make any operation you trigger base on a notification idempotent. You should be able to run the same operation more than once with the same event data without adverse effect on your system.

Even when you limit the number, nature, and collection you listen to you may still end up with a large amount of changes to process. Or you may have subordinate systems that can’t handle your calls emanating from the events (think slow legacy systems). One thing you can do is deploy multiple listeners, each one taking on the same change definition, but also a partition filter on the change _id. A partition filter can be employed on any field actually, but the _id is present when you don’t ask for updateLookup too. The idea is to just run several listeners, each one with an extra field match on the modulus of the timestamp or something. If you need more capacity, you spin up more listeners and re-partition according to the number of listeners you need.

The change stream feature is currently in RC0 as of this writing, and should hit GA soon. I look forward to writing less code and incurring less I/O in the event processing component of our systems. It’s kind of nice to be able to use the same database infrastructure for all kinds of workloads, and this operational integration feature is a very welcome addition.