Change Streams

New in MongoDB 3.6, change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Change streams are available for replica sets and sharded clusters that use the WiredTiger storage engine and replica set protocol version 1 (pv1). Change streams can also be used on deployments that employ MongoDB’s encryption-at-rest feature.

Change streams can notify your application of all writes to documents (including deletes) and provide access to all available information as changes occur. This avoids polling, which can introduce delays, incur higher overhead (because the database is checked regularly even if nothing has changed), and lead to missed opportunities.

Characteristics of change streams

Targeted changes

Changes can be filtered to provide relevant and targeted changes to listening applications. For example, a filter can match on the operation type or on fields within the document.

Resumability

Each change stream response includes a resume token. In cases where the connection between the application and the database is temporarily lost, the application can send the last resume token it received and change streams will pick up right where the application left off.

Total ordering

MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster. Applications will always receive changes in the order they were applied to the database.

Durability

Change streams only include majority-committed changes. This means that every change seen by listening applications is durable in failure scenarios such as a new primary being elected.

Security

Change streams are secure – users are only able to create change streams on collections to which they have been granted read access.

Ease of use

Change streams are familiar – the API syntax takes advantage of the established MongoDB drivers and query language, and is independent of the underlying oplog format.

Idempotence

All changes are transformed into a format that’s safe to apply multiple times. Listening applications can use a resume token from any prior change stream event, not just the most recent one, because reapplying operations is safe and will reach the same consistent state.
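To see why resuming from an earlier token is safe, here is a minimal plain-Java sketch. It is not MongoDB driver code: the ChangeEvent class, the sample log, and the replay helper are all hypothetical, and the sketch assumes each event carries the resulting value rather than a relative change such as an increment. Under that assumption, replaying the log from any earlier position onto a state that has already absorbed those events ends in the same state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdempotentReplay {
    // Hypothetical event shape: "set" carries the resulting value, "delete" removes the key.
    static final class ChangeEvent {
        final String op;
        final String id;
        final String value;

        ChangeEvent(String op, String id, String value) {
            this.op = op;
            this.id = id;
            this.value = value;
        }
    }

    // Applies one event; applying the same event twice leaves the state unchanged.
    static void apply(Map<String, String> state, ChangeEvent e) {
        if ("delete".equals(e.op)) {
            state.remove(e.id);
        } else {
            state.put(e.id, e.value);
        }
    }

    // Replays log[fromIndex..] onto a copy of the starting state.
    static Map<String, String> replay(Map<String, String> start, List<ChangeEvent> log, int fromIndex) {
        Map<String, String> state = new HashMap<>(start);
        for (int i = fromIndex; i < log.size(); i++) {
            apply(state, log.get(i));
        }
        return state;
    }

    // Made-up sample data for the illustration.
    static List<ChangeEvent> sampleLog() {
        List<ChangeEvent> log = new ArrayList<>();
        log.add(new ChangeEvent("set", "pencil", "qty 1"));
        log.add(new ChangeEvent("set", "eraser", "qty 2"));
        log.add(new ChangeEvent("set", "pencil", "qty 3"));
        log.add(new ChangeEvent("delete", "eraser", null));
        return log;
    }

    public static void main(String[] args) {
        List<ChangeEvent> log = sampleLog();
        Map<String, String> once = replay(new HashMap<>(), log, 0);
        // Resume from an older position: events 1..3 are applied a second time.
        Map<String, String> again = replay(once, log, 1);
        System.out.println(once.equals(again)); // prints true
    }
}
```

If the events carried relative changes instead (for example "add 1 to qty"), the second replay would diverge, which is why the transformation into a replay-safe format matters.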

Watch Collection/Database/Deployment

You can open change streams against:

  • A collection: You can open a change stream cursor for a single collection (except system collections, or any collections in the admin, local, and config databases).
  • A database: Starting in MongoDB 4.0, you can open a change stream cursor for a single database (excluding the admin, local, and config databases) to watch for changes to all its non-system collections.
  • A deployment: Starting in MongoDB 4.0, you can open a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases except for admin, local, and config.

Change streams are only available if "majority" read concern support is enabled (default).

Open a Change Stream

For a replica set, you can open a change stream from any of the data-bearing members.

For a sharded cluster, you must issue the open change stream operation against the mongos.

The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. While the connection to the MongoDB deployment remains open, the cursor remains open until one of the following occurs:

  • The cursor is explicitly closed.
  • An invalidate event occurs.
  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.

The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

To retrieve the data change event notifications, iterate the change stream cursor.

Modify Change Stream Output

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

  • $match
  • $project
  • $addFields
  • $replaceRoot
  • $redact

// Assumes static imports of java.util.Collections.singletonList
// and java.util.Arrays.asList.
MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method. Each element is a ChangeStreamDocument
// that wraps the event, not a plain Document.
MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch(pipeline).iterator();

The pipeline list includes a single $match stage that passes through only those events where fullDocument.username is alice or where the operationType is delete. All other events are dropped.

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.
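The logic of the $match stage above can be pictured as a plain-Java predicate. The sketch below is only a model of the filter, not driver code: events are represented as hypothetical nested maps, and the event() helper and sample data are made up for the illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class MatchStageModel {
    // Hypothetical stand-in for a change event: an operationType plus an optional fullDocument.
    static Map<String, Object> event(String operationType, String username) {
        Map<String, Object> e = new HashMap<>();
        e.put("operationType", operationType);
        if (username != null) {
            Map<String, Object> doc = new HashMap<>();
            doc.put("username", username);
            e.put("fullDocument", doc);
        }
        return e;
    }

    // Models: { $or: [ { "fullDocument.username": "alice" },
    //                  { operationType: { $in: ["delete"] } } ] }
    static final Predicate<Map<String, Object>> MATCH = e -> {
        Object doc = e.get("fullDocument");
        boolean isAlice = doc instanceof Map && "alice".equals(((Map<?, ?>) doc).get("username"));
        boolean isDelete = "delete".equals(e.get("operationType"));
        return isAlice || isDelete;
    };

    // Returns the operation types of the events that pass the filter, in order.
    static List<String> passing(List<Map<String, Object>> events) {
        return events.stream()
                .filter(MATCH)
                .map(e -> (String) e.get("operationType"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> events = Arrays.asList(
                event("insert", "alice"),
                event("insert", "bob"),
                event("delete", null)); // modeled without a fullDocument
        System.out.println(passing(events)); // prints [insert, delete]
    }
}
```

One consequence worth noting: an event that carries no fullDocument field can only match through the operationType condition. Update events omit fullDocument unless the stream is opened with full-document lookup, so an update to alice's document would not satisfy the first condition by default.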

Lookup Full Document for Update Operation

By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.

To return the most current majority-committed version of the updated document, call fullDocument(FullDocument.UPDATE_LOOKUP) on the change stream returned by watch().

In the example below, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

Resume a Change Stream

Change streams are resumable by specifying a resumeAfter token when opening the cursor. For the resumeAfter token, use the _id value of the change stream event document. Passing the _id value to the change stream attempts to resume notifications starting after the specified operation.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();
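The "starting after the specified operation" semantics can be modeled in plain Java. The sketch below is not driver code and makes several simplifying assumptions: the event log is a hypothetical in-memory list, a list index stands in for the opaque resume token, and a token of -1 means "start of this sample log". It shows that an application which remembers the token of the last event it processed neither misses nor repeats events across a dropped connection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ResumeModel {
    // Hypothetical event log; the index of each entry plays the role of its resume token.
    static final List<String> LOG =
            Arrays.asList("insert#1", "update#1", "insert#2", "delete#1");

    // Models resumeAfter: returns the events strictly after the given token.
    static List<String> eventsAfter(int resumeToken) {
        return new ArrayList<>(LOG.subList(resumeToken + 1, LOG.size()));
    }

    // Processes failAfter events, "loses the connection", then resumes from the saved token.
    static List<String> consumeWithInterruption(int failAfter) {
        List<String> processed = new ArrayList<>();
        int lastToken = -1;

        // First connection: process some events, remembering the token of each one.
        for (int i = 0; i < failAfter; i++) {
            processed.add(LOG.get(i));
            lastToken = i;
        }

        // Reconnect: ask only for the events after the last processed one.
        for (String event : eventsAfter(lastToken)) {
            processed.add(event);
            lastToken++;
        }
        return processed;
    }

    public static void main(String[] args) {
        // prints [insert#1, update#1, insert#2, delete#1]
        System.out.println(consumeWithInterruption(2));
    }
}
```

In a real application the saved token would typically be written to durable storage alongside the results of processing each event, so that a restart of the application itself can also resume from the right place.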

Event Notification

Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.

If an operation is associated with a multi-document transaction, the change event document includes the txnNumber and the lsid.

See Change Events for all possible fields that a change stream response document can have.
