/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.mongodb;

import com.google.common.collect.Lists;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.mongodb.MongoConfig;
import org.bson.BSONException;
import org.bson.Document;
import org.bson.json.JsonParseException;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="mongo", type=IOType.SINK, help="A sink connector that sends pulsar messages to mongodb", configClass=MongoConfig.class)
public class MongoSink
implements Sink<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(MongoSink.class);
    private MongoConfig mongoConfig;
    private MongoClient mongoClient;
    private MongoCollection<Document> collection;
    private List<Record<byte[]>> incomingList;
    private ScheduledExecutorService flushExecutor;
    private Supplier<MongoClient> clientProvider;

    public MongoSink() {
        this(null);
    }

    public MongoSink(Supplier<MongoClient> clientProvider) {
        this.clientProvider = clientProvider;
    }

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        log.info("Open MongoDB Sink");
        this.mongoConfig = MongoConfig.load(config);
        this.mongoConfig.validate(true, true);
        this.mongoClient = this.clientProvider != null ? this.clientProvider.get() : MongoClients.create((String)this.mongoConfig.getMongoUri());
        MongoDatabase db = this.mongoClient.getDatabase(this.mongoConfig.getDatabase());
        this.collection = db.getCollection(this.mongoConfig.getCollection());
        this.incomingList = Lists.newArrayList();
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(() -> this.flush(), this.mongoConfig.getBatchTimeMs(), this.mongoConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(Record<byte[]> record) {
        int currentSize;
        String recordValue = new String((byte[])record.getValue(), StandardCharsets.UTF_8);
        if (log.isDebugEnabled()) {
            log.debug("Received record: " + recordValue);
        }
        MongoSink mongoSink = this;
        synchronized (mongoSink) {
            this.incomingList.add(record);
            currentSize = this.incomingList.size();
        }
        if (currentSize == this.mongoConfig.getBatchSize()) {
            this.flushExecutor.execute(() -> this.flush());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() {
        List<Record<byte[]>> recordsToInsert;
        ArrayList<Document> docsToInsert = new ArrayList<Document>();
        MongoSink mongoSink = this;
        synchronized (mongoSink) {
            if (this.incomingList.isEmpty()) {
                return;
            }
            recordsToInsert = this.incomingList;
            this.incomingList = Lists.newArrayList();
        }
        Iterator<Record<byte[]>> iter = recordsToInsert.iterator();
        while (iter.hasNext()) {
            Record<byte[]> record = iter.next();
            try {
                byte[] docAsBytes = (byte[])record.getValue();
                Document doc = Document.parse((String)new String(docAsBytes, StandardCharsets.UTF_8));
                docsToInsert.add(doc);
            }
            catch (BSONException | JsonParseException e) {
                log.error("Bad message", e);
                record.fail();
                iter.remove();
            }
        }
        if (docsToInsert.size() > 0) {
            this.collection.insertMany(docsToInsert).subscribe((Subscriber)new DocsToInsertSubscriber(docsToInsert, recordsToInsert));
        }
    }

    public void close() throws Exception {
        if (this.flushExecutor != null) {
            this.flushExecutor.shutdown();
        }
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    private class DocsToInsertSubscriber
    implements Subscriber<InsertManyResult> {
        final List<Document> docsToInsert;
        final List<Record<byte[]>> recordsToInsert;
        final List<Integer> idxToAck;
        final List<Integer> idxToFail = Lists.newArrayList();

        public DocsToInsertSubscriber(List<Document> docsToInsert, List<Record<byte[]>> recordsToInsert) {
            this.docsToInsert = docsToInsert;
            this.recordsToInsert = recordsToInsert;
            this.idxToAck = IntStream.range(0, this.docsToInsert.size()).boxed().collect(Collectors.toList());
        }

        public void onSubscribe(Subscription subscription) {
            subscription.request(Integer.MAX_VALUE);
        }

        public void onNext(InsertManyResult success) {
        }

        public void onError(Throwable t) {
            if (t != null) {
                log.error("MongoDB insertion error", t);
                if (t instanceof MongoBulkWriteException) {
                    ((MongoBulkWriteException)t).getWriteErrors().forEach(err -> this.idxToFail.add(err.getIndex()));
                    this.idxToAck.removeAll(this.idxToFail);
                } else {
                    this.idxToFail.addAll(this.idxToAck);
                    this.idxToAck.clear();
                }
            }
            this.onComplete();
        }

        public void onComplete() {
            if (log.isDebugEnabled()) {
                log.debug("Nb ack={}, nb fail={}", (Object)this.idxToAck.size(), (Object)this.idxToFail.size());
            }
            this.idxToAck.forEach(idx -> this.recordsToInsert.get((int)idx).ack());
            this.idxToFail.forEach(idx -> this.recordsToInsert.get((int)idx).fail());
        }
    }
}

