Change Streams

MongoDB 3.6 introduces a new $changeStream aggregation pipeline operator.

Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection API includes a new watch method. The ChangeStreamIterable sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.

Code

vim src/main/java/com/admatic/ChangeStreamSamples.java
package com.admatic;

import com.mongodb.client.*;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

public final class ChangeStreamSamples {

    private ChangeStreamSamples() {
    }

    public static void main(final String[] args) {
        Logger mongoLogger = Logger.getLogger("org.mongodb.driver");
        mongoLogger.setLevel(Level.SEVERE);

        MongoClient mongoClient;

        if (args.length == 0) {
            // connect to the local database server
            mongoClient = MongoClients.create("mongodb://localhost:27017,localhost:27018,localhost:27019");
        } else {
            mongoClient = MongoClients.create(args[0]);
        }

        // Select the MongoDB database.
        MongoDatabase database = mongoClient.getDatabase("testChangeStreams");
        database.drop();
        sleep();

        // Select the collection to query.
        MongoCollection<Document> collection = database.getCollection("documents");

        /**
         * Example 1
         * Create a simple change stream against an existing collection.
         */
        System.out.println("1. Initial document from the Change Stream:");

        // Create the change stream cursor.
        MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();

        // Insert a test document into the collection.
        collection.insertOne(Document.parse("{username: 'alice123', name: 'Alice'}"));
        ChangeStreamDocument<Document> next = cursor.next();
        System.out.println(next);
        cursor.close();
        sleep();

        /**
         * Example 2
         * Create a change stream with 'lookup' option enabled.
         * The test document will be returned with a full version of the updated document.
         */
        System.out.println("2. Document from the Change Stream, with lookup enabled:");

        // Create the change stream cursor.
        cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();

        // Update the test document.
        collection.updateOne(Document.parse("{username: 'alice123'}"), Document.parse("{$set : { email: 'alice@example.com'}}"));

        // Block until the next result is returned
        next = cursor.next();
        System.out.println(next);
        cursor.close();
        sleep();

        /**
         * Example 3
         * Create a change stream with 'lookup' option using a $match and ($redact or $project) stage.
         */
        System.out.println("3. Document from the Change Stream, with lookup enabled, matching `update` operations only: ");

        // Insert some dummy data.
        collection.insertMany(asList(Document.parse("{updateMe: 1}"), Document.parse("{replaceMe: 1}")));

        // Create $match pipeline stage.
        List<Bson> pipeline = singletonList(
                Aggregates.match(
                        Filters.or(
                                Document.parse("{'fullDocument.username': 'alice123'}"),
                                Filters.in("operationType", asList("update", "replace", "delete"))
                        )
                )
        );

        // Create the change stream cursor with $match.
        cursor = collection.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();

        // Forward to the end of the change stream
        next = cursor.tryNext();

        // Update the test document.
        collection.updateOne(Filters.eq("updateMe", 1), Updates.set("updated", true));
        next = cursor.next();
        System.out.println(format("Update operationType: %s %n %s", next.getUpdateDescription(), next));

        // Replace the test document.
        collection.replaceOne(Filters.eq("replaceMe", 1), Document.parse("{replaced: true}"));
        next = cursor.next();
        System.out.println(format("Replace operationType: %s", next));

        // Delete the test document.
        collection.deleteOne(Filters.eq("username", "alice123"));
        next = cursor.next();
        System.out.println(format("Delete operationType: %s", next));
        cursor.close();
        sleep();

        /**
         * Example 4
         * Resume a change stream using a resume token.
         */
        System.out.println("4. Document from the Change Stream including a resume token:");

        // Get the resume token from the last document we saw in the previous change stream cursor.
        BsonDocument resumeToken = next.getResumeToken();
        System.out.println(resumeToken);

        // Pass the resume token to the resume after function to continue the change stream cursor.
        cursor = collection.watch().resumeAfter(resumeToken).iterator();

        // Insert a test document.
        collection.insertOne(Document.parse("{test: 'd'}"));

        // Block until the next result is returned
        next = cursor.next();
        System.out.println(next);
        cursor.close();

        mongoClient.close();
    }

    private static void sleep() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignore.
        }
    }
}

Compile

mvn compile

Run

mvn exec:java -Dexec.mainClass=com.admatic.ChangeStreamSamples \
    -Dexec.args="mongodb+srv://admatic:admatic123@admatic-cluster-7qyyr.mongodb.net/test"
1. Initial document from the Change Stream:
ChangeStreamDocument{resumeToken={"_data": "825C7780D80000004229295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9807, username=alice123, name=Alice}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9807"}}, clusterTime=Timestamp{value=6662935838919229506, seconds=1551335640, inc=66}, operationType=OperationType{value='insert'}, updateDescription=null}
2. Document from the Change Stream, with lookup enabled:
ChangeStreamDocument{resumeToken={"_data": "825C7780D80000004429295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9807, username=alice123, name=Alice, email=alice@example.com}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9807"}}, clusterTime=Timestamp{value=6662935838919229508, seconds=1551335640, inc=68}, operationType=OperationType{value='update'}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"email": "alice@example.com"}}}
3. Document from the Change Stream, with lookup enabled, matching `update` operations only:
Update operationType: UpdateDescription{removedFields=[], updatedFields={"updated": true}}
 ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000000429295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98080004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9808, updateMe=1, updated=true}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9808"}}, clusterTime=Timestamp{value=6662935847509164036, seconds=1551335642, inc=4}, operationType=OperationType{value='update'}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"updated": true}}}
Replace operationType: ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000000529295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98090004"}, namespace=testChan
geStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9809, replaced=true}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9809"}}, clusterTime=Timestamp{value=6662935847509164037, seconds=1551335642, inc=5}, operationType=OperationType{value='replace'}, updateDescription=null}
Delete operationType: ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000000629295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}, namespace=testChangeStreams.documents, fullDocument=null, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9807"}}, clusterTime=Timestamp{value=6662935847509164038, seconds=1551335642, inc=6}, operationType=OperationType{value='delete'}, updateDescription=null}
4. Document from the Change Stream including a resume token:
{"_data": "825C7780DA0000000629295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}
ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000003629295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780DAB2E0C35E077B980A0004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780dab2e0c35e077b980a, test=d}}, documentKey={"_id": {"$oid": "5c7780dab2e0c35e077b980a"}}, clusterTime=Timestamp{value=6662935847509164086, seconds=1551335642, inc=54}, operationType=OperationType{value='insert'}, updateDescription=null}

results matching ""

    No results matching ""