Change Streams
MongoDB 3.6 introduces a new $changeStream aggregation pipeline stage.
Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection API includes a new watch method. The ChangeStreamIterable sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.
Code
vim src/main/java/com/admatic/ChangeStreamSamples.java
package com.admatic;
import com.mongodb.client.*;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
/**
 * Runnable walkthrough of change streams with the synchronous MongoDB Java driver.
 *
 * <p>The program connects to a deployment, drops and recreates the
 * {@code testChangeStreams.documents} namespace, and then runs four examples in order:
 * <ol>
 *   <li>a plain change stream that observes an insert;</li>
 *   <li>a change stream with {@link FullDocument#UPDATE_LOOKUP} that observes an update;</li>
 *   <li>a change stream filtered by a {@code $match} stage that observes an update,
 *       a replace and a delete;</li>
 *   <li>a change stream resumed from the resume token of the last event seen in example 3.</li>
 * </ol>
 *
 * <p>Every cursor and the client itself are opened in try-with-resources so they are released
 * even when a driver call throws.
 */
public final class ChangeStreamSamples {

    /** Connection string used when no URI is supplied on the command line. */
    private static final String DEFAULT_URI =
            "mongodb://localhost:27017,localhost:27018,localhost:27019";

    /** Length of the pause inserted between steps, in milliseconds. */
    private static final long PAUSE_MILLIS = 100;

    private ChangeStreamSamples() {
        // Utility class; not meant to be instantiated.
    }

    /**
     * Runs the four change stream examples against a MongoDB deployment.
     *
     * @param args optional; {@code args[0]} is used as the connection string when present,
     *             otherwise {@link #DEFAULT_URI} is used
     */
    public static void main(final String[] args) {
        // Only let SEVERE driver log records through so the sample output stays readable.
        Logger mongoLogger = Logger.getLogger("org.mongodb.driver");
        mongoLogger.setLevel(Level.SEVERE);

        final String uri = args.length == 0 ? DEFAULT_URI : args[0];

        try (MongoClient mongoClient = MongoClients.create(uri)) {
            // Start from a clean database on every run.
            MongoDatabase database = mongoClient.getDatabase("testChangeStreams");
            database.drop();
            sleep();

            MongoCollection<Document> collection = database.getCollection("documents");

            watchInsert(collection);
            sleep();

            watchUpdateWithLookup(collection);
            sleep();

            BsonDocument resumeToken = watchFilteredOperations(collection);
            sleep();

            resumeFromToken(collection, resumeToken);
        }
    }

    /**
     * Example 1: creates a simple change stream against the collection, inserts a test
     * document and prints the resulting change event.
     *
     * @param collection the collection to watch and write to
     */
    private static void watchInsert(final MongoCollection<Document> collection) {
        System.out.println("1. Initial document from the Change Stream:");
        try (MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator()) {
            collection.insertOne(Document.parse("{username: 'alice123', name: 'Alice'}"));
            System.out.println(cursor.next());
        }
    }

    /**
     * Example 2: creates a change stream with the 'lookup' option enabled, updates the test
     * document and prints the change event, which carries the full updated document.
     *
     * @param collection the collection to watch and write to
     */
    private static void watchUpdateWithLookup(final MongoCollection<Document> collection) {
        System.out.println("2. Document from the Change Stream, with lookup enabled:");
        try (MongoCursor<ChangeStreamDocument<Document>> cursor =
                     collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator()) {
            collection.updateOne(
                    Document.parse("{username: 'alice123'}"),
                    Document.parse("{$set : { email: 'alice@example.com'}}"));
            // Blocks until the update event is delivered.
            System.out.println(cursor.next());
        }
    }

    /**
     * Example 3: creates a change stream with the 'lookup' option and a {@code $match} stage,
     * then performs an update, a replace and a delete, printing each change event.
     *
     * <p>The {@code $match} stage keeps events whose {@code fullDocument.username} is
     * {@code alice123} or whose {@code operationType} is update, replace or delete.
     *
     * @param collection the collection to watch and write to
     * @return the resume token of the last event printed (the delete event)
     */
    private static BsonDocument watchFilteredOperations(final MongoCollection<Document> collection) {
        System.out.println("3. Document from the Change Stream, with lookup enabled, matching `update` operations only: ");

        // Seed the documents that are modified below; this happens before the cursor is opened.
        collection.insertMany(asList(Document.parse("{updateMe: 1}"), Document.parse("{replaceMe: 1}")));

        List<Bson> pipeline = singletonList(
                Aggregates.match(
                        Filters.or(
                                Document.parse("{'fullDocument.username': 'alice123'}"),
                                Filters.in("operationType", asList("update", "replace", "delete")))));

        try (MongoCursor<ChangeStreamDocument<Document>> cursor =
                     collection.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).iterator()) {
            // Forward to the end of the change stream; the result is intentionally discarded.
            cursor.tryNext();

            collection.updateOne(Filters.eq("updateMe", 1), Updates.set("updated", true));
            ChangeStreamDocument<Document> event = cursor.next();
            System.out.println(format("Update operationType: %s %n %s", event.getUpdateDescription(), event));

            collection.replaceOne(Filters.eq("replaceMe", 1), Document.parse("{replaced: true}"));
            event = cursor.next();
            System.out.println(format("Replace operationType: %s", event));

            collection.deleteOne(Filters.eq("username", "alice123"));
            event = cursor.next();
            System.out.println(format("Delete operationType: %s", event));

            return event.getResumeToken();
        }
    }

    /**
     * Example 4: prints the given resume token, resumes a change stream after it, inserts a
     * test document and prints the next change event.
     *
     * @param collection  the collection to watch and write to
     * @param resumeToken token of the last event already seen; the stream resumes after it
     */
    private static void resumeFromToken(final MongoCollection<Document> collection,
                                        final BsonDocument resumeToken) {
        System.out.println("4. Document from the Change Stream including a resume token:");
        System.out.println(resumeToken);
        try (MongoCursor<ChangeStreamDocument<Document>> cursor =
                     collection.watch().resumeAfter(resumeToken).iterator()) {
            collection.insertOne(Document.parse("{test: 'd'}"));
            // Blocks until the insert event is delivered.
            System.out.println(cursor.next());
        }
    }

    /**
     * Pauses the current thread for {@link #PAUSE_MILLIS} milliseconds.
     *
     * <p>If the thread is interrupted while sleeping, the pause ends early and the interrupt
     * flag is restored so callers further up the stack can still observe the interruption.
     */
    private static void sleep() {
        try {
            Thread.sleep(PAUSE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Compile
mvn compile
Run
mvn exec:java -Dexec.mainClass=com.admatic.ChangeStreamSamples \
-Dexec.args="mongodb+srv://<username>:<password>@admatic-cluster-7qyyr.mongodb.net/test"
1. Initial document from the Change Stream:
ChangeStreamDocument{resumeToken={"_data": "825C7780D80000004229295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9807, username=alice123, name=Alice}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9807"}}, clusterTime=Timestamp{value=6662935838919229506, seconds=1551335640, inc=66}, operationType=OperationType{value='insert'}, updateDescription=null}
2. Document from the Change Stream, with lookup enabled:
ChangeStreamDocument{resumeToken={"_data": "825C7780D80000004429295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9807, username=alice123, name=Alice, email=alice@example.com}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9807"}}, clusterTime=Timestamp{value=6662935838919229508, seconds=1551335640, inc=68}, operationType=OperationType{value='update'}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"email": "alice@example.com"}}}
3. Document from the Change Stream, with lookup enabled, matching `update` operations only:
Update operationType: UpdateDescription{removedFields=[], updatedFields={"updated": true}}
ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000000429295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98080004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9808, updateMe=1, updated=true}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9808"}}, clusterTime=Timestamp{value=6662935847509164036, seconds=1551335642, inc=4}, operationType=OperationType{value='update'}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"updated": true}}}
Replace operationType: ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000000529295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98090004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780d8b2e0c35e077b9809, replaced=true}}, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9809"}}, clusterTime=Timestamp{value=6662935847509164037, seconds=1551335642, inc=5}, operationType=OperationType{value='replace'}, updateDescription=null}
Delete operationType: ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000000629295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}, namespace=testChangeStreams.documents, fullDocument=null, documentKey={"_id": {"$oid": "5c7780d8b2e0c35e077b9807"}}, clusterTime=Timestamp{value=6662935847509164038, seconds=1551335642, inc=6}, operationType=OperationType{value='delete'}, updateDescription=null}
4. Document from the Change Stream including a resume token:
{"_data": "825C7780DA0000000629295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780D8B2E0C35E077B98070004"}
ChangeStreamDocument{resumeToken={"_data": "825C7780DA0000003629295A10040AE79A230E294C32B4EC219A72E09D5946645F696400645C7780DAB2E0C35E077B980A0004"}, namespace=testChangeStreams.documents, fullDocument=Document{{_id=5c7780dab2e0c35e077b980a, test=d}}, documentKey={"_id": {"$oid": "5c7780dab2e0c35e077b980a"}}, clusterTime=Timestamp{value=6662935847509164086, seconds=1551335642, inc=54}, operationType=OperationType{value='insert'}, updateDescription=null}