package org.apache.pulsar.io.mongodb;

import com.google.common.collect.Lists;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.CharEncoding;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.bson.BSONException;
import org.bson.Document;
import org.bson.json.JsonParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "mongo", type = IOType.SINK, help = "A sink connector that sends pulsar messages to mongodb", configClass = MongoConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/mongodb/MongoSink.class */
public class MongoSink implements Sink<byte[]> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MongoSink.class);
    private MongoConfig mongoConfig;
    private MongoClient mongoClient;
    private MongoCollection<Document> collection;
    private List<Record<byte[]>> incomingList;
    private ScheduledExecutorService flushExecutor;

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        log.info("Open MongoDB Sink");
        this.mongoConfig = MongoConfig.load(map);
        this.mongoConfig.validate();
        this.mongoClient = MongoClients.create(this.mongoConfig.getMongoUri());
        this.collection = this.mongoClient.getDatabase(this.mongoConfig.getDatabase()).getCollection(this.mongoConfig.getCollection());
        this.incomingList = Lists.newArrayList();
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(() -> {
            flush();
        }, this.mongoConfig.getBatchTimeMs(), this.mongoConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<byte[]> record) {
        int size;
        String str = new String(record.getValue(), Charset.forName(CharEncoding.UTF_8));
        if (log.isDebugEnabled()) {
            log.debug("Received record: " + str);
        }
        synchronized (this) {
            this.incomingList.add(record);
            size = this.incomingList.size();
        }
        if (size == this.mongoConfig.getBatchSize()) {
            this.flushExecutor.submit(() -> {
                flush();
            });
        }
    }

    private void flush() {
        ArrayList arrayList = new ArrayList();
        synchronized (this) {
            if (this.incomingList.isEmpty()) {
                return;
            }
            List<Record<byte[]>> list = this.incomingList;
            this.incomingList = Lists.newArrayList();
            Iterator<Record<byte[]>> it = list.iterator();
            while (it.hasNext()) {
                Record<byte[]> next = it.next();
                try {
                    arrayList.add(Document.parse(new String(next.getValue(), Charset.forName(CharEncoding.UTF_8))));
                } catch (BSONException | JsonParseException e) {
                    log.error("Bad message", e);
                    next.fail();
                    it.remove();
                }
            }
            if (arrayList.size() > 0) {
                this.collection.insertMany(arrayList, (r7, th) -> {
                    List list2 = (List) IntStream.range(0, arrayList.size()).boxed().collect(Collectors.toList());
                    ArrayList newArrayList = Lists.newArrayList();
                    if (th != null) {
                        log.error("MongoDB insertion error", th);
                        if (th instanceof MongoBulkWriteException) {
                            ((MongoBulkWriteException) th).getWriteErrors().forEach(bulkWriteError -> {
                                newArrayList.add(Integer.valueOf(bulkWriteError.getIndex()));
                            });
                            list2.removeAll(newArrayList);
                        } else {
                            newArrayList.addAll(list2);
                            list2.clear();
                        }
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Nb ack={}, nb fail={}", Integer.valueOf(list2.size()), Integer.valueOf(newArrayList.size()));
                    }
                    list2.forEach(num -> {
                        ((Record) list.get(num.intValue())).ack();
                    });
                    newArrayList.forEach(num2 -> {
                        ((Record) list.get(num2.intValue())).fail();
                    });
                });
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.flushExecutor != null) {
            this.flushExecutor.shutdown();
        }
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }
}
