package com.microsoft.azure.flink.writer.internal;

import com.microsoft.azure.flink.common.KustoClientUtil;
import com.microsoft.azure.flink.common.KustoRetryConfig;
import com.microsoft.azure.flink.common.KustoRetryUtil;
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas;
import io.github.resilience4j.retry.Retry;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands out temporary blob storage containers (with SAS) used for ingestion.
 *
 * <p>Containers are fetched from the DM client's resource manager and kept in a static,
 * process-wide list. The list is considered fresh until {@link #getExpirationTimestamp()};
 * after that the next call to {@link #getBlobContainer()} refreshes it through the configured
 * {@link Retry}.
 *
 * <p>Instances are obtained through {@link Builder#build()}, which always returns the same
 * shared instance once it has been created.
 */
@PublicEvolving
@Internal
public class ContainerProvider implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(ContainerProvider.class);

    /**
     * Shared container cache. Compound operations (size + get, clear + addAll) synchronize on the
     * list itself, which is the monitor used by {@link Collections#synchronizedList(List)}.
     */
    private static final List<ContainerWithSas> CONTAINER_SAS = Collections.synchronizedList(new ArrayList<>());

    private static ContainerProvider containerProviderInstance;

    private final Random randomGenerator = new Random();
    private final KustoConnectionOptions connectionOptions;
    private final KustoRetryConfig kustoRetryConfig;

    /** Epoch millis (UTC) until which the cached containers are treated as fresh. */
    private volatile long expirationTimestamp;

    /**
     * Not serialized; deliberately non-final so it can be re-created lazily after
     * deserialization (see {@link #getRetry()}).
     */
    private transient Retry retry;

    /** Builder that captures the connection options and the retry configuration derived from them. */
    public static class Builder {
        private final KustoConnectionOptions connectionOptions;
        private final KustoRetryConfig kustoRetryConfig;

        /**
         * @param kustoConnectionOptions connection options; the retry configuration is read from
         *     {@code getKustoRetryConfig()}
         */
        public Builder(@NotNull KustoConnectionOptions kustoConnectionOptions) {
            this.connectionOptions = kustoConnectionOptions;
            this.kustoRetryConfig = kustoConnectionOptions.getKustoRetryConfig();
        }

        /**
         * @return the shared {@link ContainerProvider}, created on first use
         */
        public ContainerProvider build() {
            return ContainerProvider.build(this);
        }
    }

    private ContainerProvider(KustoConnectionOptions kustoConnectionOptions, KustoRetryConfig kustoRetryConfig) {
        this.connectionOptions = kustoConnectionOptions;
        this.kustoRetryConfig = kustoRetryConfig;
        this.retry = KustoRetryUtil.getRetries(kustoRetryConfig);
    }

    /**
     * Returns the shared provider, creating it from {@code builder} on the first call. Later calls
     * ignore the builder's contents and return the instance that already exists.
     *
     * @param builder source of the connection options and retry configuration
     * @return the shared provider instance
     * @throws NullPointerException if the instance has to be created and the builder holds a null
     *     connection options or retry configuration
     */
    public static synchronized ContainerProvider build(Builder builder) {
        if (containerProviderInstance == null) {
            containerProviderInstance = new ContainerProvider(
                    Preconditions.checkNotNull(builder.connectionOptions),
                    Preconditions.checkNotNull(builder.kustoRetryConfig));
        }
        return containerProviderInstance;
    }

    /**
     * Returns a randomly chosen container, from the cache while it is fresh, otherwise after
     * refreshing the cache through the retry policy.
     *
     * @return a container with SAS
     * @throws RuntimeException if the refresh fails and there is no cached container to fall back to
     */
    public ContainerWithSas getBlobContainer() {
        if (isCacheValid()) {
            ContainerWithSas cached = pickCachedContainer();
            // The cache may have been emptied between the validity check and the pick; in that
            // case fall through to a refresh instead of failing.
            if (cached != null) {
                LOG.info("Returning storage from cache {}", cached.getEndpointWithoutSas());
                return cached;
            }
        }
        return getRetry().executeSupplier(getContainerSupplier());
    }

    /** @return true when the expiration timestamp has not passed and the cache holds entries */
    private boolean isCacheValid() {
        return this.expirationTimestamp >= Instant.now(Clock.systemUTC()).toEpochMilli() && !CONTAINER_SAS.isEmpty();
    }

    /** Returns the retry policy, re-creating it when the transient field is null. */
    private Retry getRetry() {
        Retry current = this.retry;
        if (current == null) {
            current = KustoRetryUtil.getRetries(this.kustoRetryConfig);
            this.retry = current;
        }
        return current;
    }

    /** Picks a random cached container, or returns null when the cache is empty. */
    private ContainerWithSas pickCachedContainer() {
        // size() and get() must observe the same list state, hence the explicit lock.
        synchronized (CONTAINER_SAS) {
            if (CONTAINER_SAS.isEmpty()) {
                return null;
            }
            return CONTAINER_SAS.get(this.randomGenerator.nextInt(CONTAINER_SAS.size()));
        }
    }

    /**
     * Builds the supplier that refreshes the cache from the DM client and returns one of the
     * freshly fetched containers. On failure it falls back to a previously cached container when
     * one exists.
     */
    @Contract(pure = true)
    @NotNull
    private Supplier<ContainerWithSas> getContainerSupplier() {
        return () -> {
            try (QueuedIngestClient dmClient = KustoClientUtil.createDMClient(
                    Preconditions.checkNotNull(this.connectionOptions, "Connection options passed to DM client cannot be null."),
                    ContainerProvider.class.getSimpleName())) {
                // Fetch into a local list first so a failure leaves the existing cache intact.
                List<ContainerWithSas> refreshed = new ArrayList<>();
                dmClient.getResourceManager().getShuffledContainers().forEach(containerWithSas -> {
                    LOG.debug("Adding container post refresh {}", containerWithSas.getEndpointWithoutSas());
                    refreshed.add(containerWithSas);
                });
                if (refreshed.isEmpty()) {
                    ContainerWithSas stale = pickCachedContainer();
                    if (stale == null) {
                        throw new IllegalStateException("No temp storage containers were returned and none are cached");
                    }
                    LOG.warn("No temp storage containers were returned by the refresh, returning an existing container.");
                    return stale;
                }
                long newExpiration = Instant.now(Clock.systemUTC())
                        .plus(this.kustoRetryConfig.getCacheExpirationSeconds(), ChronoUnit.SECONDS)
                        .toEpochMilli();
                ContainerWithSas selected = refreshed.get(this.randomGenerator.nextInt(refreshed.size()));
                // Swap the cache contents in one step so readers never observe an empty list.
                synchronized (CONTAINER_SAS) {
                    CONTAINER_SAS.clear();
                    CONTAINER_SAS.addAll(refreshed);
                    this.expirationTimestamp = newExpiration;
                }
                LOG.info("Setting expiration timestamp to {}", Long.valueOf(newExpiration));
                return selected;
            } catch (IOException | URISyntaxException | IngestionClientException | IngestionServiceException e) {
                LOG.error("Failed to get temp storage container", e);
                ContainerWithSas stale = pickCachedContainer();
                if (stale == null) {
                    throw new RuntimeException(e);
                }
                LOG.warn("Failed to get temp storage container. To cover for transient failures, returning an existing container.If the SAS keys have expired, the flow will fail further in the flow", e);
                return stale;
            }
        };
    }

    /**
     * @return the epoch-millisecond timestamp until which the cache is treated as fresh; 0 until
     *     the first successful refresh
     */
    public long getExpirationTimestamp() {
        return this.expirationTimestamp;
    }
}
