/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.mongodb;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.mongodb.MongoConfig;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="mongo", type=IOType.SOURCE, help="A source connector that sends mongodb documents to pulsar", configClass=MongoConfig.class)
public class MongoSource
extends PushSource<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(MongoSource.class);
    private final Supplier<MongoClient> clientProvider;
    private MongoConfig mongoConfig;
    private MongoClient mongoClient;
    private Thread streamThread;
    private ChangeStreamPublisher<Document> stream;

    public MongoSource() {
        this(null);
    }

    public MongoSource(Supplier<MongoClient> clientProvider) {
        this.clientProvider = clientProvider;
    }

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        log.info("Open MongoDB Source");
        this.mongoConfig = MongoConfig.load(config);
        this.mongoConfig.validate(false, false);
        this.mongoClient = this.clientProvider != null ? this.clientProvider.get() : MongoClients.create((String)this.mongoConfig.getMongoUri());
        if (StringUtils.isEmpty((CharSequence)this.mongoConfig.getDatabase())) {
            log.info("Watch all");
            this.stream = this.mongoClient.watch();
        } else {
            MongoDatabase db = this.mongoClient.getDatabase(this.mongoConfig.getDatabase());
            if (StringUtils.isEmpty((CharSequence)this.mongoConfig.getCollection())) {
                log.info("Watch db: {}", (Object)db.getName());
                this.stream = db.watch();
            } else {
                MongoCollection collection = db.getCollection(this.mongoConfig.getCollection());
                log.info("Watch collection: {} {}", (Object)db.getName(), (Object)this.mongoConfig.getCollection());
                this.stream = collection.watch();
            }
        }
        this.stream.batchSize(this.mongoConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP);
        this.stream.subscribe((Subscriber)new Subscriber<ChangeStreamDocument<Document>>(){
            private ObjectMapper mapper = new ObjectMapper();
            private Subscription subscription;

            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(Integer.MAX_VALUE);
            }

            public void onNext(ChangeStreamDocument<Document> doc) {
                try {
                    log.info("New change doc: {}", doc);
                    HashMap<String, Object> recordValue = new HashMap<String, Object>();
                    recordValue.put("fullDocument", doc.getFullDocument());
                    recordValue.put("ns", doc.getNamespace());
                    recordValue.put("operation", doc.getOperationType());
                    MongoSource.this.consume(new DocRecord(Optional.of(doc.getDocumentKey().toJson()), this.mapper.writeValueAsString(recordValue).getBytes(StandardCharsets.UTF_8)));
                }
                catch (JsonProcessingException e) {
                    log.error("Processing doc from mongo", (Throwable)e);
                }
            }

            public void onError(Throwable error) {
                log.error("Subscriber error", error);
            }

            public void onComplete() {
                log.info("Subscriber complete");
            }
        });
    }

    public void close() throws Exception {
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    private static class DocRecord
    implements Record<byte[]> {
        private final Optional<String> key;
        private final byte[] value;

        public DocRecord(Optional<String> key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        public Optional<String> getKey() {
            return this.key;
        }

        public byte[] getValue() {
            return this.value;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof DocRecord)) {
                return false;
            }
            DocRecord other = (DocRecord)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Optional<String> this$key = this.getKey();
            Optional<String> other$key = other.getKey();
            if (this$key == null ? other$key != null : !((Object)this$key).equals(other$key)) {
                return false;
            }
            return Arrays.equals(this.getValue(), other.getValue());
        }

        protected boolean canEqual(Object other) {
            return other instanceof DocRecord;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Optional<String> $key = this.getKey();
            result = result * 59 + ($key == null ? 43 : ((Object)$key).hashCode());
            result = result * 59 + Arrays.hashCode(this.getValue());
            return result;
        }

        public String toString() {
            return "MongoSource.DocRecord(key=" + this.getKey() + ", value=" + Arrays.toString(this.getValue()) + ")";
        }
    }
}

