/*
 * Decompiled with CFR 0.152.
 */
package de.bwaldvogel.mongo.backend;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import de.bwaldvogel.mongo.backend.AbstractTest;
import de.bwaldvogel.mongo.backend.CollectionUtils;
import de.bwaldvogel.mongo.backend.TestSubscriber;
import de.bwaldvogel.mongo.backend.TestUtils;
import de.bwaldvogel.mongo.oplog.OperationType;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public abstract class AbstractOplogTest
extends AbstractTest {
    protected static final String LOCAL_DATABASE = "local";
    protected static final String OPLOG_COLLECTION_NAME = "oplog.rs";

    @BeforeEach
    void beforeEach() {
        backend.enableOplog();
    }

    @Override
    protected void dropAllDatabases() {
        super.dropAllDatabases();
        this.clearOplog();
    }

    protected void clearOplog() {
        this.getOplogCollection().deleteMany((Bson)TestUtils.json(""));
    }

    protected MongoCollection<Document> getOplogCollection() {
        MongoDatabase localDb = syncClient.getDatabase(LOCAL_DATABASE);
        return localDb.getCollection(OPLOG_COLLECTION_NAME);
    }

    @Test
    void testListDatabaseNames() throws Exception {
        AbstractOplogTest.assertThat(this.listDatabaseNames()).contains((Object[])new String[]{LOCAL_DATABASE});
        collection.insertOne((Object)TestUtils.json(""));
        AbstractOplogTest.assertThat(this.listDatabaseNames()).containsExactlyInAnyOrder((Object[])new String[]{db.getName(), LOCAL_DATABASE});
        syncClient.getDatabase("bar").getCollection("some-collection").insertOne((Object)TestUtils.json(""));
        AbstractOplogTest.assertThat(this.listDatabaseNames()).containsExactlyInAnyOrder((Object[])new String[]{"bar", db.getName(), LOCAL_DATABASE});
    }

    @Test
    void testOplogInsertUpdateAndDelete() {
        Document document = TestUtils.json("_id: 1, name: 'testUser1'");
        collection.insertOne((Object)document);
        clock.windForward(Duration.ofSeconds(1L));
        collection.updateOne((Bson)TestUtils.json("_id: 1"), (Bson)TestUtils.json("$set: {name: 'user 2'}"));
        clock.windForward(Duration.ofSeconds(1L));
        collection.deleteOne((Bson)TestUtils.json("_id: 1"));
        List oplogDocuments = TestUtils.toArray(this.getOplogCollection().find().sort((Bson)TestUtils.json("ts: 1")));
        AbstractOplogTest.assertThat(oplogDocuments).hasSize(3);
        Document insertOplogDocument = (Document)oplogDocuments.get(0);
        AbstractOplogTest.assertThat(insertOplogDocument).containsKeys((Object[])new String[]{"ts", "t", "h", "v", "op", "ns", "ui", "wall", "o"});
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"ts")).isInstanceOf(BsonTimestamp.class);
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"t")).isEqualTo((Object)1L);
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"h")).isEqualTo((Object)0L);
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"v")).isEqualTo((Object)2L);
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"op")).isEqualTo((Object)OperationType.INSERT.getCode());
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().getFullName());
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"ui")).isInstanceOf(UUID.class);
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"wall")).isEqualTo((Object)Date.from(Instant.parse("2019-05-23T12:00:00.123Z")));
        AbstractOplogTest.assertThat(insertOplogDocument.get((Object)"o")).isEqualTo((Object)document);
        Document updateOplogDocument = (Document)oplogDocuments.get(1);
        AbstractOplogTest.assertThat(updateOplogDocument).containsKeys((Object[])new String[]{"ts", "t", "h", "v", "op", "ns", "ui", "wall", "o", "o2"});
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ts")).isInstanceOf(BsonTimestamp.class);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"t")).isEqualTo((Object)1L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"h")).isEqualTo((Object)0L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"v")).isEqualTo((Object)2L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"op")).isEqualTo((Object)OperationType.UPDATE.getCode());
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().getFullName());
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ui")).isInstanceOf(UUID.class);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"wall")).isEqualTo((Object)Date.from(Instant.parse("2019-05-23T12:00:01.123Z")));
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o2")).isEqualTo((Object)TestUtils.json("_id: 1"));
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o")).isEqualTo((Object)TestUtils.json("$set: {name: 'user 2'}"));
        Document deleteOplogDocument = (Document)oplogDocuments.get(2);
        AbstractOplogTest.assertThat(deleteOplogDocument).containsKeys((Object[])new String[]{"ts", "t", "h", "v", "op", "ns", "ui", "wall", "o"});
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"ts")).isInstanceOf(BsonTimestamp.class);
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"t")).isEqualTo((Object)1L);
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"h")).isEqualTo((Object)0L);
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"v")).isEqualTo((Object)2L);
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"op")).isEqualTo((Object)OperationType.DELETE.getCode());
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().getFullName());
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"ui")).isInstanceOf(UUID.class);
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"wall")).isEqualTo((Object)Date.from(Instant.parse("2019-05-23T12:00:02.123Z")));
        AbstractOplogTest.assertThat(deleteOplogDocument.get((Object)"o")).isEqualTo((Object)TestUtils.json("_id: 1"));
    }

    @Test
    void testQueryOplogWhenOplogIsDisabled() throws Exception {
        backend.disableOplog();
        collection.insertOne((Object)TestUtils.json("_id: 1"));
        AbstractOplogTest.assertThat(this.getOplogCollection().find()).isEmpty();
    }

    @Test
    void testSetOplogReplaceOneById() {
        collection.insertOne((Object)TestUtils.json("_id: 1, b: 6"));
        collection.replaceOne((Bson)TestUtils.json("_id: 1"), (Object)TestUtils.json("a: 5, b: 7"));
        List oplogDocuments = TestUtils.toArray(this.getOplogCollection().find().sort((Bson)TestUtils.json("ts: 1")));
        Document updateOplogEntry = (Document)oplogDocuments.get(1);
        AbstractOplogTest.assertThat(updateOplogEntry.get((Object)"op")).isEqualTo((Object)OperationType.UPDATE.getCode());
        AbstractOplogTest.assertThat(updateOplogEntry.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().toString());
        AbstractOplogTest.assertThat(updateOplogEntry.get((Object)"o")).isEqualTo((Object)TestUtils.json("_id: 1, a: 5, b: 7"));
        AbstractOplogTest.assertThat(updateOplogEntry.get((Object)"o2")).isEqualTo((Object)TestUtils.json("_id: 1"));
    }

    @Test
    void testSetOplogUpdateOneById() {
        collection.insertOne((Object)TestUtils.json("_id: 34, b: 6"));
        collection.updateOne(Filters.eq((String)"_id", (Object)34), Updates.set((String)"a", (Object)6));
        List oplogDocuments = TestUtils.toArray(this.getOplogCollection().find((Bson)TestUtils.json("op: 'u'")).sort((Bson)TestUtils.json("ts: 1")));
        Document updateOplogDocument = (Document)CollectionUtils.getSingleElement(oplogDocuments);
        AbstractOplogTest.assertThat(updateOplogDocument).containsKeys((Object[])new String[]{"ts", "t", "h", "v", "op", "ns", "ui", "wall", "o", "o2"});
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ts")).isInstanceOf(BsonTimestamp.class);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"t")).isEqualTo((Object)1L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"h")).isEqualTo((Object)0L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"v")).isEqualTo((Object)2L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"op")).isEqualTo((Object)OperationType.UPDATE.getCode());
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().getFullName());
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ui")).isInstanceOf(UUID.class);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o2")).isEqualTo((Object)TestUtils.json("_id: 34"));
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o")).isEqualTo((Object)TestUtils.json("$set: {a: 6}"));
    }

    @Test
    @Disabled(value="This test represents a missing feature")
    void testSetOplogUpdateOneByIdMultipleFields() {
        collection.insertOne((Object)TestUtils.json("_id: 1, b: 6"));
        collection.updateOne(Filters.eq((String)"_id", (Object)1), Arrays.asList(Updates.set((String)"a", (Object)7), Updates.set((String)"b", (Object)7)));
        List oplogDocuments = TestUtils.toArray(this.getOplogCollection().find().sort((Bson)TestUtils.json("ts: 1")));
        Document updateOplogDocument = (Document)oplogDocuments.get(1);
        AbstractOplogTest.assertThat(updateOplogDocument).containsKeys((Object[])new String[]{"ts", "t", "h", "v", "op", "ns", "ui", "wall", "o", "o2"});
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ts")).isInstanceOf(BsonTimestamp.class);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"t")).isEqualTo((Object)1L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"h")).isEqualTo((Object)0L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"v")).isEqualTo((Object)2L);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"op")).isEqualTo((Object)OperationType.UPDATE.getCode());
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().getFullName());
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ui")).isInstanceOf(UUID.class);
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o2")).isEqualTo((Object)TestUtils.json("_id: 1"));
        AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o")).isEqualTo((Object)TestUtils.json("$set: {a: 7, b: 7}"));
    }

    @Test
    void testSetOplogUpdateMany() {
        collection.insertMany(Arrays.asList(TestUtils.json("_id: 1, b: 6"), TestUtils.json("_id: 2, b: 6")));
        collection.updateMany(Filters.eq((String)"b", (Object)6), Updates.set((String)"a", (Object)7));
        List oplogDocuments = TestUtils.toArray(this.getOplogCollection().find((Bson)TestUtils.json("op: 'u'")).sort((Bson)TestUtils.json("ts: 1, 'o2._id': 1")));
        AbstractOplogTest.assertThat(oplogDocuments).hasSize(2);
        for (int i = 0; i < 2; ++i) {
            Document updateOplogDocument = (Document)oplogDocuments.get(i);
            AbstractOplogTest.assertThat(updateOplogDocument).containsKeys((Object[])new String[]{"ts", "t", "h", "v", "op", "ns", "ui", "wall", "o", "o2"});
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ts")).isInstanceOf(BsonTimestamp.class);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"t")).isEqualTo((Object)1L);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"h")).isEqualTo((Object)0L);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"v")).isEqualTo((Object)2L);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"op")).isEqualTo((Object)OperationType.UPDATE.getCode());
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().getFullName());
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ui")).isInstanceOf(UUID.class);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o2")).isEqualTo((Object)TestUtils.json(String.format("_id: %d", i + 1)));
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o")).isEqualTo((Object)TestUtils.json("$set: {a: 7}"));
        }
    }

    @Test
    void testSetOplogDeleteMany() {
        collection.insertMany(Arrays.asList(TestUtils.json("_id: 1, b: 6"), TestUtils.json("_id: 2, b: 6")));
        collection.deleteMany(Filters.eq((String)"b", (Object)6));
        List oplogDocuments = TestUtils.toArray(this.getOplogCollection().find((Bson)TestUtils.json("op: 'd'")).sort((Bson)TestUtils.json("ts: 1, 'o._id': 1")));
        AbstractOplogTest.assertThat(oplogDocuments).hasSize(2);
        for (int i = 0; i < 2; ++i) {
            Document updateOplogDocument = (Document)oplogDocuments.get(i);
            AbstractOplogTest.assertThat(updateOplogDocument).containsKeys((Object[])new String[]{"ts", "t", "h", "v", "op", "ns", "ui", "wall", "o"});
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ts")).isInstanceOf(BsonTimestamp.class);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"t")).isEqualTo((Object)1L);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"h")).isEqualTo((Object)0L);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"v")).isEqualTo((Object)2L);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"op")).isEqualTo((Object)OperationType.DELETE.getCode());
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ns")).isEqualTo((Object)collection.getNamespace().getFullName());
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"ui")).isInstanceOf(UUID.class);
            AbstractOplogTest.assertThat(updateOplogDocument.get((Object)"o")).isEqualTo((Object)TestUtils.json(String.format("_id: %d", i + 1)));
        }
    }

    @Test
    void testChangeStreamInsertAndUpdateFullDocumentLookup() {
        collection.insertOne((Object)TestUtils.json("b: 1"));
        int numberOfDocs = 10;
        ArrayList<Document> insert = new ArrayList<Document>();
        ArrayList<Document> update = new ArrayList<Document>();
        ArrayList<ChangeStreamDocument> changeStreamsResult = new ArrayList<ChangeStreamDocument>();
        List<Bson> pipeline = Collections.singletonList(Aggregates.match((Bson)Filters.or((Bson[])new Bson[]{Document.parse((String)"{'fullDocument.b': 1}")})));
        try (MongoChangeStreamCursor cursor = collection.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();){
            long cursorId = cursor.getServerCursor().getId();
            for (int i = 1; i < numberOfDocs + 1; ++i) {
                Document doc = TestUtils.json(String.format("a: %d, b: 1", i));
                collection.insertOne((Object)doc);
                collection.updateOne(Filters.eq((String)"a", (Object)i), Updates.set((String)"c", (Object)(i * 10)));
                AbstractOplogTest.assertThat(cursor.hasNext()).isTrue();
                ChangeStreamDocument insertDocument = (ChangeStreamDocument)cursor.next();
                AbstractOplogTest.assertThat(cursor.getServerCursor().getId()).isEqualTo(cursorId);
                AbstractOplogTest.assertThat(cursor.hasNext()).isTrue();
                ChangeStreamDocument updateDocument = (ChangeStreamDocument)cursor.next();
                AbstractOplogTest.assertThat(cursor.getServerCursor().getId()).isEqualTo(cursorId);
                AbstractOplogTest.assertThat(((Document)insertDocument.getFullDocument()).get((Object)"a")).isEqualTo((Object)i);
                insert.add((Document)insertDocument.getFullDocument());
                AbstractOplogTest.assertThat(((Document)updateDocument.getFullDocument()).get((Object)"a")).isEqualTo((Object)i);
                update.add((Document)updateDocument.getFullDocument());
                changeStreamsResult.addAll(Arrays.asList(insertDocument, updateDocument));
            }
        }
        AbstractOplogTest.assertThat(insert.size()).isEqualTo(numberOfDocs);
        AbstractOplogTest.assertThat(update.size()).isEqualTo(numberOfDocs);
        AbstractOplogTest.assertThat(changeStreamsResult.size()).isEqualTo(numberOfDocs * 2);
    }

    @Test
    void testChangeStreamUpdateDefault() {
        collection.insertOne((Object)TestUtils.json("a: 1, b: 2, c: 3"));
        try (MongoChangeStreamCursor cursor = collection.watch().cursor();){
            collection.updateOne(Filters.eq((String)"a", (Object)1), (Bson)TestUtils.json("$set: {b: 0, c: 10}"));
            ChangeStreamDocument updateDocument = (ChangeStreamDocument)cursor.next();
            Document fullDoc = (Document)updateDocument.getFullDocument();
            AbstractOplogTest.assertThat(fullDoc).isNotNull();
            AbstractOplogTest.assertThat(fullDoc.get((Object)"b")).isEqualTo((Object)0);
            AbstractOplogTest.assertThat(fullDoc.get((Object)"c")).isEqualTo((Object)10);
            collection.updateOne(Filters.eq((String)"a", (Object)1), Updates.unset((String)"b"));
            updateDocument = (ChangeStreamDocument)cursor.next();
            fullDoc = (Document)updateDocument.getFullDocument();
            AbstractOplogTest.assertThat(fullDoc).isNotNull();
            AbstractOplogTest.assertThat(fullDoc.get((Object)"b")).isEqualTo((Object)"");
        }
    }

    @Test
    void testChangeStreamDelete() {
        collection.insertOne((Object)TestUtils.json("_id: 1"));
        try (MongoChangeStreamCursor cursor = collection.watch().cursor();){
            collection.deleteOne((Bson)TestUtils.json("_id: 1"));
            ChangeStreamDocument deleteDocument = (ChangeStreamDocument)cursor.next();
            AbstractOplogTest.assertThat(deleteDocument.getDocumentKey().get((Object)"_id")).isEqualTo((Object)new BsonInt32(1));
        }
    }

    @Test
    void testChangeStreamStartAfter() {
        collection.insertOne((Object)TestUtils.json("a: 1"));
        try (MongoChangeStreamCursor cursor = collection.watch().cursor();){
            collection.insertOne((Object)TestUtils.json("a: 2"));
            collection.insertOne((Object)TestUtils.json("a: 3"));
            ChangeStreamDocument document = (ChangeStreamDocument)cursor.next();
            BsonDocument resumeToken = document.getResumeToken();
            try (MongoChangeStreamCursor cursor2 = collection.watch().startAfter(resumeToken).cursor();){
                ChangeStreamDocument document2 = (ChangeStreamDocument)cursor2.next();
                AbstractOplogTest.assertThat(((Document)document2.getFullDocument()).get((Object)"a")).isEqualTo((Object)3);
            }
        }
    }

    @Test
    void testChangeStreamResumeAfter() throws Exception {
        collection.insertOne((Object)TestUtils.json("a: 1"));
        try (MongoChangeStreamCursor cursor = collection.watch().cursor();){
            this.awaitNumberOfOpenCursors(1L);
            collection.insertOne((Object)TestUtils.json("a: 2"));
            collection.insertOne((Object)TestUtils.json("a: 3"));
            ChangeStreamDocument document = (ChangeStreamDocument)cursor.next();
            BsonDocument resumeToken = document.getResumeToken();
            try (MongoChangeStreamCursor cursor2 = collection.watch().resumeAfter(resumeToken).cursor();){
                this.awaitNumberOfOpenCursors(2L);
                ChangeStreamDocument document2 = (ChangeStreamDocument)cursor2.next();
                AbstractOplogTest.assertThat(((Document)document2.getFullDocument()).get((Object)"a")).isEqualTo((Object)3);
            }
        }
    }

    @Test
    void testChangeStreamResumeAfterTerminalEvent() {
        MongoCollection col = db.getCollection("test-collection");
        ChangeStreamIterable watch = col.watch().fullDocument(FullDocument.UPDATE_LOOKUP).batchSize(1);
        try (MongoChangeStreamCursor cursor = watch.cursor();){
            col.insertOne((Object)TestUtils.json("a: 1"));
            cursor.next();
            col.drop();
            ChangeStreamDocument document = (ChangeStreamDocument)cursor.next();
            BsonDocument resumeToken = document.getResumeToken();
            try (MongoChangeStreamCursor resumeAfterCursor = watch.resumeAfter(resumeToken).cursor();){
                document = (ChangeStreamDocument)resumeAfterCursor.next();
                AbstractOplogTest.assertThat(document).isNotNull();
                AbstractOplogTest.assertThat(document.getOperationType()).isEqualTo((Object)com.mongodb.client.model.changestream.OperationType.INVALIDATE);
                Assertions.assertThatExceptionOfType(NoSuchElementException.class).isThrownBy(() -> ((MongoChangeStreamCursor)resumeAfterCursor).next());
            }
        }
    }

    @Test
    void testChangeStreamStartAtOperationTime() {
        collection.insertOne((Object)TestUtils.json("a: 1"));
        try (MongoChangeStreamCursor cursor = collection.watch().cursor();){
            collection.insertOne((Object)TestUtils.json("a: 2"));
            collection.insertOne((Object)TestUtils.json("a: 3"));
            ChangeStreamDocument document = (ChangeStreamDocument)cursor.next();
            BsonTimestamp startAtOperationTime = document.getClusterTime();
            try (MongoChangeStreamCursor cursor2 = collection.watch().startAtOperationTime(startAtOperationTime).cursor();){
                ChangeStreamDocument document2 = (ChangeStreamDocument)cursor2.next();
                AbstractOplogTest.assertThat(((Document)document2.getFullDocument()).get((Object)"a")).isEqualTo((Object)2);
                document2 = (ChangeStreamDocument)cursor2.next();
                AbstractOplogTest.assertThat(((Document)document2.getFullDocument()).get((Object)"a")).isEqualTo((Object)3);
            }
        }
    }

    @Test
    void testChangeStreamAndReplaceOneWithUpsertTrue() throws Exception {
        TestSubscriber streamSubscriber = new TestSubscriber();
        asyncCollection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(streamSubscriber);
        this.awaitNumberOfOpenCursors(1L);
        TestSubscriber replaceOneSubscriber = new TestSubscriber();
        asyncCollection.replaceOne((Bson)TestUtils.json("a: 1"), (Object)TestUtils.json("a: 1"), new ReplaceOptions().upsert(true)).subscribe(replaceOneSubscriber);
        replaceOneSubscriber.awaitSingleValue();
        TestSubscriber findSubscriber = new TestSubscriber();
        asyncCollection.find((Bson)TestUtils.json("a:1")).subscribe(findSubscriber);
        AbstractOplogTest.assertThat(((Document)findSubscriber.awaitSingleValue()).get((Object)"a")).isEqualTo((Object)1);
        ChangeStreamDocument value = (ChangeStreamDocument)streamSubscriber.awaitSingleValue();
        AbstractOplogTest.assertThat(value.getOperationType().getValue()).isEqualTo("insert");
        AbstractOplogTest.assertThat((Document)value.getFullDocument()).isEqualTo(findSubscriber.awaitSingleValue());
    }

    @Test
    void testSimpleChangeStreamWithFilter() throws Exception {
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection, TestUtils.json("_id: 1"));
        Bson filter = Aggregates.match((Bson)Filters.eq((String)"fullDocument.bu", (Object)"abc"));
        List<Bson> pipeline = Collections.singletonList(filter);
        super.assertNoOpenCursors();
        TestSubscriber streamSubscriber = new TestSubscriber();
        asyncCollection.watch(pipeline).subscribe(streamSubscriber);
        this.awaitNumberOfOpenCursors(1L);
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection, TestUtils.json("_id: 2, bu: 'abc'"));
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection, TestUtils.json("_id: 3, bu: 'xyz'"));
        ChangeStreamDocument changeStreamDocument = (ChangeStreamDocument)streamSubscriber.awaitSingleValue();
        AbstractOplogTest.assertThat(((Document)changeStreamDocument.getFullDocument()).get((Object)"bu")).isEqualTo((Object)"abc");
    }

    @Test
    void testOplogSubscription() throws Exception {
        super.assertNoOpenCursors();
        TestSubscriber streamSubscriber = new TestSubscriber();
        asyncCollection.watch().subscribe(streamSubscriber);
        this.awaitNumberOfOpenCursors(1L);
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection, TestUtils.json("_id: 1"));
        ChangeStreamDocument changeStreamDocument = (ChangeStreamDocument)streamSubscriber.awaitSingleValue();
        AbstractOplogTest.assertThat(changeStreamDocument.getOperationType()).isEqualTo((Object)com.mongodb.client.model.changestream.OperationType.INSERT);
        AbstractOplogTest.assertThat((Document)changeStreamDocument.getFullDocument()).isEqualTo((Object)TestUtils.json("_id: 1"));
    }

    @Test
    void testOplogShouldFilterNamespaceOnChangeStreams() throws Exception {
        com.mongodb.reactivestreams.client.MongoCollection asyncCollection1 = asyncDb.getCollection(asyncCollection.getNamespace().getCollectionName() + "1");
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection, TestUtils.json("_id: 1"));
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection1, TestUtils.json("_id: 1"));
        super.assertNoOpenCursors();
        TestSubscriber streamSubscriber = new TestSubscriber();
        asyncCollection.watch().subscribe(streamSubscriber);
        this.awaitNumberOfOpenCursors(1L);
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection1, TestUtils.json("_id: 2"));
        AbstractOplogTest.insertOne((com.mongodb.reactivestreams.client.MongoCollection<Document>)asyncCollection, TestUtils.json("_id: 2"));
        streamSubscriber.awaitSingleValue();
    }

    private static void insertOne(com.mongodb.reactivestreams.client.MongoCollection<Document> collection, Document document) throws Exception {
        TestSubscriber insertSubscriber = new TestSubscriber();
        collection.insertOne((Object)document).subscribe(insertSubscriber);
        insertSubscriber.awaitSingleValue();
    }
}

