package com.microsoft.azure.kusto.ingest;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.models.BlobStorageException;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ClientRequestProperties;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.HttpClientProperties;
import com.microsoft.azure.kusto.data.StreamingClient;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.kusto.ingest.utils.IngestionUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/kusto/ingest/StreamingIngestClient.class */
/**
 * {@link IngestClient} implementation that hands data to a {@link StreamingClient}.
 *
 * <p>File and result-set sources are first converted to streams and then ingested through the
 * stream path; blob sources are ingested by passing the blob path to the streaming client. When
 * the streaming client call returns without throwing, an {@link IngestionStatusResult} with
 * {@link OperationStatus#Succeeded} is returned.
 */
public class StreamingIngestClient extends IngestClientBase implements IngestClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    /** Size, in bytes, of the copy buffer used while gzip-compressing an input stream. */
    private static final int STREAM_COMPRESS_BUFFER_SIZE = 16384;
    private static final String STREAMING_INGEST_CLIENT = "StreamingIngestClient";
    private final StreamingClient streamingClient;
    String connectionDataSource;

    /**
     * Creates a client from connection details and optional HTTP client properties.
     *
     * <p>The supplied builder is copied, so it is not modified; the copy's cluster URL is replaced
     * with the result of {@code getQueryEndpoint} before the streaming client is created.
     * Originally package-private (access widened by the decompiler).
     *
     * @param connectionStringBuilder connection details; copied, not mutated
     * @param httpClientProperties optional HTTP client properties, may be {@code null}
     * @throws URISyntaxException declared by the underlying client factory
     */
    public StreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable HttpClientProperties httpClientProperties) throws URISyntaxException {
        log.info("Creating a new StreamingIngestClient");
        ConnectionStringBuilder builderCopy = new ConnectionStringBuilder(connectionStringBuilder);
        builderCopy.setClusterUrl(getQueryEndpoint(builderCopy.getClusterUrl()));
        this.streamingClient = ClientFactory.createStreamingClient(builderCopy, httpClientProperties);
        this.connectionDataSource = builderCopy.getClusterUrl();
    }

    /**
     * Creates a client from connection details and an optional, caller-provided HTTP client.
     *
     * <p>The builder is used as given; its cluster URL is not rewritten by this constructor.
     * Originally package-private (access widened by the decompiler).
     *
     * @param connectionStringBuilder connection details, used as-is
     * @param closeableHttpClient optional HTTP client, may be {@code null}
     * @throws URISyntaxException declared by the underlying client factory
     */
    public StreamingIngestClient(ConnectionStringBuilder connectionStringBuilder, @Nullable CloseableHttpClient closeableHttpClient) throws URISyntaxException {
        log.info("Creating a new StreamingIngestClient");
        this.streamingClient = ClientFactory.createStreamingClient(connectionStringBuilder, closeableHttpClient);
        this.connectionDataSource = connectionStringBuilder.getClusterUrl();
    }

    /**
     * Creates a client around an existing {@link StreamingClient}. {@code connectionDataSource}
     * is left unset by this constructor. Originally package-private (access widened by the
     * decompiler).
     *
     * @param streamingClient the streaming client to delegate to
     */
    public StreamingIngestClient(StreamingClient streamingClient) {
        log.info("Creating a new StreamingIngestClient");
        this.streamingClient = streamingClient;
    }

    /**
     * Suggests an engine URL by stripping a leading {@code "ingest-"} from the host.
     *
     * <p>The prefix is matched case-insensitively and independently of the default locale.
     * The supplied builder is modified in place (its host is replaced).
     *
     * @param uRIBuilder builder holding the URL to convert; its host is expected to be set
     * @return the string form of the builder after the prefix was removed from the host
     * @throws IllegalArgumentException if the host does not start with {@code "ingest-"}
     */
    public static String generateEngineUriSuggestion(URIBuilder uRIBuilder) {
        final String ingestPrefix = "ingest-";
        String host = uRIBuilder.getHost();
        // regionMatches(ignoreCase = true, ...) compares per character and does not depend on the
        // default locale, unlike toLowerCase() (which breaks "INGEST-" under e.g. a Turkish locale).
        boolean hasIngestPrefix = host.regionMatches(true, 0, ingestPrefix, 0, ingestPrefix.length());
        if (!hasIngestPrefix) {
            throw new IllegalArgumentException("The URL is already formatted as the suggested Engine endpoint, so no suggestion can be made");
        }
        uRIBuilder.setHost(host.substring(ingestPrefix.length()));
        return uRIBuilder.toString();
    }

    /**
     * Validates the arguments, converts the file to a stream and ingests it through
     * {@code ingestFromStream}.
     *
     * @throws IngestionClientException if the file cannot be found, or on a client-side failure
     * @throws IngestionServiceException on a service-side failure
     */
    @Override
    protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(fileSourceInfo, "fileSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        fileSourceInfo.validate();
        ingestionProperties.validate();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.fileToStream(fileSourceInfo, false);
            return ingestFromStream(streamSourceInfo, ingestionProperties);
        } catch (FileNotFoundException e) {
            log.error("File not found when ingesting a file.", e);
            throw new IngestionClientException("IO exception - check file path.", e);
        }
    }

    /**
     * Validates the arguments, builds a {@link BlobClient} for the blob path and ingests the blob
     * without a client request id.
     *
     * @throws IngestionClientException on a storage error, an invalid blob path, or another
     *     client-side failure
     * @throws IngestionServiceException on a service-side failure
     */
    @Override
    protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        blobSourceInfo.validate();
        ingestionProperties.validate();
        try {
            BlobClient blobClient = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath()).buildClient();
            return ingestFromBlob(blobSourceInfo, ingestionProperties, blobClient, null);
        } catch (BlobStorageException e) {
            log.error("Unexpected Storage error when ingesting a blob.", e);
            throw new IngestionClientException("Unexpected Storage error when ingesting a blob.", e);
        } catch (IllegalArgumentException e) {
            log.error("Unexpected error when ingesting a blob - Invalid blob path.", e);
            throw new IngestionClientException("Unexpected error when ingesting a blob - Invalid blob path.", e);
        }
    }

    /**
     * Validates the arguments, converts the result set to a stream and ingests it through
     * {@code ingestFromStream}.
     *
     * @throws IngestionClientException if reading the result set fails, or on a client-side failure
     * @throws IngestionServiceException on a service-side failure
     */
    @Override
    protected IngestionResult ingestFromResultSetImpl(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(resultSetSourceInfo, "resultSetSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        resultSetSourceInfo.validate();
        ingestionProperties.validateResultSetProperties();
        try {
            StreamSourceInfo streamSourceInfo = IngestionUtils.resultSetToStream(resultSetSourceInfo);
            return ingestFromStream(streamSourceInfo, ingestionProperties);
        } catch (IOException e) {
            log.error("Failed to read from ResultSet.", e);
            throw new IngestionClientException("Failed to read from ResultSet.", e);
        }
    }

    /** Ingests the stream without a client request id. */
    @Override
    protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException {
        return ingestFromStreamImpl(streamSourceInfo, ingestionProperties, null);
    }

    /** Returns the client type name, used as the prefix of the monitored activity names. */
    @Override
    protected String getClientType() {
        return STREAMING_INGEST_CLIENT;
    }

    /**
     * Ingests a stream inside a {@link MonitoredActivity} named
     * {@code <clientType>.ingestFromStream}. Originally package-private (access widened by the
     * decompiler).
     *
     * @param streamSourceInfo the stream to ingest
     * @param ingestionProperties target database/table, format and mapping
     * @param str optional client request id; ignored when blank
     */
    public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String str) throws IngestionClientException, IngestionServiceException {
        return (IngestionResult) MonitoredActivity.invoke(() -> {
            return ingestFromStreamImpl(streamSourceInfo, ingestionProperties, str);
        }, getClientType().concat(".ingestFromStream"), getIngestionTraceAttributes(streamSourceInfo, ingestionProperties));
    }

    /**
     * Validates the arguments, gzip-compresses the stream when {@code shouldCompress} says so,
     * and executes the streaming ingest.
     *
     * @param clientRequestId optional client request id; ignored when blank
     * @throws IngestionClientException on an I/O or client-side failure (including an empty stream
     *     when compression is applied)
     * @throws IngestionServiceException on a service-side failure
     */
    private IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId) throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
        IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();
        streamSourceInfo.validate();
        ingestionProperties.validate();
        ClientRequestProperties clientRequestProperties = toClientRequestProperties(clientRequestId);
        try {
            InputStream payload;
            if (IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat)) {
                payload = compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen());
            } else {
                payload = streamSourceInfo.getStream();
            }
            log.debug("Executing streaming ingest");
            // NOTE(review): the last argument is presumably the streaming client's leave-open
            // flag; it is only true for sources that declare a compression type - confirm
            // against StreamingClient.executeStreamingIngest.
            this.streamingClient.executeStreamingIngest(
                    ingestionProperties.getDatabaseName(),
                    ingestionProperties.getTableName(),
                    payload,
                    clientRequestProperties,
                    dataFormat.getKustoValue(),
                    ingestionProperties.getIngestionMapping().getIngestionMappingReference(),
                    streamSourceInfo.getCompressionType() != null && streamSourceInfo.isLeaveOpen());
            log.debug("Stream was ingested successfully.");
            return newSucceededResult(ingestionProperties);
        } catch (DataClientException | IOException e) {
            log.error(e.getMessage(), e);
            throw new IngestionClientException(e.getMessage(), e);
        } catch (DataServiceException e) {
            log.error(e.getMessage(), e);
            throw new IngestionServiceException(e.getMessage(), e);
        }
    }

    /**
     * Reads the whole stream, gzip-compresses it in memory and returns a stream over the
     * compressed bytes.
     *
     * <p>Unless {@code leaveOpen} is set, the source stream is closed before this method returns
     * or throws. The gzip stream is always closed so its compressor is released even on failure.
     *
     * @param uncompressedStream the source stream to read to its end
     * @param leaveOpen when {@code true}, the source stream is not closed by this method
     * @return an in-memory stream holding the gzip-compressed content
     * @throws IngestionClientException if the source stream has no data
     * @throws IOException if reading, compressing or closing fails
     */
    private InputStream compressStream(InputStream uncompressedStream, boolean leaveOpen) throws IngestionClientException, IOException {
        log.debug("Compressing the stream.");
        byte[] compressed;
        try {
            byte[] buffer = new byte[STREAM_COMPRESS_BUFFER_SIZE];
            int bytesRead = uncompressedStream.read(buffer);
            // Check for an empty source before allocating the compressor.
            if (bytesRead == -1) {
                log.error("Empty stream.");
                throw new IngestionClientException("Empty stream.");
            }
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(sink)) {
                do {
                    gzip.write(buffer, 0, bytesRead);
                    bytesRead = uncompressedStream.read(buffer);
                } while (bytesRead != -1);
            }
            // The gzip trailer is written on close, so take the bytes only after the try block.
            compressed = sink.toByteArray();
        } catch (IngestionClientException | IOException | RuntimeException e) {
            if (!leaveOpen) {
                try {
                    uncompressedStream.close();
                } catch (IOException closeFailure) {
                    // Keep the original failure as the primary exception.
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
        if (!leaveOpen) {
            uncompressedStream.close();
        }
        return new ByteArrayInputStream(compressed);
    }

    /**
     * Ingests a blob inside a {@link MonitoredActivity} named {@code <clientType>.ingestFromBlob}.
     * Originally package-private (access widened by the decompiler).
     *
     * @param blobSourceInfo the blob to ingest
     * @param ingestionProperties target database/table, format and mapping
     * @param blobClient client used to read the blob's properties when no raw size is given
     * @param str optional client request id; ignored when blank
     */
    public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient blobClient, @Nullable String str) throws IngestionClientException, IngestionServiceException {
        return (IngestionResult) MonitoredActivity.invoke(() -> {
            return ingestFromBlobImpl(blobSourceInfo, ingestionProperties, blobClient, str);
        }, getClientType().concat(".ingestFromBlob"), getIngestionTraceAttributes(blobSourceInfo, ingestionProperties));
    }

    /**
     * Rejects empty blobs and executes the streaming ingest from the blob path.
     *
     * <p>The blob's properties are only fetched when the source reports a raw size of zero.
     *
     * @param clientRequestId optional client request id; ignored when blank
     * @throws IngestionClientException if the blob is empty, its metadata cannot be read, or on
     *     another client-side failure
     * @throws IngestionServiceException on a service-side failure
     */
    private IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties, BlobClient blobClient, @Nullable String clientRequestId) throws IngestionClientException, IngestionServiceException {
        String blobPath = blobSourceInfo.getBlobPath();
        try {
            if (blobSourceInfo.getRawSizeInBytes() == 0 && blobClient.getProperties().getBlobSize() == 0) {
                log.error("Empty blob.");
                throw new IngestionClientException("Empty blob.");
            }
            ClientRequestProperties clientRequestProperties = toClientRequestProperties(clientRequestId);
            this.streamingClient.executeStreamingIngestFromBlob(
                    ingestionProperties.getDatabaseName(),
                    ingestionProperties.getTableName(),
                    blobPath,
                    clientRequestProperties,
                    ingestionProperties.getDataFormat().getKustoValue(),
                    ingestionProperties.getIngestionMapping().getIngestionMappingReference());
            log.debug("Blob was ingested successfully.");
            return newSucceededResult(ingestionProperties);
        } catch (DataClientException e) {
            log.error(e.getMessage(), e);
            throw new IngestionClientException(e.getMessage(), e);
        } catch (DataServiceException e) {
            log.error(e.getMessage(), e);
            throw new IngestionServiceException(e.getMessage(), e);
        } catch (BlobStorageException e) {
            String hint = e.getStatusCode() == 403 ? "this might mean the blob doesn't exist" : "";
            String message = String.format("Exception trying to read blob metadata,%s", hint);
            // Log like the other failure paths do before translating the exception.
            log.error(message, e);
            throw new IngestionClientException(message, e);
        }
    }

    /** Returns request properties carrying the given id, or {@code null} when the id is blank. */
    @Nullable
    private static ClientRequestProperties toClientRequestProperties(@Nullable String clientRequestId) {
        if (!StringUtils.isNotBlank(clientRequestId)) {
            return null;
        }
        ClientRequestProperties properties = new ClientRequestProperties();
        properties.setClientRequestId(clientRequestId);
        return properties;
    }

    /** Builds a {@code Succeeded} result for the table and database named in the properties. */
    private static IngestionResult newSucceededResult(IngestionProperties ingestionProperties) {
        IngestionStatus status = new IngestionStatus();
        status.status = OperationStatus.Succeeded;
        status.table = ingestionProperties.getTableName();
        status.database = ingestionProperties.getDatabaseName();
        return new IngestionStatusResult(status);
    }

    /** Overrides the stored connection data source. */
    protected void setConnectionDataSource(String str) {
        this.connectionDataSource = str;
    }

    /** Does nothing; this implementation releases no resources on close. */
    @Override
    public void close() {
    }
}
