package com.microsoft.azure.kusto.ingest;

import com.azure.core.http.HttpClient;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.models.BlobStorageException;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.HttpClientProperties;
import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.DataWebException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.SourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.kusto.ingest.utils.ExponentialRetry;
import com.microsoft.azure.kusto.ingest.utils.IngestionUtils;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.http.impl.client.CloseableHttpClient;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.class */
/**
 * Ingest client that first tries streaming ingestion and falls back to queued ingestion.
 *
 * <p>Data whose size exceeds {@link #MAX_STREAMING_SIZE_BYTES} is handed directly to the queued
 * client. Smaller data is sent through the streaming client under an {@link ExponentialRetry}
 * policy; when that yields no result the same data is handed to the queued client. A streaming
 * failure classified as permanent is rethrown to the caller instead of falling back.
 */
public class ManagedStreamingIngestClient extends IngestClientBase implements IngestClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Argument passed to the default {@link ExponentialRetry} policy created by the public constructors. */
    public static final int ATTEMPT_COUNT = 3;

    /** Largest payload size, in bytes, that is sent through the streaming client (4 MiB). */
    public static final int MAX_STREAMING_SIZE_BYTES = 4 * 1024 * 1024;

    /** Value reported by {@link #getClientType()}. */
    public static final String MANAGED_STREAMING_INGEST_CLIENT = "ManagedStreamingIngestClient";

    final QueuedIngestClientImpl queuedIngestClient;
    final StreamingIngestClient streamingIngestClient;
    private final ExponentialRetry exponentialRetryTemplate;

    /** Caller-supplied HTTP client, or {@code null} when the instance was not built from one. */
    private final CloseableHttpClient httpClient;

    /**
     * Creates a client from a data-management connection string using default HTTP client properties.
     *
     * @param connectionStringBuilder connection string used as-is for the queued client
     * @return a new managed streaming client
     * @throws URISyntaxException propagated from client construction
     */
    public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStringBuilder connectionStringBuilder) throws URISyntaxException {
        return fromDmConnectionString(connectionStringBuilder, (HttpClientProperties) null);
    }

    /**
     * Creates a client from a data-management connection string.
     *
     * <p>The given builder is used unchanged for the queued client; a copy whose cluster URL is
     * rewritten with {@link IngestClientBase#getQueryEndpoint} is used for the streaming client.
     *
     * @param connectionStringBuilder connection string used as-is for the queued client
     * @param httpClientProperties optional HTTP client properties, may be {@code null}
     * @return a new managed streaming client
     * @throws URISyntaxException propagated from client construction
     */
    public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStringBuilder connectionStringBuilder, @Nullable HttpClientProperties httpClientProperties) throws URISyntaxException {
        ConnectionStringBuilder engineConnectionString = new ConnectionStringBuilder(connectionStringBuilder);
        engineConnectionString.setClusterUrl(IngestClientBase.getQueryEndpoint(engineConnectionString.getClusterUrl()));
        return new ManagedStreamingIngestClient(connectionStringBuilder, engineConnectionString, httpClientProperties);
    }

    /**
     * Creates a client from an engine connection string using default HTTP client properties.
     *
     * @param connectionStringBuilder connection string used as-is for the streaming client
     * @return a new managed streaming client
     * @throws URISyntaxException propagated from client construction
     */
    public static ManagedStreamingIngestClient fromEngineConnectionString(ConnectionStringBuilder connectionStringBuilder) throws URISyntaxException {
        return fromEngineConnectionString(connectionStringBuilder, null);
    }

    /**
     * Creates a client from an engine connection string.
     *
     * <p>The given builder is used unchanged for the streaming client; a copy whose cluster URL is
     * rewritten with {@link IngestClientBase#getIngestionEndpoint} is used for the queued client.
     *
     * @param connectionStringBuilder connection string used as-is for the streaming client
     * @param httpClientProperties optional HTTP client properties, may be {@code null}
     * @return a new managed streaming client
     * @throws URISyntaxException propagated from client construction
     */
    public static ManagedStreamingIngestClient fromEngineConnectionString(ConnectionStringBuilder connectionStringBuilder, @Nullable HttpClientProperties httpClientProperties) throws URISyntaxException {
        ConnectionStringBuilder dmConnectionString = new ConnectionStringBuilder(connectionStringBuilder);
        dmConnectionString.setClusterUrl(IngestClientBase.getIngestionEndpoint(connectionStringBuilder.getClusterUrl()));
        return new ManagedStreamingIngestClient(dmConnectionString, connectionStringBuilder, httpClientProperties);
    }

    /**
     * Creates a client from separate connection strings with default HTTP client properties.
     *
     * @param connectionStringBuilder connection string for the queued client
     * @param connectionStringBuilder2 connection string for the streaming client
     * @throws URISyntaxException propagated from client construction
     */
    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, ConnectionStringBuilder connectionStringBuilder2) throws URISyntaxException {
        this(connectionStringBuilder, connectionStringBuilder2, (HttpClientProperties) null);
    }

    /**
     * Creates a client from separate connection strings.
     *
     * @param connectionStringBuilder connection string for the queued client
     * @param connectionStringBuilder2 connection string for the streaming client
     * @param httpClientProperties optional HTTP client properties passed to both clients, may be {@code null}
     * @throws URISyntaxException propagated from client construction
     */
    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, ConnectionStringBuilder connectionStringBuilder2, @Nullable HttpClientProperties httpClientProperties) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.httpClient = null;
        this.queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, httpClientProperties);
        this.streamingIngestClient = new StreamingIngestClient(connectionStringBuilder2, httpClientProperties);
        this.exponentialRetryTemplate = new ExponentialRetry(ATTEMPT_COUNT);
    }

    /**
     * Creates a client that uses one connection string for both the queued and the streaming client.
     *
     * <p>NOTE(review): a decompiler note on this constructor indicated it was originally
     * package-private; it is kept public so existing callers keep compiling.
     *
     * @param connectionStringBuilder connection string shared by both clients
     * @param httpClientProperties optional HTTP client properties passed to both clients, may be {@code null}
     * @throws URISyntaxException propagated from client construction
     */
    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable HttpClientProperties httpClientProperties) throws URISyntaxException {
        this(connectionStringBuilder, connectionStringBuilder, httpClientProperties);
    }

    /**
     * Creates a client that uses one connection string and a caller-supplied HTTP client.
     *
     * @param connectionStringBuilder connection string shared by both clients
     * @param closeableHttpClient HTTP client passed to both clients and remembered for blob access, may be {@code null}
     * @throws URISyntaxException propagated from client construction
     */
    public ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable CloseableHttpClient closeableHttpClient) throws URISyntaxException {
        log.info("Creating a new ManagedStreamingIngestClient from connection strings");
        this.httpClient = closeableHttpClient;
        this.queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, closeableHttpClient);
        this.streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, closeableHttpClient);
        this.exponentialRetryTemplate = new ExponentialRetry(ATTEMPT_COUNT);
    }

    /**
     * Creates a client from pre-built collaborators with the default retry policy.
     *
     * @param resourceManager resource manager for the queued client
     * @param azureStorageClient storage client for the queued client
     * @param streamingClient client wrapped by the streaming ingest client
     */
    public ManagedStreamingIngestClient(ResourceManager resourceManager, AzureStorageClient azureStorageClient, StreamingClient streamingClient) {
        this(resourceManager, azureStorageClient, streamingClient, new ExponentialRetry(ATTEMPT_COUNT));
    }

    /**
     * Creates a client from pre-built collaborators and an explicit retry policy.
     *
     * @param resourceManager resource manager for the queued client
     * @param azureStorageClient storage client for the queued client
     * @param streamingClient client wrapped by the streaming ingest client
     * @param exponentialRetry template copied for every streaming ingestion
     */
    ManagedStreamingIngestClient(ResourceManager resourceManager, AzureStorageClient azureStorageClient, StreamingClient streamingClient, ExponentialRetry exponentialRetry) {
        log.info("Creating a new ManagedStreamingIngestClient from raw parts");
        this.httpClient = null;
        this.queuedIngestClient = new QueuedIngestClientImpl(resourceManager, azureStorageClient);
        this.streamingIngestClient = new StreamingIngestClient(streamingClient);
        this.exponentialRetryTemplate = exponentialRetry;
    }

    /**
     * Validates the arguments, converts the file to a stream and ingests it via {@code ingestFromStream}.
     *
     * @param fileSourceInfo description of the file to ingest
     * @param ingestionProperties ingestion settings
     * @return the result of the stream ingestion
     * @throws IngestionClientException if the file cannot be found, or propagated from the stream ingestion
     * @throws IngestionServiceException propagated from the stream ingestion
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClientBase
    protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        fileSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo fileStream = IngestionUtils.fileToStream(fileSourceInfo, true);
            return ingestFromStream(fileStream, ingestionProperties);
        } catch (FileNotFoundException e) {
            log.error("File not found when ingesting a file.", e);
            throw new IngestionClientException("IO exception - check file path.", e);
        }
    }

    /**
     * Ingests a blob, streaming it when it is small enough and queueing it otherwise.
     *
     * <p>When the source reports a non-positive raw size, the size is read from the blob's
     * properties first. Blobs larger than {@link #MAX_STREAMING_SIZE_BYTES} go straight to the
     * queued client; smaller blobs are streamed with retries and queued only if streaming yields
     * no result.
     *
     * @param blobSourceInfo description of the blob to ingest
     * @param ingestionProperties ingestion settings
     * @return the streaming result, or the queued result when falling back
     * @throws IngestionClientException propagated from the underlying clients
     * @throws IngestionServiceException if the blob properties cannot be read, or propagated from the underlying clients
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClientBase
    protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        blobSourceInfo.validate();
        ingestionProperties.validate();

        BlobClientBuilder blobClientBuilder = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath());
        // The stored client is an Apache CloseableHttpClient; only hand it to the blob builder when
        // it also implements the Azure HttpClient interface. An unconditional cast would throw
        // ClassCastException for every other client, so in that case the builder default is kept.
        if (this.httpClient instanceof HttpClient) {
            blobClientBuilder.httpClient((HttpClient) this.httpClient);
        }
        BlobClient blobClient = blobClientBuilder.buildClient();

        if (blobSourceInfo.getRawSizeInBytes() <= 0) {
            try {
                blobSourceInfo.setRawSizeInBytes(blobClient.getProperties().getBlobSize());
            } catch (BlobStorageException e) {
                throw new IngestionServiceException(blobSourceInfo.getBlobPath(), "Failed getting blob properties: " + e.getMessage(), e);
            }
        }

        if (blobSourceInfo.getRawSizeInBytes() > MAX_STREAMING_SIZE_BYTES) {
            log.info("Blob size is greater than max streaming size ({} bytes). Falling back to queued.", blobSourceInfo.getRawSizeInBytes());
            return this.queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
        }

        IngestionResult streamingResult = streamWithRetries(blobSourceInfo, ingestionProperties, blobClient);
        if (streamingResult != null) {
            return streamingResult;
        }
        return this.queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
    }

    /**
     * Runs streaming ingestion under a fresh copy of the retry template.
     *
     * <p>Each invocation of the callback streams from the blob when {@code blobClient} is given,
     * otherwise from the stream held by {@code sourceInfo}. A failure classified as permanent is
     * rethrown. Any other failure is logged, the source stream (if any) is reset so it can be read
     * again, and {@code null} is returned to the retry policy.
     *
     * @param sourceInfo a {@link BlobSourceInfo} when {@code blobClient} is non-null, otherwise a {@link StreamSourceInfo}
     * @param ingestionProperties ingestion settings
     * @param blobClient blob client to stream from, or {@code null} to stream from {@code sourceInfo}
     * @return the streaming result, or {@code null} when the retry policy produced no result
     * @throws IngestionClientException if the stream cannot be reset, or propagated from the streaming client
     * @throws IngestionServiceException propagated from the streaming client for permanent errors
     */
    private IngestionResult streamWithRetries(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient) throws IngestionClientException, IngestionServiceException {
        return (IngestionResult) new ExponentialRetry(this.exponentialRetryTemplate).execute(attempt -> {
            try {
                if (blobClient != null) {
                    String blobCallTag = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", sourceInfo.getSourceId(), attempt);
                    return this.streamingIngestClient.ingestFromBlob((BlobSourceInfo) sourceInfo, ingestionProperties, blobClient, blobCallTag);
                }
                String streamCallTag = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", sourceInfo.getSourceId(), attempt);
                return this.streamingIngestClient.ingestFromStream((StreamSourceInfo) sourceInfo, ingestionProperties, streamCallTag);
            } catch (Exception e) {
                if (isPermanentStreamingError(e)) {
                    throw e;
                }
                log.info(String.format("Streaming ingestion failed attempt %d", attempt), e);
                if (!(sourceInfo instanceof StreamSourceInfo)) {
                    return null;
                }
                try {
                    // Rewind so the next attempt (or the queued fallback) reads the data from the start.
                    ((StreamSourceInfo) sourceInfo).getStream().reset();
                    return null;
                } catch (IOException resetFailure) {
                    throw new IngestionClientException("Failed to reset stream", resetFailure);
                }
            }
        });
    }

    /**
     * Tells whether a streaming failure is an {@link IngestionServiceException} caused by a
     * {@link DataServiceException} caused by a {@link DataWebException} whose API error is permanent.
     *
     * @param e the failure raised by the streaming client
     * @return {@code true} when the failure must be rethrown rather than retried
     */
    private static boolean isPermanentStreamingError(Exception e) {
        if (!(e instanceof IngestionServiceException)) {
            return false;
        }
        Throwable serviceCause = e.getCause();
        if (!(serviceCause instanceof DataServiceException)) {
            return false;
        }
        Throwable webCause = serviceCause.getCause();
        return webCause instanceof DataWebException && ((DataWebException) webCause).getApiError().isPermanent();
    }

    /**
     * Validates the arguments, converts the result set to a stream and ingests it via {@code ingestFromStream}.
     *
     * @param resultSetSourceInfo description of the result set to ingest
     * @param ingestionProperties ingestion settings
     * @return the result of the stream ingestion
     * @throws IngestionClientException if the result set cannot be read, or propagated from the stream ingestion
     * @throws IngestionServiceException propagated from the stream ingestion
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClientBase
    protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        resultSetSourceInfo.validate();
        ingestionProperties.validateResultSetProperties();
        try {
            StreamSourceInfo resultSetStream = IngestionUtils.resultSetToStream(resultSetSourceInfo);
            return ingestFromStream(resultSetStream, ingestionProperties);
        } catch (IOException e) {
            log.error("Failed to read from ResultSet.", e);
            throw new IngestionClientException("Failed to read from ResultSet.", e);
        }
    }

    /**
     * Ingests a stream, streaming it when it is small enough and queueing it otherwise.
     *
     * <p>Up to {@code MAX_STREAMING_SIZE_BYTES + 1} bytes are read into memory to measure the
     * stream. If more than {@link #MAX_STREAMING_SIZE_BYTES} bytes were read, the buffered bytes
     * followed by the rest of the original stream are handed to the queued client. Otherwise the
     * original stream is closed (unless it is marked leave-open) and the buffered copy is streamed
     * with retries, falling back to the queued client if streaming yields no result.
     *
     * @param streamSourceInfo description of the stream to ingest
     * @param ingestionProperties ingestion settings
     * @return the streaming result, or the queued result when falling back
     * @throws IngestionClientException if the stream cannot be read, or propagated from the underlying clients
     * @throws IngestionServiceException propagated from the underlying clients
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClientBase
    protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        streamSourceInfo.validate();
        ingestionProperties.validate();

        UUID sourceId = streamSourceInfo.getSourceId();
        if (sourceId == null) {
            sourceId = UUID.randomUUID();
        }

        try {
            // Read one byte past the limit so an over-sized stream can be told apart from one of exactly the limit.
            byte[] bufferedBytes = IngestionUtils.readBytesFromInputStream(streamSourceInfo.getStream(), MAX_STREAMING_SIZE_BYTES + 1);
            ByteArrayInputStream bufferedStream = new ByteArrayInputStream(bufferedBytes);

            if (bufferedBytes.length > MAX_STREAMING_SIZE_BYTES) {
                log.info("Stream size is greater than max streaming size ({} bytes). Falling back to queued.", bufferedBytes.length);
                // Stitch the already-consumed prefix back in front of the unread remainder.
                SequenceInputStream wholeStream = new SequenceInputStream(bufferedStream, streamSourceInfo.getStream());
                StreamSourceInfo queuedSource = new StreamSourceInfo(wholeStream, streamSourceInfo.isLeaveOpen(), sourceId, streamSourceInfo.getCompressionType());
                return this.queuedIngestClient.ingestFromStream(queuedSource, ingestionProperties);
            }

            // Everything is buffered in memory, so the caller's stream is no longer needed.
            if (!streamSourceInfo.isLeaveOpen()) {
                closeQuietly(streamSourceInfo, "Failed to close stream");
            }

            // Marked leave-open so the buffer stays readable across attempts; it is closed in the finally below.
            StreamSourceInfo managedSource = new StreamSourceInfo(bufferedStream, true, sourceId, streamSourceInfo.getCompressionType());
            try {
                IngestionResult streamingResult = streamWithRetries(managedSource, ingestionProperties, null);
                if (streamingResult != null) {
                    return streamingResult;
                }
                return this.queuedIngestClient.ingestFromStream(managedSource, ingestionProperties);
            } finally {
                closeQuietly(managedSource, "Failed to close byte stream");
            }
        } catch (IOException e) {
            throw new IngestionClientException("Failed to read from stream.", e);
        }
    }

    /**
     * Closes the stream held by {@code sourceInfo}, logging a warning instead of propagating a failure.
     *
     * @param sourceInfo holder of the stream to close
     * @param failureMessage warning logged when closing fails
     */
    private static void closeQuietly(StreamSourceInfo sourceInfo, String failureMessage) {
        try {
            sourceInfo.getStream().close();
        } catch (IOException e) {
            log.warn(failureMessage, e);
        }
    }

    /**
     * Returns the identifier of this client implementation.
     *
     * @return {@link #MANAGED_STREAMING_INGEST_CLIENT}
     */
    @Override // com.microsoft.azure.kusto.ingest.IngestClientBase
    protected String getClientType() {
        return MANAGED_STREAMING_INGEST_CLIENT;
    }

    /**
     * Closes the queued client and then the streaming client.
     *
     * <p>The streaming client is closed even when closing the queued client fails.
     *
     * @throws IOException if either client fails to close
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.queuedIngestClient.close();
        } finally {
            this.streamingIngestClient.close();
        }
    }
}
