/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoException;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClient;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClients;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.WriteModel;
import org.apache.flink.mongodb.shaded.org.bson.BsonDocument;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MongoWriter<IN>
implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoWriter.class);
    private final MongoConnectionOptions connectionOptions;
    private final MongoWriteOptions writeOptions;
    private final MongoSerializationSchema<IN> serializationSchema;
    private final MongoSinkContext sinkContext;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;
    private final List<WriteModel<BsonDocument>> bulkRequests = new ArrayList<WriteModel<BsonDocument>>();
    private final Collector<WriteModel<BsonDocument>> collector;
    private final Counter numRecordsOut;
    private final MongoClient mongoClient;
    private final long batchIntervalMs;
    private final int batchSize;
    private boolean checkpointInProgress = false;
    private volatile long lastSendTime = 0L;
    private volatile long ackTime = Long.MAX_VALUE;
    private volatile transient boolean closed = false;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private volatile transient Exception flushException;

    public MongoWriter(MongoConnectionOptions connectionOptions, MongoWriteOptions writeOptions, boolean flushOnCheckpoint, Sink.InitContext initContext, MongoSerializationSchema<IN> serializationSchema) {
        boolean flushOnlyOnCheckpoint;
        this.connectionOptions = (MongoConnectionOptions)Preconditions.checkNotNull((Object)connectionOptions);
        this.writeOptions = (MongoWriteOptions)Preconditions.checkNotNull((Object)writeOptions);
        this.serializationSchema = (MongoSerializationSchema)Preconditions.checkNotNull(serializationSchema);
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.batchIntervalMs = writeOptions.getBatchIntervalMs();
        this.batchSize = writeOptions.getBatchSize();
        Preconditions.checkNotNull((Object)initContext);
        this.mailboxExecutor = (MailboxExecutor)Preconditions.checkNotNull((Object)initContext.getMailboxExecutor());
        SinkWriterMetricGroup metricGroup = (SinkWriterMetricGroup)Preconditions.checkNotNull((Object)initContext.metricGroup());
        metricGroup.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTime);
        this.numRecordsOut = metricGroup.getNumRecordsSendCounter();
        this.collector = new ListCollector(this.bulkRequests);
        this.sinkContext = new DefaultMongoSinkContext(initContext, writeOptions);
        try {
            SerializationSchema.InitializationContext initializationContext = initContext.asSerializationSchemaInitializationContext();
            serializationSchema.open(initializationContext, this.sinkContext, writeOptions);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open the MongoEmitter", (Throwable)e);
        }
        this.mongoClient = MongoClients.create(connectionOptions.getUri());
        boolean bl = flushOnlyOnCheckpoint = this.batchIntervalMs == -1L && this.batchSize == -1;
        if (!flushOnlyOnCheckpoint && this.batchIntervalMs > 0L) {
            this.scheduler = Executors.newScheduledThreadPool(1, (ThreadFactory)new ExecutorThreadFactory("mongo-writer"));
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                MongoWriter mongoWriter = this;
                synchronized (mongoWriter) {
                    if (!this.closed && this.isOverMaxBatchIntervalLimit()) {
                        try {
                            this.doBulkWrite();
                        }
                        catch (Exception e) {
                            this.flushException = e;
                        }
                    }
                }
            }, this.batchIntervalMs, this.batchIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    public synchronized void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        this.checkFlushException();
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        WriteModel<BsonDocument> writeModel = this.serializationSchema.serialize(element, this.sinkContext);
        this.numRecordsOut.inc();
        this.collector.collect(writeModel);
        if (this.isOverMaxBatchSizeLimit() || this.isOverMaxBatchIntervalLimit()) {
            this.doBulkWrite();
        }
    }

    public synchronized void flush(boolean endOfInput) throws IOException {
        this.checkFlushException();
        this.checkpointInProgress = true;
        while (!this.bulkRequests.isEmpty() && (this.flushOnCheckpoint || endOfInput)) {
            this.doBulkWrite();
        }
        this.checkpointInProgress = false;
    }

    public synchronized void close() throws Exception {
        if (!this.closed) {
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            if (!this.bulkRequests.isEmpty()) {
                try {
                    this.doBulkWrite();
                }
                catch (Exception e) {
                    LOG.error("Writing records to MongoDB failed when closing MongoWriter", (Throwable)e);
                    throw new IOException("Writing records to MongoDB failed.", e);
                }
                finally {
                    this.mongoClient.close();
                    this.closed = true;
                }
            } else {
                this.mongoClient.close();
                this.closed = true;
            }
        }
    }

    @VisibleForTesting
    void doBulkWrite() throws IOException {
        if (this.bulkRequests.isEmpty()) {
            return;
        }
        int maxRetries = this.writeOptions.getMaxRetries();
        long retryIntervalMs = this.writeOptions.getRetryIntervalMs();
        for (int i = 0; i <= maxRetries; ++i) {
            try {
                this.lastSendTime = System.currentTimeMillis();
                this.mongoClient.getDatabase(this.connectionOptions.getDatabase()).getCollection(this.connectionOptions.getCollection(), BsonDocument.class).bulkWrite(this.bulkRequests);
                this.ackTime = System.currentTimeMillis();
                this.bulkRequests.clear();
                break;
            }
            catch (MongoException e) {
                LOG.debug("Bulk Write to MongoDB failed, retry times = {}", (Object)i, (Object)e);
                if (i >= maxRetries) {
                    LOG.error("Bulk Write to MongoDB failed", (Throwable)e);
                    throw new IOException(e);
                }
                try {
                    Thread.sleep(retryIntervalMs * (long)(i + 1));
                    continue;
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    private boolean isOverMaxBatchSizeLimit() {
        return this.batchSize != -1 && this.bulkRequests.size() >= this.batchSize;
    }

    private boolean isOverMaxBatchIntervalLimit() {
        long lastSentInterval = System.currentTimeMillis() - this.lastSendTime;
        return this.batchIntervalMs != -1L && lastSentInterval >= this.batchIntervalMs;
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            throw new RuntimeException("Writing records to MongoDB failed.", this.flushException);
        }
    }
}

