package com.microsoft.azure.flink.writer.internal.sink;

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:com/microsoft/azure/flink/writer/internal/sink/KustoSinkWriter.class */
public class KustoSinkWriter<IN> implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(KustoSinkWriter.class);
    private final KustoSinkCommon<IN> kustoSinkCommon;
    private final KustoWriteOptions writeOptions;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;
    private final Collector<IN> collector;
    private final Counter numRecordsOut;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private volatile transient Exception flushException;
    private final List<IN> bulkRequests = new ArrayList();
    private boolean checkpointInProgress = false;
    private volatile transient boolean closed = false;

    public KustoSinkWriter(KustoConnectionOptions kustoConnectionOptions, KustoWriteOptions kustoWriteOptions, @NotNull TypeSerializer<IN> typeSerializer, @NotNull TypeInformation<IN> typeInformation, boolean z, Sink.InitContext initContext) throws URISyntaxException {
        this.writeOptions = (KustoWriteOptions) Preconditions.checkNotNull(kustoWriteOptions);
        this.flushOnCheckpoint = z;
        Preconditions.checkNotNull(initContext);
        this.mailboxExecutor = (MailboxExecutor) Preconditions.checkNotNull(initContext.getMailboxExecutor());
        SinkWriterMetricGroup sinkWriterMetricGroup = (SinkWriterMetricGroup) Preconditions.checkNotNull(initContext.metricGroup());
        this.numRecordsOut = sinkWriterMetricGroup.getNumRecordsSendCounter();
        this.collector = new ListCollector(this.bulkRequests);
        LOG.info("Initializing the class from KustoSinkWriter");
        this.kustoSinkCommon = new KustoSinkCommon<>((KustoConnectionOptions) Preconditions.checkNotNull(kustoConnectionOptions), this.writeOptions, sinkWriterMetricGroup, typeSerializer, typeInformation, KustoSinkWriter.class.getSimpleName());
        sinkWriterMetricGroup.setCurrentSendTimeGauge(() -> {
            return Long.valueOf(this.kustoSinkCommon.ackTime - this.kustoSinkCommon.lastSendTime);
        });
        if ((kustoWriteOptions.getDeliveryGuarantee() == DeliveryGuarantee.AT_LEAST_ONCE) || kustoWriteOptions.getBatchIntervalMs() <= 0) {
            return;
        }
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("kusto-writer"));
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) {
                if (!this.closed && isOverMaxBatchIntervalLimit()) {
                    try {
                        doBulkWrite();
                    } catch (Exception e) {
                        this.flushException = e;
                    }
                }
            }
        }, kustoWriteOptions.getBatchIntervalMs(), kustoWriteOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
    }

    public void write(IN in, SinkWriter.Context context) throws IOException, InterruptedException {
        checkFlushException();
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        this.numRecordsOut.inc();
        this.collector.collect(in);
        if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
            doBulkWrite();
        }
    }

    public void flush(boolean z) throws IOException {
        checkFlushException();
        this.checkpointInProgress = true;
        while (!this.bulkRequests.isEmpty() && (this.flushOnCheckpoint || z)) {
            doBulkWrite();
        }
        this.checkpointInProgress = false;
    }

    public synchronized void close() throws Exception {
        if (this.closed) {
            return;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
        try {
            if (this.bulkRequests.isEmpty()) {
                this.kustoSinkCommon.close();
                this.closed = true;
                return;
            }
            try {
                doBulkWrite();
                this.kustoSinkCommon.close();
                this.closed = true;
            } catch (Exception e) {
                LOG.error("Writing records to Kusto failed when closing KustoWriter", e);
                throw new IOException("Writing records to Kusto failed.", e);
            }
        } catch (Throwable th) {
            this.kustoSinkCommon.close();
            this.closed = true;
            throw th;
        }
    }

    void doBulkWrite() throws IOException {
        if (this.bulkRequests.isEmpty()) {
            LOG.debug("No records to write to DB {} & table {} ", this.writeOptions.getDatabase(), this.writeOptions.getTable());
            return;
        }
        LOG.info("Ingesting to DB {} & table {} record count {}", new Object[]{this.writeOptions.getDatabase(), this.writeOptions.getTable(), Integer.valueOf(this.bulkRequests.size())});
        if (this.kustoSinkCommon.ingest(this.bulkRequests)) {
            this.bulkRequests.clear();
        }
    }

    private boolean isOverMaxBatchSizeLimit() {
        long batchSize = this.writeOptions.getBatchSize();
        boolean z = batchSize != -1 && ((long) this.bulkRequests.size()) >= batchSize;
        if (z) {
            LOG.debug("OverMaxBatchSizeLimit triggered at time {} with batch size {}.", Instant.now(Clock.systemUTC()), Integer.valueOf(this.bulkRequests.size()));
        }
        return z;
    }

    private boolean isOverMaxBatchIntervalLimit() {
        long batchIntervalMs = this.writeOptions.getBatchIntervalMs();
        long epochMilli = Instant.now(Clock.systemUTC()).toEpochMilli() - this.kustoSinkCommon.lastSendTime;
        boolean z = this.kustoSinkCommon.lastSendTime >= 0 && batchIntervalMs != -1 && epochMilli >= batchIntervalMs;
        if (z) {
            LOG.trace("OverMaxBatchIntervalLimit triggered at {}. LastSentTime {}.The last sent interval is {} and bulkFlushInterval {}.", new Object[]{Instant.now(Clock.systemUTC()), Instant.ofEpochMilli(this.kustoSinkCommon.lastSendTime), Long.valueOf(epochMilli), Long.valueOf(batchIntervalMs)});
        }
        return z;
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            throw new RuntimeException("Writing records to Kusto failed.", this.flushException);
        }
    }
}
