package com.microsoft.kusto.spark.datasource;

import com.azure.core.credential.AzureSasCredential;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.microsoft.azure.kusto.data.ClientRequestProperties;
import com.microsoft.azure.storage.blob.BlobConstants;
import com.microsoft.kusto.spark.utils.ExtendedKustoClient;
import com.microsoft.kusto.spark.utils.KustoAzureFsSetupCache$;
import com.microsoft.kusto.spark.utils.KustoDataSourceUtils$;
import java.security.InvalidParameterException;
import java.time.Clock;
import java.time.Instant;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Partition;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.concurrent.Map;
import scala.collection.concurrent.TrieMap;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.WrappedArray;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: KustoReader.scala */
/* loaded from: input_file:com/microsoft/kusto/spark/datasource/KustoReader$.class */
public final class KustoReader$ {
    /** Singleton instance; assigned by the private constructor, which the static initializer below invokes. */
    public static KustoReader$ MODULE$;
    /** Simple class name; passed as the first argument to the KustoDataSourceUtils$ logging calls in this class. */
    private final String className;
    /** Export paths keyed by query, coordinates and authentication; consulted by distributedBuildScan when the transient cache option is enabled. */
    private final Map<DistributedReadModeTransientCacheKey, Seq<String>> distributedReadModeTransientCache;
    /** Set to "3.3.0" by the constructor and exposed through the accessor of the same name. */
    private final String com$microsoft$kusto$spark$datasource$KustoReader$$minimalParquetWriterVersion;

    static {
        // The constructor stores the new instance in MODULE$, so the result is intentionally not kept here.
        new KustoReader$();
    }

    /**
     * Accessor for the name this class uses when logging.
     *
     * @return the value of the {@code className} field
     */
    private String className() {
        return className;
    }

    /**
     * Accessor for the cache of export paths used by the distributed read mode.
     *
     * @return the map held in the {@code distributedReadModeTransientCache} field
     */
    private Map<DistributedReadModeTransientCacheKey, Seq<String>> distributedReadModeTransientCache() {
        return distributedReadModeTransientCache;
    }

    /**
     * Accessor for the minimal parquet writer version string held by this singleton.
     *
     * @return the value of the backing field of the same name
     */
    public String com$microsoft$kusto$spark$datasource$KustoReader$$minimalParquetWriterVersion() {
        return com$microsoft$kusto$spark$datasource$KustoReader$$minimalParquetWriterVersion;
    }

    /**
     * Runs the request's query directly against the engine and returns the primary results as an RDD.
     *
     * <p>The query is first passed through {@code KustoFilter$.pruneAndFilter} together with the request's
     * Spark schema (wrapped in a {@code KustoSchema} with an empty set) and the supplied filtering.
     *
     * @param extendedKustoClient client used to execute the query
     * @param kustoReadRequest request supplying the query, coordinates, schema, Spark session and optional
     *     client request properties
     * @param kustoFiltering filtering handed to {@code pruneAndFilter}
     * @return the RDD of the DataFrame created from the deserialized primary results
     */
    public RDD<Row> singleBuildScan(ExtendedKustoClient extendedKustoClient, KustoReadRequest kustoReadRequest, KustoFiltering kustoFiltering) {
        final String logMessage = new StringBuilder(43)
                .append("Executing query in Single mode. requestId: ")
                .append(kustoReadRequest.requestId())
                .toString();
        KustoDataSourceUtils$.MODULE$.logInfo(className(), logMessage);

        final KustoSchema requestSchema = new KustoSchema(kustoReadRequest.schema().sparkSchema(), (Set) Predef$.MODULE$.Set().apply(Nil$.MODULE$));
        // orNull: the engine call receives null when the request carries no client request properties.
        final ClientRequestProperties requestProperties = (ClientRequestProperties) kustoReadRequest.clientRequestProperties().orNull(Predef$.MODULE$.$conforms());

        final KustoResponseDeserializer deserializer = KustoResponseDeserializer$.MODULE$.apply(
                extendedKustoClient.executeEngine(
                        kustoReadRequest.kustoCoordinates().database(),
                        KustoFilter$.MODULE$.pruneAndFilter(requestSchema, kustoReadRequest.query(), kustoFiltering),
                        requestProperties,
                        extendedKustoClient.executeEngine$default$4())
                .getPrimaryResults());

        return kustoReadRequest.sparkSession()
                .createDataFrame(deserializer.toRows(), deserializer.getSchema().sparkSchema())
                .rdd();
    }

    /**
     * Reads the request's query results by exporting them to transient blob storage and loading the
     * exported files as parquet.
     *
     * <p>When {@code distributedReadModeTransientCacheEnabled()} is true, a cache key is built from the query,
     * the Kusto coordinates and the authentication. On a cache hit the stored export paths are reused; on a
     * miss the data is exported and the resulting paths are stored under that key. Without the cache option
     * the data is always exported.
     *
     * <p>If the parquet read throws, the row count of the pruned and filtered query is checked: a non-zero count
     * rethrows the original exception, a zero count yields the RDD of an empty DataFrame.
     *
     * @param extendedKustoClient client used for the export and for the fallback row count
     * @param kustoReadRequest request supplying query, coordinates, authentication, schema and Spark session
     * @param transientStorageParameters storage the data is exported to
     * @param kustoReadOptions options supplying the cache flag and the filter push-down setting
     * @param kustoFiltering filtering to push down (subject to determineFilterPushDown$1)
     * @return the rows read from the exported parquet files, or an empty RDD as described above
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v41, types: [scala.collection.Seq] */
    public RDD<Row> distributedBuildScan(ExtendedKustoClient extendedKustoClient, KustoReadRequest kustoReadRequest, TransientStorageParameters transientStorageParameters, KustoReadOptions kustoReadOptions, KustoFiltering kustoFiltering) {
        // NOTE(review): the declared type of wrapRefArray comes from failed decompiler type inference (see the
        // JADX warnings above); the cache-hit branch assigns a Seq to it.
        WrappedArray wrapRefArray;
        RDD<Row> rdd;
        // Never read in this method; presumably a decompiler leftover.
        Nil$ nil$ = Nil$.MODULE$;
        if (kustoReadOptions.distributedReadModeTransientCacheEnabled()) {
            DistributedReadModeTransientCacheKey distributedReadModeTransientCacheKey = new DistributedReadModeTransientCacheKey(kustoReadRequest.query(), kustoReadRequest.kustoCoordinates(), kustoReadRequest.authentication());
            if (distributedReadModeTransientCache().contains(distributedReadModeTransientCacheKey)) {
                KustoDataSourceUtils$.MODULE$.logInfo(className(), "Fetching from distributedReadModeTransientCache: hit, reusing cached export paths");
                wrapRefArray = (Seq) distributedReadModeTransientCache().mo3260apply((Map<DistributedReadModeTransientCacheKey, Seq<String>>) distributedReadModeTransientCacheKey);
            } else {
                KustoDataSourceUtils$.MODULE$.logInfo(className(), "distributedReadModeTransientCache: miss, exporting to cache paths");
                // With the cache enabled, filter push-down defaults to false when the option is not set.
                wrapRefArray = Predef$.MODULE$.wrapRefArray(exportToStorage(extendedKustoClient, kustoReadRequest, transientStorageParameters, kustoReadOptions, determineFilterPushDown$1(kustoReadOptions.queryFilterPushDown(), false, kustoFiltering)));
                distributedReadModeTransientCache().update(distributedReadModeTransientCacheKey, wrapRefArray);
            }
        } else {
            // Without the cache, filter push-down defaults to true when the option is not set.
            wrapRefArray = Predef$.MODULE$.wrapRefArray(exportToStorage(extendedKustoClient, kustoReadRequest, transientStorageParameters, kustoReadOptions, determineFilterPushDown$1(kustoReadOptions.queryFilterPushDown(), true, kustoFiltering)));
        }
        try {
            rdd = kustoReadRequest.sparkSession().read().parquet(wrapRefArray).rdd();
        } catch (Exception e) {
            // Only treat the failure as "no data" when the query itself reports zero rows.
            if (KustoDataSourceUtils$.MODULE$.countRows(extendedKustoClient.engineClient(), KustoFilter$.MODULE$.pruneAndFilter(kustoReadRequest.schema(), kustoReadRequest.query(), kustoFiltering), kustoReadRequest.kustoCoordinates().database(), (ClientRequestProperties) kustoReadRequest.clientRequestProperties().orNull(Predef$.MODULE$.$conforms())) != 0) {
                throw e;
            }
            rdd = kustoReadRequest.sparkSession().emptyDataFrame().rdd();
        }
        RDD<Row> rdd2 = rdd;
        KustoDataSourceUtils$.MODULE$.logInfo(className(), new StringBuilder(47).append("Transaction data read from blob storage, paths:").append(wrapRefArray).toString());
        return rdd2;
    }

    /**
     * Exports the request's data to blob storage, one export call per calculated partition, and returns the
     * {@code wasbs://} paths of the storage containers that hold blobs under the export directory.
     *
     * <p>The export directory is {@code <database>/dir<random UUID>/} with every character outside
     * {@code [0-9a-zA-Z/]} replaced by an underscore.
     *
     * @param extendedKustoClient client wrapped in the {@code KustoReader} that performs the exports
     * @param kustoReadRequest request supplying the database name, request id and Spark session
     * @param transientStorageParameters storage credentials and endpoint suffix
     * @param kustoReadOptions options supplying the partitioning configuration
     * @param kustoFiltering filtering forwarded to every partition export
     * @return one path per storage credentials entry whose container has blobs under the export directory
     */
    private String[] exportToStorage(ExtendedKustoClient extendedKustoClient, KustoReadRequest kustoReadRequest, TransientStorageParameters transientStorageParameters, KustoReadOptions kustoReadOptions, KustoFiltering kustoFiltering) {
        String startMessage = new StringBuilder(83)
                .append("Starting exporting data from Kusto to blob storage in Distributed mode. requestId: ")
                .append(kustoReadRequest.requestId())
                .toString();
        KustoDataSourceUtils$.MODULE$.logInfo(className(), startMessage);
        setupBlobAccess(kustoReadRequest, transientStorageParameters);

        Partition[] partitions = calculatePartitions(kustoReadOptions.partitionOptions());
        KustoReader partitionExporter = new KustoReader(extendedKustoClient);
        String rawDirectory = new StringBuilder(5)
                .append(kustoReadRequest.kustoCoordinates().database())
                .append("/dir")
                .append(UUID.randomUUID())
                .append(BlobConstants.DEFAULT_DELIMITER)
                .toString();
        String exportDirectory = rawDirectory.replaceAll("[^0-9a-zA-Z/]", "_");

        // Partitions are exported one after another, in array order.
        for (Partition partition : partitions) {
            partitionExporter.exportPartitionToBlob((KustoPartition) partition, kustoReadRequest, transientStorageParameters, exportDirectory, kustoReadOptions, kustoFiltering);
        }

        // Keep only the credentials whose container has something under the export directory.
        Object[] populatedCredentials = (Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(transientStorageParameters.storageCredentials()))
                .filter(credentials -> BoxesRunTime.boxToBoolean($anonfun$exportToStorage$2(transientStorageParameters, exportDirectory, credentials)));
        String[] exportedPaths = (String[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(populatedCredentials))
                .map(credentials -> new StringBuilder(16)
                        .append("wasbs://")
                        .append(credentials.blobContainer())
                        .append("@")
                        .append(credentials.storageAccountName())
                        .append(".blob.")
                        .append(transientStorageParameters.endpointSuffix())
                        .append(BlobConstants.DEFAULT_DELIMITER)
                        .append(exportDirectory)
                        .toString(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));

        String joinedPaths = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(exportedPaths)).mkString(",");
        String finishMessage = new StringBuilder(81)
                .append("Finished exporting from Kusto to ")
                .append(joinedPaths)
                .append(", on requestId: ")
                .append(kustoReadRequest.requestId())
                .append(", will start parquet reading now")
                .toString();
        KustoDataSourceUtils$.MODULE$.logInfo(className(), finishMessage);
        return exportedPaths;
    }

    /**
     * Applies the per-credentials blob access setup to the Spark context's Hadoop configuration for every
     * storage credentials entry, using a single UTC timestamp for all of them.
     *
     * @param kustoReadRequest request whose Spark session provides the Hadoop configuration
     * @param transientStorageParameters source of the storage credentials to set up
     */
    public void setupBlobAccess(KustoReadRequest kustoReadRequest, TransientStorageParameters transientStorageParameters) {
        final Configuration hadoopConf = kustoReadRequest.sparkSession().sparkContext().hadoopConfiguration();
        final Instant timestamp = Instant.now(Clock.systemUTC());
        for (TransientStorageCredentials credentials : transientStorageParameters.storageCredentials()) {
            $anonfun$setupBlobAccess$1(timestamp, hadoopConf, transientStorageParameters, credentials);
        }
    }

    /**
     * Builds the export partitions for the configured partitioning mode.
     *
     * <p>Only the {@code "hash"} mode is handled; it delegates to {@code calculateHashPartitions}.
     *
     * @param partitionOptions options supplying the partitioning mode
     * @return the partitions produced for the mode
     * @throws InvalidParameterException if no mode is defined or the mode is anything other than {@code "hash"}
     */
    private Partition[] calculatePartitions(PartitionOptions partitionOptions) {
        Option<?> mode = partitionOptions.mode();
        // Calling get() on an empty Option would fail with an unrelated NoSuchElementException,
        // so an absent mode is reported as the invalid parameter it is.
        if (mode.isEmpty()) {
            throw new InvalidParameterException("Partitioning mode is not defined");
        }
        Object modeName = mode.get();
        if ("hash".equals(modeName)) {
            return calculateHashPartitions(partitionOptions);
        }
        // Report the unwrapped value rather than the Option's "Some(...)" rendering.
        throw new InvalidParameterException("Partitioning mode '" + modeName + "' is not valid");
    }

    /**
     * Builds the partitions used for a hash-partitioned export.
     *
     * <p>For an amount of one or less a single partition without a predicate is returned. Previously that
     * array was created and then discarded, so execution fell through to the general path: an empty result
     * for an amount of zero, a {@code NegativeArraySizeException} for negative amounts, and a one-element
     * array holding {@code null} when no column was configured.
     *
     * <p>For larger amounts, partition {@code i} carries the predicate
     * {@code " hash(<column>, <amount>) == <i>"}.
     *
     * @param partitionOptions options supplying the partition amount and the optional hash column
     * @return the partitions, indexed by partition id
     * @throws MatchError if the column option is neither {@code Some} nor {@code None}
     */
    private Partition[] calculateHashPartitions(PartitionOptions partitionOptions) {
        final int amount = partitionOptions.amount();
        if (amount <= 1) {
            // Single partition: no predicate is needed.
            return new Partition[] {new KustoPartition(None$.MODULE$, 0)};
        }

        final Partition[] partitions = new Partition[amount];
        final Option<String> column = partitionOptions.column();
        for (int partitionId = 0; partitionId < amount; partitionId++) {
            if (column instanceof Some) {
                String predicate = " hash(" + column.get() + ", " + amount + ") == " + partitionId;
                partitions[partitionId] = new KustoPartition(new Some(predicate), partitionId);
            } else if (None$.MODULE$.equals(column)) {
                // NOTE(review): without a column the slot is left null and only a warning is logged, as before;
                // confirm whether callers can cope with null partitions.
                KustoDataSourceUtils$.MODULE$.logWarn(className(), "Column name is empty when requesting for export");
            } else {
                throw new MatchError(column);
            }
        }
        return partitions;
    }

    /**
     * Chooses the filtering to apply depending on whether filter push-down is in effect.
     *
     * @param option the push-down setting; when empty, {@code z} is used instead
     * @param z the value to use when {@code option} is empty
     * @param kustoFiltering the filtering to return when push-down is in effect
     * @return {@code kustoFiltering} when push-down is in effect, otherwise a {@code KustoFiltering} built from
     *     the default constructor arguments
     */
    private final KustoFiltering determineFilterPushDown$1(Option option, boolean z, KustoFiltering kustoFiltering) {
        final boolean pushDownEnabled = BoxesRunTime.unboxToBoolean(option.getOrElse(() -> z));
        if (!pushDownEnabled) {
            KustoDataSourceUtils$.MODULE$.logInfo(className(), "not using " + KustoSourceOptions$.MODULE$.KUSTO_QUERY_FILTER_PUSH_DOWN());
            return new KustoFiltering(KustoFiltering$.MODULE$.apply$default$1(), KustoFiltering$.MODULE$.apply$default$2());
        }
        KustoDataSourceUtils$.MODULE$.logInfo(className(), "using " + KustoSourceOptions$.MODULE$.KUSTO_QUERY_FILTER_PUSH_DOWN());
        return kustoFiltering;
    }

    /**
     * Synthetic lambda body: exports one partition to blob storage through the given reader.
     *
     * @param kustoReader reader performing the export
     * @param kustoReadRequest request forwarded to the export
     * @param transientStorageParameters storage parameters forwarded to the export
     * @param str export directory forwarded to the export
     * @param kustoReadOptions read options forwarded to the export
     * @param kustoFiltering filtering forwarded to the export
     * @param partition partition to export; must be a {@code KustoPartition}
     */
    public static final /* synthetic */ void $anonfun$exportToStorage$1(KustoReader kustoReader, KustoReadRequest kustoReadRequest, TransientStorageParameters transientStorageParameters, String str, KustoReadOptions kustoReadOptions, KustoFiltering kustoFiltering, Partition partition) {
        final KustoPartition kustoPartition = (KustoPartition) partition;
        kustoReader.exportPartitionToBlob(kustoPartition, kustoReadRequest, transientStorageParameters, str, kustoReadOptions, kustoFiltering);
    }

    /**
     * Synthetic lambda body: reports whether the container described by the credentials holds at least one
     * blob under the given directory.
     *
     * <p>The container client targets {@code https://<account>.blob.<endpoint suffix>} and authenticates with
     * the SAS key when one is defined (a leading {@code '?'} is added if missing), otherwise with the storage
     * account key.
     *
     * <p>The listing is only probed for a first element instead of being counted in full, so the check does
     * not have to enumerate every exported blob.
     *
     * @param transientStorageParameters source of the endpoint suffix
     * @param str directory (blob name prefix) to look under
     * @param transientStorageCredentials account, container and key or SAS to connect with
     * @return {@code true} if the listing yields at least one item
     */
    public static final /* synthetic */ boolean $anonfun$exportToStorage$2(TransientStorageParameters transientStorageParameters, String str, TransientStorageCredentials transientStorageCredentials) {
        BlobContainerClient buildClient;
        String sb = new StringBuilder(14).append("https://").append(transientStorageCredentials.storageAccountName()).append(".blob.").append(transientStorageParameters.endpointSuffix()).toString();
        if (transientStorageCredentials.sasDefined()) {
            String sasKey = transientStorageCredentials.sasKey();
            String sasToken = sasKey.charAt(0) == '?' ? sasKey : "?" + sasKey;
            buildClient = new BlobContainerClientBuilder().endpoint(sb).containerName(transientStorageCredentials.blobContainer()).credential(new AzureSasCredential(sasToken)).buildClient();
        } else {
            buildClient = new BlobContainerClientBuilder().endpoint(sb).containerName(transientStorageCredentials.blobContainer()).credential(new StorageSharedKeyCredential(transientStorageCredentials.storageAccountName(), transientStorageCredentials.storageAccountKey())).buildClient();
        }
        // Short-circuit on the first item; closing the stream releases the listing once the probe is done.
        try (Stream<?> blobs = buildClient.listBlobsByHierarchy(str).stream()) {
            return blobs.findAny().isPresent();
        }
    }

    /**
     * Synthetic lambda body: writes the Hadoop configuration entries needed to access one storage container.
     *
     * <p>Each entry is written only when the matching {@code KustoAzureFsSetupCache$} call returns
     * {@code false} (presumably meaning the value was not already registered — confirm against that class):
     * <ul>
     *   <li>{@code fs.azure.sas.<container>.<account>.blob.<suffix>} when a SAS key is defined, otherwise
     *       {@code fs.azure.account.key.<account>.blob.<suffix>};</li>
     *   <li>{@code fs.azure}, set to the native Azure file system class name.</li>
     * </ul>
     *
     * @param instant timestamp passed to the cache calls
     * @param configuration Hadoop configuration to update
     * @param transientStorageParameters source of the endpoint suffix
     * @param transientStorageCredentials account, container and key or SAS being registered
     */
    public static final /* synthetic */ void $anonfun$setupBlobAccess$1(Instant instant, Configuration configuration, TransientStorageParameters transientStorageParameters, TransientStorageCredentials transientStorageCredentials) {
        if (!transientStorageCredentials.sasDefined()) {
            boolean keyKnown = KustoAzureFsSetupCache$.MODULE$.updateAndGetPrevStorageAccountAccess(transientStorageCredentials.storageAccountName(), transientStorageCredentials.storageAccountKey(), instant);
            if (!keyKnown) {
                String accountKeyProperty = "fs.azure.account.key." + transientStorageCredentials.storageAccountName() + ".blob." + transientStorageParameters.endpointSuffix();
                configuration.set(accountKeyProperty, String.valueOf(transientStorageCredentials.storageAccountKey()));
            }
        } else {
            boolean sasKnown = KustoAzureFsSetupCache$.MODULE$.updateAndGetPrevSas(transientStorageCredentials.blobContainer(), transientStorageCredentials.storageAccountName(), transientStorageCredentials.sasKey(), instant);
            if (!sasKnown) {
                String sasProperty = "fs.azure.sas." + transientStorageCredentials.blobContainer() + "." + transientStorageCredentials.storageAccountName() + ".blob." + transientStorageParameters.endpointSuffix();
                configuration.set(sasProperty, String.valueOf(transientStorageCredentials.sasKey()));
            }
        }

        if (!KustoAzureFsSetupCache$.MODULE$.updateAndGetPrevNativeAzureFs(instant)) {
            configuration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
        }
    }

    /**
     * Creates the singleton: publishes it through {@code MODULE$} and initializes the logging name, the
     * (initially empty) export path cache and the minimal parquet writer version.
     */
    private KustoReader$() {
        // Published before the fields are set; the statement order is kept exactly as decompiled.
        MODULE$ = this;
        this.className = getClass().getSimpleName();
        this.distributedReadModeTransientCache = new TrieMap();
        this.com$microsoft$kusto$spark$datasource$KustoReader$$minimalParquetWriterVersion = "3.3.0";
    }
}
