/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.util.Map;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MongoDBContainer;

public class MongoSourceTester
extends SourceTester<MongoDBContainer> {
    private static final Logger log = LoggerFactory.getLogger(MongoSourceTester.class);
    private static final String SOURCE_TYPE = "mongo";
    private static final String DEFAULT_DATABASE = "test";
    private static final int DEFAULT_BATCH_SIZE = 2;
    private final MongoDBContainer mongoContainer;
    private final PulsarCluster pulsarCluster;

    protected MongoSourceTester(MongoDBContainer mongoContainer, PulsarCluster pulsarCluster) {
        super(SOURCE_TYPE);
        this.mongoContainer = mongoContainer;
        this.pulsarCluster = pulsarCluster;
        this.sourceConfig.put("mongoUri", mongoContainer.getConnectionString());
        this.sourceConfig.put("database", DEFAULT_DATABASE);
        this.sourceConfig.put("syncType", "full_sync");
        this.sourceConfig.put("batchSize", 2);
    }

    @Override
    public void setServiceContainer(MongoDBContainer serviceContainer) {
        log.info("start mongodb server container.");
        this.pulsarCluster.startService("debezium-mongodb-example", (GenericContainer<?>)this.mongoContainer);
    }

    @Override
    public void prepareSource() throws Exception {
        MongoClient mongoClient = MongoClients.create((String)this.mongoContainer.getConnectionString());
        MongoDatabase db = mongoClient.getDatabase(DEFAULT_DATABASE);
        log.info("Subscribing mongodb change streams on: {}", (Object)this.mongoContainer.getReplicaSetUrl(DEFAULT_DATABASE));
        ChangeStreamPublisher stream = db.watch();
        stream.batchSize(2).fullDocument(FullDocument.UPDATE_LOOKUP);
        stream.subscribe((Subscriber)new Subscriber<ChangeStreamDocument<Document>>(){

            public void onSubscribe(Subscription subscription) {
                subscription.request(Integer.MAX_VALUE);
            }

            public void onNext(ChangeStreamDocument<Document> doc) {
                log.info("New change doc: {}", doc);
            }

            public void onError(Throwable error) {
                log.error("Subscriber error", error);
            }

            public void onComplete() {
                log.info("Subscriber complete");
            }
        });
        log.info("Successfully subscribe to mongodb change streams");
    }

    @Override
    public void prepareInsertEvent() throws Exception {
        Container.ExecResult execResult = this.mongoContainer.execInContainer(new String[]{"/usr/bin/mongo", "--eval", "db.products.insert({name: \"test-mongo\",description: \"test message\"})"});
        log.info("Successfully insert a message: {}", (Object)execResult.getStdout());
    }

    @Override
    public void prepareDeleteEvent() throws Exception {
        Container.ExecResult execResult = this.mongoContainer.execInContainer(new String[]{"/usr/bin/mongo", "--eval", "db.products.deleteOne({name: \"test-mongo\"})"});
        log.info("Successfully delete a message: {}", (Object)execResult.getStdout());
    }

    @Override
    public void prepareUpdateEvent() throws Exception {
        Container.ExecResult execResult = this.mongoContainer.execInContainer(new String[]{"/usr/bin/mongo", "--eval", "db.products.update({name: \"test-mongo-source\"},{$set:{name:\"test-mongo-update\", description: \"updated message\"}})"});
        log.info("Successfully update a message: {}", (Object)execResult.getStdout());
    }

    @Override
    public Map<String, String> produceSourceMessages(int numMessages) throws Exception {
        log.info("mongodb server already contains preconfigured data.");
        return null;
    }

    @Override
    public void close() throws Exception {
        if (this.mongoContainer != null) {
            this.mongoContainer.close();
        }
    }
}

