package com.microsoft.kusto.spark.datasink;

import com.microsoft.azure.kusto.data.ClientRequestProperties;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.kusto.spark.authentication.KustoAuthentication;
import com.microsoft.kusto.spark.common.KustoCoordinates;
import com.microsoft.kusto.spark.utils.ContainerAndSas;
import com.microsoft.kusto.spark.utils.CslCommandsGenerator$;
import com.microsoft.kusto.spark.utils.KustoClient;
import com.microsoft.kusto.spark.utils.KustoClientCache$;
import com.microsoft.kusto.spark.utils.KustoConstants$;
import com.microsoft.kusto.spark.utils.KustoDataSourceUtils$;
import com.microsoft.kusto.spark.utils.KustoQueryUtils$;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
import org.apache.spark.FutureAction;
import org.apache.spark.TaskContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.RDD$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.util.CollectionAccumulator;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KustoWriter.scala */
/* loaded from: input_file:com/microsoft/kusto/spark/datasink/KustoWriter$.class */
/**
 * Singleton backing the Scala {@code object KustoWriter}: writes a Spark {@link Dataset} to a Kusto
 * table.
 *
 * <p>Each partition is serialized as gzip-compressed CSV into temporary blobs, every blob is queued
 * for ingestion into a temporary table, and the {@link KustoClient} is then asked to finalize the
 * ingestion (or to clean up the by-products on failure).
 */
public final class KustoWriter$ {
    /** Singleton instance; assigned by the private constructor during static initialization. */
    public static KustoWriter$ MODULE$;
    /** Simple class name, used as the reporter name in log and error messages. */
    private final String com$microsoft$kusto$spark$datasink$KustoWriter$$myName;
    private final String LegacyTempIngestionTablePrefix;
    private final String TempIngestionTablePrefix;
    private final int DelayPeriodBetweenCalls;
    private final int GzipBufferSize;

    static {
        new KustoWriter$();
    }

    /** @return the name this writer reports itself as in log and error messages */
    public String com$microsoft$kusto$spark$datasink$KustoWriter$$myName() {
        return this.com$microsoft$kusto$spark$datasink$KustoWriter$$myName;
    }

    /** @return the legacy temporary-table prefix ({@code "_tmpTable"}) */
    public String LegacyTempIngestionTablePrefix() {
        return this.LegacyTempIngestionTablePrefix;
    }

    /** @return the prefix used when naming temporary ingestion tables ({@code "sparkTempTable_"}) */
    public String TempIngestionTablePrefix() {
        return this.TempIngestionTablePrefix;
    }

    /** @return {@code KustoConstants.DefaultPeriodicSamplePeriod} converted to milliseconds */
    public int DelayPeriodBetweenCalls() {
        return this.DelayPeriodBetweenCalls;
    }

    /** @return the buffer size handed to the {@link BufferedWriter} that feeds the gzip stream */
    public int GzipBufferSize() {
        return this.GzipBufferSize;
    }

    /**
     * Writes {@code dataset} to the Kusto table named in {@code kustoCoordinates}.
     *
     * <p>Steps: resolve the client, create the temporary table from the dataset schema, ingest every
     * partition into it, then finalize. When {@code writeOptions.isAsync()} the partitions are
     * ingested through an asynchronous Spark action and this method returns right after registering
     * the success/failure callbacks; otherwise it blocks, and on any exception cleans up the
     * ingestion by-products before rethrowing.
     *
     * @param option optional batch id; only used to build tracing text and the temporary table name
     * @param dataset the data to write
     * @param kustoCoordinates cluster, database and target table
     * @param kustoAuthentication credentials used to obtain the cached client
     * @param writeOptions write configuration (request id, async flag, ingestion properties, ...)
     */
    public void write(Option<Object> option, Dataset<Row> dataset, KustoCoordinates kustoCoordinates, KustoAuthentication kustoAuthentication, WriteOptions writeOptions) {
        String batchIdForTracing = (String) option
                .map(batchId -> $anonfun$write$1(BoxesRunTime.unboxToLong(batchId)))
                .getOrElse(() -> "");
        KustoClient client = KustoClientCache$.MODULE$.getClient(kustoCoordinates.clusterAlias(), kustoCoordinates.clusterUrl(), kustoAuthentication);
        if (kustoCoordinates.table().isEmpty()) {
            KustoDataSourceUtils$.MODULE$.reportExceptionAndThrow(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), new InvalidParameterException("Table name not specified"), "writing data", kustoCoordinates.clusterUrl(), kustoCoordinates.database(), KustoDataSourceUtils$.MODULE$.reportExceptionAndThrow$default$6(), KustoDataSourceUtils$.MODULE$.reportExceptionAndThrow$default$7());
        }
        ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
        clientRequestProperties.setClientRequestId(writeOptions.requestId());
        String table = (String) kustoCoordinates.table().get();

        // Temporary table name: <prefix><appName>_<table>[_<batchId>]_<requestId>, simplified.
        Object batchIdSuffix = option
                .map(batchId -> $anonfun$write$3(BoxesRunTime.unboxToLong(batchId)))
                .getOrElse(() -> "");
        String tmpTableName = KustoQueryUtils$.MODULE$.simplifyName(
                TempIngestionTablePrefix() + dataset.sparkSession().sparkContext().appName() + "_" + table + batchIdSuffix + "_" + writeOptions.requestId());

        KustoWriteResource writeResource = new KustoWriteResource(kustoAuthentication, kustoCoordinates, dataset.schema(), writeOptions, tmpTableName);
        IngestionProperties stagingIngestionProperties = getIngestionProperties(writeOptions, writeResource);
        // NOTE(review): getIngestionProperties() already replaces the ingest-if-not-exists list with an
        // empty one when user ingestion properties are defined, so the tags captured here look like
        // they can never be the user's tags - confirm whether that is intended.
        ArrayList<String> ingestIfNotExistsTags = stagingIngestionProperties.getIngestIfNotExists();
        KustoResultSetTable targetSchema = client.engineClient().execute(kustoCoordinates.database(), CslCommandsGenerator$.MODULE$.generateTableGetSchemaAsRowsCommand((String) kustoCoordinates.table().get()), clientRequestProperties).getPrimaryResults();
        stagingIngestionProperties.setIngestIfNotExists(new ArrayList<>());

        boolean tableExists = targetSchema.count() > 0;
        if (!client.shouldIngestData(kustoCoordinates, writeOptions.IngestionProperties(), tableExists, clientRequestProperties)) {
            KustoDataSourceUtils$.MODULE$.logInfo(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "Ingestion skipped: Provided ingest-by tags are present in the destination table '" + table + "'");
            return;
        }

        client.initializeTablesBySchema(kustoCoordinates, tmpTableName, dataset.schema(), targetSchema, writeOptions, clientRequestProperties);
        KustoDataSourceUtils$.MODULE$.logInfo(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "Successfully created temporary table " + tmpTableName + ", will be deleted after completing the operation");
        client.setMappingOnStagingTableIfNeeded(stagingIngestionProperties, table, clientRequestProperties);
        if (stagingIngestionProperties.getFlushImmediately()) {
            KustoDataSourceUtils$.MODULE$.logWarn(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "Its not recommended to set flushImmediately to true");
        }

        RDD<InternalRow> rdd = dataset.queryExecution().toRdd();
        CollectionAccumulator<PartitionResult> partitionsResults = rdd.sparkContext().collectionAccumulator();

        if (writeOptions.isAsync()) {
            FutureAction asyncWork = RDD$.MODULE$.rddToAsyncRDDActions(rdd, ClassTag$.MODULE$.apply(InternalRow.class)).foreachPartitionAsync(rows -> {
                $anonfun$write$5(batchIdForTracing, partitionsResults, writeResource, rows);
                return BoxedUnit.UNIT;
            });
            KustoDataSourceUtils$.MODULE$.logInfo(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "asynchronous write to Kusto table '" + table + "' in progress");
            // Finalization / cleanup happen in the callbacks once the Spark action completes.
            asyncWork.onSuccess(new KustoWriter$$anonfun$write$6(client, kustoCoordinates, batchIdForTracing, tmpTableName, partitionsResults, writeOptions, ingestIfNotExistsTags, clientRequestProperties), ExecutionContext$Implicits$.MODULE$.global());
            asyncWork.onFailure(new KustoWriter$$anonfun$write$7(client, kustoCoordinates, tmpTableName, clientRequestProperties, table), ExecutionContext$Implicits$.MODULE$.global());
            return;
        }

        try {
            rdd.foreachPartition(rows -> {
                $anonfun$write$8(batchIdForTracing, partitionsResults, writeResource, rows);
                return BoxedUnit.UNIT;
            });
            client.finalizeIngestionWhenWorkersSucceeded(kustoCoordinates, batchIdForTracing, client.engineClient(), tmpTableName, partitionsResults, writeOptions, ingestIfNotExistsTags, clientRequestProperties);
        } catch (Exception e) {
            // Drop the temporary by-products before surfacing the failure to the caller.
            client.cleanupIngestionByProducts(kustoCoordinates.database(), client.engineClient(), tmpTableName, clientRequestProperties);
            throw e;
        }
    }

    /**
     * Ingests one partition's rows into the temporary table; an empty partition is only logged.
     *
     * @param iterator rows of the current partition
     * @param str batch-id tracing text appended to log messages
     * @param collectionAccumulator accumulator receiving one {@link PartitionResult} per ingested blob
     * @param kustoWriteResource coordinates, authentication, schema, options and temp-table name
     */
    public void ingestRowsIntoTempTbl(Iterator<InternalRow> iterator, String str, CollectionAccumulator<PartitionResult> collectionAccumulator, KustoWriteResource kustoWriteResource) {
        if (!iterator.isEmpty()) {
            ingestToTemporaryTableByWorkers(str, iterator, collectionAccumulator, kustoWriteResource);
            return;
        }
        KustoDataSourceUtils$.MODULE$.logWarn(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "sink to Kusto table '" + kustoWriteResource.coordinates().table().get() + "' with no rows to write on partition " + TaskContext$.MODULE$.getPartitionId() + " " + str);
    }

    /**
     * Serializes the rows to CSV blobs, queues each blob for ingestion and waits for every queued
     * ingestion call to return (a timeout while waiting is logged, not thrown).
     *
     * @param iterator rows of the current partition
     * @param ingestClient client used to queue blob ingestions
     * @param collectionAccumulator accumulator receiving one {@link PartitionResult} per ingested blob
     * @param str batch-id tracing text appended to log messages
     * @param kustoWriteResource coordinates, authentication, schema, options and temp-table name
     */
    public void ingestRowsIntoKusto(Iterator<InternalRow> iterator, IngestClient ingestClient, CollectionAccumulator<PartitionResult> collectionAccumulator, String str, KustoWriteResource kustoWriteResource) {
        IngestionProperties ingestionProperties = getIngestionProperties(kustoWriteResource.writeOptions(), kustoWriteResource);
        ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv.name());
        ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.Table);
        ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FailuresAndSuccesses);
        ArrayList<Future<BoxedUnit>> pendingIngestions = ingestRows(iterator, kustoWriteResource, ingestClient, ingestionProperties, collectionAccumulator);
        KustoDataSourceUtils$.MODULE$.logInfo(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "Ingesting from blob - partition: " + TaskContext$.MODULE$.getPartitionId() + " requestId: '" + kustoWriteResource.writeOptions().requestId() + "' " + str);
        for (Future<BoxedUnit> pendingIngestion : pendingIngestions) {
            $anonfun$ingestRowsIntoKusto$1(kustoWriteResource, pendingIngestion);
        }
    }

    /**
     * Builds the ingestion properties that target the temporary table. User-supplied properties are
     * used when defined, with their ingest-if-not-exists list replaced by an empty list.
     */
    private IngestionProperties getIngestionProperties(WriteOptions writeOptions, KustoWriteResource kustoWriteResource) {
        String database = kustoWriteResource.coordinates().database();
        String tmpTableName = kustoWriteResource.tmpTableName();
        if (!writeOptions.IngestionProperties().isDefined()) {
            return new IngestionProperties(database, tmpTableName);
        }
        IngestionProperties ingestionProperties = SparkIngestionProperties$.MODULE$.fromString((String) writeOptions.IngestionProperties().get()).toIngestionProperties(database, tmpTableName);
        ingestionProperties.setIngestIfNotExists(new ArrayList<>());
        return ingestionProperties;
    }

    /** Logs the partition being ingested and delegates to {@link #ingestRowsIntoKusto} with the cached ingest client. */
    private void ingestToTemporaryTableByWorkers(String str, Iterator<InternalRow> iterator, CollectionAccumulator<PartitionResult> collectionAccumulator, KustoWriteResource kustoWriteResource) {
        KustoDataSourceUtils$.MODULE$.logInfo(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "Ingesting partition: '" + TaskContext$.MODULE$.getPartitionId() + "' in requestId: '" + kustoWriteResource.writeOptions().requestId() + "'" + str);
        KustoCoordinates coordinates = kustoWriteResource.coordinates();
        IngestClient ingestClient = KustoClientCache$.MODULE$.getClient(coordinates.clusterAlias(), coordinates.clusterUrl(), kustoWriteResource.authentication()).ingestClient();
        ingestRowsIntoKusto(iterator, ingestClient, collectionAccumulator, str, kustoWriteResource);
    }

    /**
     * Opens a new, uniquely named temporary blob and wraps it in a gzip + buffered UTF-8 writer chain.
     *
     * @param kustoCoordinates supplies the database name used in the blob name
     * @param str temporary table name, also embedded in the blob name
     * @param kustoClient supplies the temporary container URL and SAS
     * @return the writers, the blob and its SAS bundled in a {@link BlobWriteResource}
     */
    public BlobWriteResource createBlobWriter(KustoCoordinates kustoCoordinates, String str, KustoClient kustoClient) {
        String blobName = KustoQueryUtils$.MODULE$.simplifyName(kustoCoordinates.database()) + "_" + str + "_" + UUID.randomUUID().toString() + "_spark.csv.gz";
        ContainerAndSas container = kustoClient.getTempBlobForIngestion();
        String blobUri = new StringBuilder().append(container.containerUrl()).append('/').append(blobName).append(container.sas()).toString();
        CloudBlockBlob blob = new CloudBlockBlob(new URI(blobUri));
        String sas = container.sas();
        BlobRequestOptions requestOptions = new BlobRequestOptions();
        requestOptions.setConcurrentRequestCount(Integer.valueOf(4));
        GZIPOutputStream gzip = new GZIPOutputStream(blob.openOutputStream((AccessCondition) null, requestOptions, (OperationContext) null));
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(gzip, StandardCharsets.UTF_8), GzipBufferSize());
        return new BlobWriteResource(writer, gzip, new CountingWriter(writer), blob, sas);
    }

    /**
     * Writes every row as CSV into temporary blobs. A blob is sealed and queued for ingestion as soon
     * as its writer's counter reaches the configured batch limit, and the last blob is queued if
     * anything was written to it.
     *
     * @param iterator rows of the current partition
     * @param kustoWriteResource coordinates, authentication, schema, options and temp-table name
     * @param ingestClient client used to queue blob ingestions
     * @param ingestionProperties properties applied to each ingestion
     * @param collectionAccumulator accumulator receiving one {@link PartitionResult} per ingested blob
     * @return the futures of all queued ingestions, in queueing order
     */
    public ArrayList<Future<BoxedUnit>> ingestRows(Iterator<InternalRow> iterator, KustoWriteResource kustoWriteResource, IngestClient ingestClient, IngestionProperties ingestionProperties, CollectionAccumulator<PartitionResult> collectionAccumulator) throws IOException {
        KustoClient client = KustoClientCache$.MODULE$.getClient(kustoWriteResource.coordinates().clusterAlias(), kustoWriteResource.coordinates().clusterUrl(), kustoWriteResource.authentication());
        // Widen before multiplying so a large batch limit cannot overflow int and turn negative.
        long maxBlobSize = (long) kustoWriteResource.writeOptions().batchLimit() * KustoConstants$.MODULE$.OneMegaByte();
        BlobWriteResource firstBlob = createBlobWriter(kustoWriteResource.coordinates(), kustoWriteResource.tmpTableName(), client);
        ZoneId zoneId = TimeZone.getTimeZone(kustoWriteResource.writeOptions().timeZone()).toZoneId();
        ArrayList<Future<BoxedUnit>> pendingIngestions = new ArrayList<>();

        BlobWriteResource lastBlob = (BlobWriteResource) iterator.foldLeft(firstBlob, (accumulated, row) -> {
            BlobWriteResource currentBlob = (BlobWriteResource) accumulated;
            RowCSVWriterUtils$.MODULE$.writeRowAsCSV((InternalRow) row, kustoWriteResource.schema(), zoneId, currentBlob.csvWriter());
            long currentSize = currentBlob.csvWriter().getCounter();
            if (currentSize < maxBlobSize) {
                return currentBlob;
            }
            // Limit reached: seal this blob, queue it, and continue with a fresh one.
            KustoDataSourceUtils$.MODULE$.logInfo(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "Sealing blob in partition " + TaskContext$.MODULE$.getPartitionId() + " for requestId: '" + kustoWriteResource.writeOptions().requestId() + "', " + "blob number " + pendingIngestions.size() + ", with size " + currentSize);
            finalizeBlobWrite(currentBlob);
            pendingIngestions.add(ingest$1(currentBlob.blob(), currentBlob.csvWriter().getCounter(), currentBlob.sas(), true, ingestionProperties, collectionAccumulator, ingestClient));
            return createBlobWriter(kustoWriteResource.coordinates(), kustoWriteResource.tmpTableName(), client);
        });

        KustoDataSourceUtils$.MODULE$.logInfo(com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "finished serializing rows in partition " + TaskContext$.MODULE$.getPartitionId() + " for requestId: '" + kustoWriteResource.writeOptions().requestId() + "' ");
        finalizeBlobWrite(lastBlob);
        if (lastBlob.csvWriter().getCounter() > 0) {
            pendingIngestions.add(ingest$1(lastBlob.blob(), lastBlob.csvWriter().getCounter(), lastBlob.sas(), ingest$default$4$1(), ingestionProperties, collectionAccumulator, ingestClient));
        }
        return pendingIngestions;
    }

    /**
     * Flushes and closes the blob's writer and gzip stream.
     *
     * <p>Both are closed even when a flush or the first close fails, so a failed write does not
     * leave the blob output stream open. If several steps fail, the last failure propagates.
     *
     * @param blobWriteResource the blob writers to seal
     */
    public void finalizeBlobWrite(BlobWriteResource blobWriteResource) {
        try {
            blobWriteResource.writer().flush();
            blobWriteResource.gzip().flush();
        } finally {
            try {
                blobWriteResource.writer().close();
            } finally {
                blobWriteResource.gzip().close();
            }
        }
    }

    /** Formats the batch id as the tracing text {@code ",batch: <id>"}. */
    public static final /* synthetic */ String $anonfun$write$1(long j) {
        return ",batch: " + j;
    }

    /** Formats the batch id as the temporary-table name suffix {@code "_<id>"}. */
    public static final /* synthetic */ String $anonfun$write$3(long j) {
        return "_" + j;
    }

    /** Per-partition body of the asynchronous write path. */
    public static final /* synthetic */ void $anonfun$write$5(String str, CollectionAccumulator collectionAccumulator, KustoWriteResource kustoWriteResource, Iterator iterator) {
        MODULE$.ingestRowsIntoTempTbl(iterator, str, collectionAccumulator, kustoWriteResource);
    }

    /** Per-partition body of the synchronous write path. */
    public static final /* synthetic */ void $anonfun$write$8(String str, CollectionAccumulator collectionAccumulator, KustoWriteResource kustoWriteResource, Iterator iterator) {
        MODULE$.ingestRowsIntoTempTbl(iterator, str, collectionAccumulator, kustoWriteResource);
    }

    /**
     * Waits for one queued ingestion, up to {@code KustoConstants.DefaultIngestionTaskTime}. A
     * timeout is logged as a warning and otherwise ignored; other failures propagate.
     */
    public static final /* synthetic */ void $anonfun$ingestRowsIntoKusto$1(KustoWriteResource kustoWriteResource, Future future) {
        try {
            Await$.MODULE$.result(future, KustoConstants$.MODULE$.DefaultIngestionTaskTime());
        } catch (TimeoutException ignored) {
            // Deliberately not rethrown: the ingestion may still complete on the service side.
            KustoDataSourceUtils$.MODULE$.logWarn(MODULE$.com$microsoft$kusto$spark$datasink$KustoWriter$$myName(), "Timed out trying to ingest requestId: '" + kustoWriteResource.writeOptions().requestId() + "', no need to fail as the ingest might succeed");
        }
    }

    /**
     * Queues the ingestion of a sealed blob on the global execution context and records the outcome,
     * tagged with the current partition id, in the accumulator.
     *
     * @param flushImmediately when {@code true} and the given properties do not already request it,
     *     a clone of the properties with flush-immediately enabled is used for this blob
     */
    private static final Future<BoxedUnit> ingest$1(CloudBlockBlob cloudBlockBlob, long size, String sas, boolean flushImmediately, IngestionProperties ingestionProperties, CollectionAccumulator<PartitionResult> collectionAccumulator, IngestClient ingestClient) {
        // Captured here because the future body runs on another thread, outside the task context.
        int partitionId = TaskContext$.MODULE$.getPartitionId();
        return Future$.MODULE$.apply(() -> {
            IngestionProperties effectiveProperties = ingestionProperties;
            if (!ingestionProperties.getFlushImmediately() && flushImmediately) {
                // Clone so the shared properties object is not mutated for other blobs.
                effectiveProperties = SparkIngestionProperties$.MODULE$.cloneIngestionProperties(ingestionProperties);
                effectiveProperties.setFlushImmediately(true);
            }
            String blobPath = cloudBlockBlob.getStorageUri().getPrimaryUri().toString() + sas;
            collectionAccumulator.add(new PartitionResult(ingestClient.ingestFromBlob(new BlobSourceInfo(blobPath, size), effectiveProperties), partitionId));
            return BoxedUnit.UNIT;
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    /** Default for the {@code flushImmediately} argument of {@code ingest$1}. */
    private static final boolean ingest$default$4$1() {
        return false;
    }

    /** Registers the singleton and initializes the constants. */
    private KustoWriter$() {
        MODULE$ = this;
        this.com$microsoft$kusto$spark$datasink$KustoWriter$$myName = getClass().getSimpleName();
        this.LegacyTempIngestionTablePrefix = "_tmpTable";
        this.TempIngestionTablePrefix = "sparkTempTable_";
        this.DelayPeriodBetweenCalls = (int) KustoConstants$.MODULE$.DefaultPeriodicSamplePeriod().toMillis();
        this.GzipBufferSize = 1000 * KustoConstants$.MODULE$.DefaultBufferSize();
    }
}
