package org.apache.flink.connector.mongodb.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoException;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClient;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClients;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.WriteModel;
import org.apache.flink.mongodb.shaded.org.bson.BsonDocument;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that serializes every incoming element into a MongoDB {@link WriteModel}, buffers
 * the models in memory and sends them with a single {@code bulkWrite} call once the configured
 * batch size or batch interval is reached, or when {@link #flush(boolean)} is invoked.
 *
 * <p>Failed bulk writes are retried with a linearly growing back-off as configured by
 * {@link MongoWriteOptions#getMaxRetries()} and {@link MongoWriteOptions#getRetryIntervalMs()}.
 *
 * @param <IN> type of the elements handed to {@link #write(Object, SinkWriter.Context)}
 */
@Internal
public class MongoWriter<IN> implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoWriter.class);

    private final MongoConnectionOptions connectionOptions;
    private final MongoWriteOptions writeOptions;
    private final MongoSerializationSchema<IN> serializationSchema;
    private final MongoSinkContext sinkContext;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;

    /** Buffer of pending write models; {@link #collector} appends to this same list. */
    private final List<WriteModel<BsonDocument>> bulkRequests = new ArrayList<>();

    private final Collector<WriteModel<BsonDocument>> collector;
    private final Counter numRecordsOut;
    private final MongoClient mongoClient;

    /** Set for the duration of {@link #flush(boolean)}; {@code write} yields while it is set. */
    private boolean checkpointInProgress = false;

    // Both timestamps are read by the send-time gauge registered in the constructor.
    // NOTE(review): lastSendTime starts at 0, so with a batch interval configured the very first
    // write() already satisfies the interval check and is flushed immediately — confirm intended.
    private volatile long lastSendTime = 0;
    private volatile long ackTime = Long.MAX_VALUE;

    /**
     * Creates the writer, registers its metrics, opens the serialization schema and creates the
     * MongoDB client.
     *
     * @param mongoConnectionOptions connection settings (URI, database, collection); must not be null
     * @param mongoWriteOptions batching and retry settings; must not be null
     * @param z whether buffered requests are written on every {@link #flush(boolean)} call
     * @param initContext sink initialization context providing the mailbox executor and metric
     *     group; must not be null
     * @param mongoSerializationSchema converts elements into write models; must not be null
     * @throws FlinkRuntimeException if opening the schema or creating the client fails
     */
    public MongoWriter(MongoConnectionOptions mongoConnectionOptions, MongoWriteOptions mongoWriteOptions, boolean z, Sink.InitContext initContext, MongoSerializationSchema<IN> mongoSerializationSchema) {
        this.connectionOptions = Preconditions.checkNotNull(mongoConnectionOptions);
        this.writeOptions = Preconditions.checkNotNull(mongoWriteOptions);
        this.serializationSchema = Preconditions.checkNotNull(mongoSerializationSchema);
        this.flushOnCheckpoint = z;

        Preconditions.checkNotNull(initContext);
        this.mailboxExecutor = Preconditions.checkNotNull(initContext.getMailboxExecutor());

        SinkWriterMetricGroup metricGroup = Preconditions.checkNotNull(initContext.metricGroup());
        metricGroup.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTime);
        this.numRecordsOut = metricGroup.getNumRecordsSendCounter();

        this.collector = new ListCollector<>(this.bulkRequests);
        this.sinkContext = new DefaultMongoSinkContext(initContext, mongoWriteOptions);
        try {
            mongoSerializationSchema.open(initContext.asSerializationSchemaInitializationContext(), this.sinkContext, mongoWriteOptions);
            this.mongoClient = MongoClients.create(mongoConnectionOptions.getUri());
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open the MongoEmitter", e);
        }
    }

    /**
     * Serializes {@code in}, appends the resulting write model to the buffer and triggers a bulk
     * write when the batch size or batch interval limit has been reached.
     *
     * @param in element to write
     * @param context writer context (not used by this implementation)
     * @throws IOException if a triggered bulk write ultimately fails
     * @throws InterruptedException if interrupted while yielding to the mailbox
     */
    public void write(IN in, SinkWriter.Context context) throws IOException, InterruptedException {
        // Let the mailbox make progress while a flush is still marked as running.
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        WriteModel<BsonDocument> writeModel = this.serializationSchema.serialize(in, this.sinkContext);
        this.numRecordsOut.inc();
        this.collector.collect(writeModel);
        if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
            doBulkWrite();
        }
    }

    /**
     * Writes all buffered requests to MongoDB if flushing on checkpoint is enabled or {@code z}
     * is {@code true}; otherwise leaves the buffer untouched.
     *
     * @param z forces the flush even when flush-on-checkpoint is disabled (presumably the
     *     end-of-input flag of {@code SinkWriter#flush} — confirm against the interface)
     * @throws IOException if the bulk write ultimately fails
     */
    public void flush(boolean z) throws IOException {
        this.checkpointInProgress = true;
        try {
            while (!this.bulkRequests.isEmpty() && (this.flushOnCheckpoint || z)) {
                doBulkWrite();
            }
        } finally {
            // Always reset the flag: if it stayed true after a failed bulk write, every later
            // write() call would spin in mailboxExecutor.yield() without ever proceeding.
            this.checkpointInProgress = false;
        }
    }

    /** Closes the underlying MongoDB client. Buffered requests are not flushed here. */
    public void close() {
        this.mongoClient.close();
    }

    /**
     * Sends the buffered requests in one {@code bulkWrite} call and clears the buffer on success.
     * On a {@link MongoException} the call is retried up to the configured number of retries,
     * sleeping {@code retryIntervalMs * attemptNumber} between attempts.
     *
     * @throws IOException if all attempts fail, or if the thread is interrupted while waiting
     *     for the next attempt (the interrupt flag is restored in that case)
     */
    @VisibleForTesting
    void doBulkWrite() throws IOException {
        if (this.bulkRequests.isEmpty()) {
            return;
        }
        // Clamp so that at least one attempt is made; with a negative value the loop below would
        // never run and the buffer would silently never be sent or cleared.
        int maxRetries = Math.max(0, this.writeOptions.getMaxRetries());
        long retryIntervalMs = this.writeOptions.getRetryIntervalMs();
        for (int i = 0; i <= maxRetries; i++) {
            try {
                this.lastSendTime = System.currentTimeMillis();
                this.mongoClient
                        .getDatabase(this.connectionOptions.getDatabase())
                        .getCollection(this.connectionOptions.getCollection(), BsonDocument.class)
                        .bulkWrite(this.bulkRequests);
                this.ackTime = System.currentTimeMillis();
                this.bulkRequests.clear();
                return;
            } catch (MongoException e) {
                LOG.debug("Bulk Write to MongoDB failed, retry times = {}", i, e);
                if (i >= maxRetries) {
                    LOG.error("Bulk Write to MongoDB failed", e);
                    throw new IOException(e);
                }
                try {
                    // Linear back-off: the wait grows with every failed attempt.
                    Thread.sleep(retryIntervalMs * (i + 1));
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    IOException interrupted = new IOException("Unable to flush; interrupted while doing another attempt", e);
                    // Keep the interruption visible in the stack trace instead of dropping it.
                    interrupted.addSuppressed(e2);
                    throw interrupted;
                }
            }
        }
    }

    /** Returns whether the buffer has reached the configured batch size ({@code -1} disables the check). */
    private boolean isOverMaxBatchSizeLimit() {
        int batchSize = this.writeOptions.getBatchSize();
        return batchSize != -1 && this.bulkRequests.size() >= batchSize;
    }

    /**
     * Returns whether at least the configured batch interval has elapsed since the last send
     * attempt ({@code -1} disables the check).
     */
    private boolean isOverMaxBatchIntervalLimit() {
        long batchIntervalMs = this.writeOptions.getBatchIntervalMs();
        return batchIntervalMs != -1 && System.currentTimeMillis() - this.lastSendTime >= batchIntervalMs;
    }
}
