package com.microsoft.azure.flink.writer.internal.committer;

import com.microsoft.azure.flink.common.KustoClientUtil;
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.slf4j.Logger;

@PublicEvolving
@Internal
/* loaded from: input_file:com/microsoft/azure/flink/writer/internal/committer/KustoCommitter.class */
public class KustoCommitter extends CheckpointCommitter {
    private static final long serialVersionUID = 1;
    private final KustoConnectionOptions connectionOptions;
    private final KustoWriteOptions kustoWriteOptions;
    private IngestClient streamingIngestClient;
    private transient Client queryClient;
    private final String table = "flink_checkpoints";
    private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap();

    public KustoCommitter(KustoConnectionOptions kustoConnectionOptions, KustoWriteOptions kustoWriteOptions) {
        this.connectionOptions = kustoConnectionOptions;
        this.kustoWriteOptions = kustoWriteOptions;
    }

    public void setJobId(String str) throws Exception {
        super.setJobId(str);
    }

    public void createResource() throws Exception {
        long epochMilli = Instant.now(Clock.systemUTC()).toEpochMilli();
        Logger logger = LOG;
        getClass();
        logger.debug("Creating resources for KustoCommitter. Creating table {} in database {} and applying policies", "flink_checkpoints", this.kustoWriteOptions.getDatabase());
        getClass();
        String format = String.format(".create-merge table %s (job_id:string, sink_id:string, sub_id:int, checkpoint_id:long) with (hidden=true,folder='Flink',docstring='Checkpointing table in Flink')", "flink_checkpoints");
        getClass();
        String format2 = String.format(".alter-merge table %s policy streamingingestion '{\"IsEnabled\": true}'", "flink_checkpoints");
        getClass();
        String format3 = String.format(".alter-merge table %s policy retention softdelete = 1d recoverability = disabled", "flink_checkpoints");
        try {
            if (this.queryClient == null) {
                this.queryClient = KustoClientUtil.createClient(this.connectionOptions, KustoCommitter.class.getSimpleName());
                LOG.info("Initialized queryClient in createResource and query client is null");
            }
            this.queryClient.execute(this.kustoWriteOptions.getDatabase(), format);
            this.queryClient.execute(this.kustoWriteOptions.getDatabase(), format2);
            this.queryClient.execute(this.kustoWriteOptions.getDatabase(), format3);
            Logger logger2 = LOG;
            getClass();
            logger2.info("Created resources for KustoCommitter. Table {} in database {} took {} ms", new Object[]{"flink_checkpoints", this.kustoWriteOptions.getDatabase(), Long.valueOf(Instant.now().toEpochMilli() - epochMilli)});
        } catch (Exception e) {
            LOG.error("Error while creating resources. To use the KustoCommitter you need to have admin privileges on the database {}", this.kustoWriteOptions.getDatabase(), e);
            throw e;
        }
    }

    public void open() throws Exception {
        LOG.debug("Opening KustoCommitter");
        if (this.connectionOptions == null || this.kustoWriteOptions == null) {
            throw new IllegalArgumentException("No Connection options were provided or WriteOptions were null");
        }
        if (StringUtils.isEmpty(this.kustoWriteOptions.getDatabase())) {
            throw new IllegalArgumentException("Database provided was empty for KustoCommitter");
        }
        String simpleName = KustoCommitter.class.getSimpleName();
        this.streamingIngestClient = KustoClientUtil.createMangedIngestClient(this.connectionOptions, simpleName);
        if (this.queryClient == null) {
            this.queryClient = KustoClientUtil.createClient(this.connectionOptions, simpleName);
            LOG.info("Initialized queryClient in open and query client is null");
        }
        LOG.debug("Opened KustoCommitter");
    }

    public void close() throws Exception {
        LOG.debug("Closing KustoCommitter");
        this.lastCommittedCheckpoints.clear();
        try {
            this.streamingIngestClient.close();
            this.queryClient.close();
            LOG.debug("Closed KustoCommitter");
        } catch (Exception e) {
            LOG.warn("Error while closing resources.", e);
            throw e;
        }
    }

    public void commitCheckpoint(int i, long j) {
        long epochMilli = Instant.now().toEpochMilli();
        LOG.info("Starting checkpoint {} for subtask {} at {}", new Object[]{Long.valueOf(j), Integer.valueOf(i), Long.valueOf(Instant.now().toEpochMilli())});
        String format = String.format("%s,%s,%d,%d", StringEscapeUtils.escapeCsv(this.jobId), StringEscapeUtils.escapeCsv(this.operatorId), Integer.valueOf(i), Long.valueOf(j));
        String database = this.kustoWriteOptions.getDatabase();
        getClass();
        IngestionProperties ingestionProperties = new IngestionProperties(database, "flink_checkpoints");
        ingestionProperties.setFlushImmediately(true);
        ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV);
        try {
            this.streamingIngestClient.ingestFromStream(new StreamSourceInfo(new ByteArrayInputStream(format.getBytes())), ingestionProperties);
            LOG.info("Committed checkpoint {} for subtask {} in {} ms", new Object[]{Long.valueOf(j), Integer.valueOf(i), Long.valueOf(Instant.now().toEpochMilli() - epochMilli)});
            this.lastCommittedCheckpoints.put(Integer.valueOf(i), Long.valueOf(j));
        } catch (IngestionClientException | IngestionServiceException e) {
            Logger logger = LOG;
            getClass();
            logger.warn("Error performing checkpoint ingestion on table {} in database {}.", new Object[]{"flink_checkpoints", this.kustoWriteOptions.getDatabase(), e});
            throw new RuntimeException((Throwable) e);
        }
    }

    public boolean isCheckpointCommitted(int i, long j) {
        long epochMilli = Instant.now().toEpochMilli();
        Long l = this.lastCommittedCheckpoints.get(Integer.valueOf(i));
        if (l == null) {
            getClass();
            try {
                KustoOperationResult execute = this.queryClient.execute(this.kustoWriteOptions.getDatabase(), String.format("%s | where job_id == '%s' and sink_id == '%s' and sub_id == %d and checkpoint_id >= %d | top 1 by checkpoint_id desc", "flink_checkpoints", this.jobId, this.operatorId, Integer.valueOf(i), Long.valueOf(j)));
                if (execute != null && execute.getPrimaryResults() != null && !execute.getPrimaryResults().getData().isEmpty()) {
                    this.lastCommittedCheckpoints.put(Integer.valueOf(i), Long.valueOf(Long.parseLong(((List) execute.getPrimaryResults().getData().get(0)).toString())));
                    return true;
                }
            } catch (DataServiceException | DataClientException e) {
                throw new RuntimeException((Throwable) e);
            }
        }
        LOG.info("Checkpoint query took {} ms", Long.valueOf(Instant.now().toEpochMilli() - epochMilli));
        return l != null && j <= l.longValue();
    }
}
