package com.microsoft.azure.flink.writer.internal.sink;

import com.microsoft.azure.flink.common.KustoClientUtil;
import com.microsoft.azure.flink.common.KustoRetryConfig;
import com.microsoft.azure.flink.common.KustoRetryUtil;
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import com.microsoft.azure.kusto.ingest.ColumnMapping;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import java.net.URISyntaxException;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Product;

@Internal
/* loaded from: input_file:com/microsoft/azure/flink/writer/internal/sink/KustoSinkCommon.class */
public class KustoSinkCommon<IN> {
    private final KustoWriteOptions writeOptions;
    protected static final Logger LOG = LoggerFactory.getLogger(KustoSinkCommon.class);
    private final transient Counter ingestSucceededCounter;
    private final transient Counter ingestFailedCounter;
    private final transient Counter ingestPartiallyFailedCounter;
    private final transient Counter recordsSent;
    private final transient IngestClient ingestClient;
    private final KustoConnectionOptions connectionOptions;
    protected final transient Supplier<Integer> aritySupplier;
    protected final transient BiFunction<IN, Integer, Object> extractFieldValueFunction;
    protected IngestionMapping ingestionMapping;
    private final ScheduledExecutorService pollResultsExecutor = Executors.newSingleThreadScheduledExecutor();
    protected volatile long lastSendTime = Instant.now(Clock.systemUTC()).toEpochMilli();
    protected volatile long ackTime = Long.MAX_VALUE;

    /* JADX INFO: Access modifiers changed from: protected */
    public KustoSinkCommon(KustoConnectionOptions kustoConnectionOptions, KustoWriteOptions kustoWriteOptions, MetricGroup metricGroup, TypeSerializer<IN> typeSerializer, TypeInformation<IN> typeInformation, String str) throws URISyntaxException {
        this.connectionOptions = kustoConnectionOptions;
        this.writeOptions = kustoWriteOptions;
        this.ingestClient = KustoClientUtil.createIngestClient((KustoConnectionOptions) Preconditions.checkNotNull(kustoConnectionOptions, "Connection options passed to ingest client cannot be null."), str);
        this.ingestSucceededCounter = metricGroup.counter("succeededIngestions");
        this.ingestFailedCounter = metricGroup.counter("failedIngestions");
        this.ingestPartiallyFailedCounter = metricGroup.counter("partialSucceededIngestions");
        this.recordsSent = metricGroup.counter("recordsSent");
        Class<?> cls = typeSerializer.createInstance().getClass();
        LOG.trace("Using sink with class type: {}", cls);
        if (Tuple.class.isAssignableFrom(cls)) {
            this.aritySupplier = () -> {
                return Integer.valueOf(((TupleSerializer) typeSerializer).getArity());
            };
            this.extractFieldValueFunction = (obj, num) -> {
                return ((Tuple) obj).getField(num.intValue());
            };
            return;
        }
        if (Row.class.isAssignableFrom(cls)) {
            this.aritySupplier = () -> {
                return Integer.valueOf(((RowSerializer) typeSerializer).getArity());
            };
            this.extractFieldValueFunction = (obj2, num2) -> {
                return ((Row) obj2).getField(num2.intValue());
            };
        } else if (Product.class.isAssignableFrom(cls)) {
            this.aritySupplier = () -> {
                return Integer.valueOf(((CaseClassSerializer) typeSerializer).getArity());
            };
            this.extractFieldValueFunction = (obj3, num3) -> {
                return ((Product) obj3).productElement(num3.intValue());
            };
        } else {
            if (!(typeInformation instanceof PojoTypeInfo)) {
                throw new IllegalArgumentException("Unsupported type: " + cls);
            }
            typeInformation.getClass();
            this.aritySupplier = typeInformation::getArity;
            this.extractFieldValueFunction = (obj4, num4) -> {
                try {
                    return ((PojoTypeInfo) typeInformation).getPojoFieldAt(num4.intValue()).getField().get(obj4);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            };
            this.ingestionMapping = getIngestionMapping(((PojoTypeInfo) typeInformation).getFieldNames());
        }
    }

    @Contract("_ -> new")
    @NotNull
    private IngestionMapping getIngestionMapping(String[] strArr) {
        ColumnMapping[] columnMappingArr = new ColumnMapping[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            columnMappingArr[i] = new ColumnMapping(strArr[i], (String) null);
            columnMappingArr[i].setOrdinal(Integer.valueOf(i));
        }
        return new IngestionMapping(columnMappingArr, IngestionMapping.IngestionMappingKind.CSV);
    }

    @Contract(pure = true)
    @NotNull
    protected Supplier<IngestionResult> performIngestSupplier(@NotNull ContainerWithSas containerWithSas, IngestionMapping ingestionMapping, @NotNull String str, UUID uuid) {
        return () -> {
            try {
                String format = String.format("%s/%s%s", containerWithSas.getEndpointWithoutSas(), str, containerWithSas.getSas());
                BlobSourceInfo blobSourceInfo = new BlobSourceInfo(format);
                blobSourceInfo.setSourceId(uuid);
                LOG.trace("Ingesting into blob: {} with source id {}", format, uuid);
                IngestionProperties ingestionProperties = new IngestionProperties(this.writeOptions.getDatabase(), this.writeOptions.getTable());
                ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
                if (!this.writeOptions.getIngestByTags().isEmpty()) {
                    ingestionProperties.setIngestByTags(this.writeOptions.getIngestByTags());
                }
                if (!this.writeOptions.getAdditionalTags().isEmpty()) {
                    ingestionProperties.setAdditionalTags(this.writeOptions.getAdditionalTags());
                }
                ingestionProperties.setIngestByTags(this.writeOptions.getIngestByTags());
                ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
                ingestionProperties.setDataFormat(IngestionProperties.DataFormat.CSV.name());
                if (StringUtils.isNotEmpty(this.writeOptions.getIngestionMappingRef())) {
                    ingestionProperties.setIngestionMapping(this.writeOptions.getIngestionMappingRef(), IngestionMapping.IngestionMappingKind.CSV);
                }
                if (ingestionMapping != null) {
                    ingestionProperties.setIngestionMapping(ingestionMapping);
                }
                ingestionProperties.setFlushImmediately(this.writeOptions.getFlushImmediately());
                this.lastSendTime = Instant.now(Clock.systemUTC()).toEpochMilli();
                LOG.trace("Setting last send time to {}", Long.valueOf(this.lastSendTime));
                return this.ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
            } catch (IngestionClientException | IngestionServiceException e) {
                String format2 = String.format("URI syntax exception polling ingestion status for sourceId: %s", uuid);
                LOG.warn(format2, e);
                throw new RuntimeException(format2, e);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Code restructure failed: missing block: B:33:0x0153, code lost:
    
        com.microsoft.azure.flink.writer.internal.sink.KustoSinkCommon.LOG.error("Writing to database {} and table {} failed. SourceId {} and BlobId {} failed", new java.lang.Object[]{r9.writeOptions.getDatabase(), r9.writeOptions.getTable(), r0, r0});
     */
    /* JADX WARN: Code restructure failed: missing block: B:34:0x0184, code lost:
    
        if (r0 == null) goto L33;
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x0189, code lost:
    
        if (0 == 0) goto L32;
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x01a0, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x018c, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x0194, code lost:
    
        r25 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x0196, code lost:
    
        r0.addSuppressed(r25);
     */
    /* JADX WARN: Code restructure failed: missing block: B:48:0x01b4, code lost:
    
        if (r15 != 0) goto L49;
     */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x01bc, code lost:
    
        if (r0 == null) goto L47;
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x01c1, code lost:
    
        if (0 == 0) goto L46;
     */
    /* JADX WARN: Code restructure failed: missing block: B:53:0x01d8, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:55:0x01c4, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x01cc, code lost:
    
        r21 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:58:0x01ce, code lost:
    
        r0.addSuppressed(r21);
     */
    /* JADX WARN: Code restructure failed: missing block: B:61:0x01e0, code lost:
    
        r9.recordsSent.inc(r15);
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x01ee, code lost:
    
        if (r0 == null) goto L71;
     */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x01f3, code lost:
    
        if (0 == 0) goto L56;
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x020a, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:67:0x01f6, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:69:0x01fe, code lost:
    
        r19 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:70:0x0200, code lost:
    
        r0.addSuppressed(r19);
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 17, insn: 0x021d: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:81:0x021d */
    /* JADX WARN: Not initialized variable reg: 18, insn: 0x0222: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r18 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:83:0x0222 */
    /* JADX WARN: Type inference failed for: r17v0, types: [com.microsoft.azure.flink.writer.internal.sink.BlobOutputMultiVolume] */
    /* JADX WARN: Type inference failed for: r18v0, types: [java.lang.Throwable] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public boolean ingest(java.lang.Iterable<IN> r10) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 703
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.microsoft.azure.flink.writer.internal.sink.KustoSinkCommon.ingest(java.lang.Iterable):boolean");
    }

    private boolean uploadAndPollStatus(ContainerWithSas containerWithSas, UUID uuid, String str) {
        LOG.debug("Upload to blob successful , blob file {}.Performing ingestion", str);
        IngestionResult ingestionResult = (IngestionResult) KustoRetryUtil.getRetries(KustoRetryConfig.builder().build()).executeSupplier(performIngestSupplier(containerWithSas, this.ingestionMapping, str, uuid));
        try {
            if (!this.writeOptions.getPollForIngestionStatus()) {
                LOG.debug("Upload to blob successful , blob file {}. Not polling for status", str);
                this.ackTime = Instant.now(Clock.systemUTC()).toEpochMilli();
                return true;
            }
            String str2 = pollForCompletion(str, uuid.toString(), ingestionResult, Long.valueOf(Instant.now(Clock.systemUTC()).toEpochMilli())).get();
            this.ackTime = Instant.now(Clock.systemUTC()).toEpochMilli();
            return OperationStatus.Succeeded.name().equals(str2);
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Error while polling for completion of ingestion.", e);
            throw new RuntimeException(e);
        }
    }

    protected CompletableFuture<String> pollForCompletion(String str, String str2, IngestionResult ingestionResult, Long l) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        long epochMilli = Instant.now(Clock.systemUTC()).plus(5L, (TemporalUnit) ChronoUnit.MINUTES).toEpochMilli();
        ScheduledFuture<?> scheduleAtFixedRate = this.pollResultsExecutor.scheduleAtFixedRate(() -> {
            if (Instant.now(Clock.systemUTC()).toEpochMilli() > epochMilli) {
                LOG.warn("Time for polling end is {} and the current epoch time is {}. This operation will timeout", Instant.ofEpochMilli(epochMilli), Instant.now(Clock.systemUTC()));
                completableFuture.completeExceptionally(new TimeoutException("Polling for ingestion of source id: " + str2 + " timed out."));
            }
            try {
                LOG.debug("Ingestion Status {} for blob {}", ingestionResult.getIngestionStatusCollection().stream().map(ingestionStatus -> {
                    return ingestionStatus.getIngestionSourceId() + ":" + ingestionStatus.getStatus();
                }).collect(Collectors.joining(",")), str);
                ingestionResult.getIngestionStatusCollection().stream().filter(ingestionStatus2 -> {
                    return ingestionStatus2.getIngestionSourceId().toString().equals(str2);
                }).findFirst().ifPresent(ingestionStatus3 -> {
                    if (ingestionStatus3.status == OperationStatus.Succeeded) {
                        this.ingestSucceededCounter.inc();
                        completableFuture.complete(ingestionStatus3.status.name());
                        LOG.info("Ingestion for blob {} took {} ms for state change to Succeeded", str, Long.valueOf(Instant.now(Clock.systemUTC()).toEpochMilli() - l.longValue()));
                    } else {
                        if (ingestionStatus3.status == OperationStatus.Failed) {
                            this.ingestFailedCounter.inc();
                            String format = String.format("Ingestion failed for sourceId: %s with failure reason %s.Blob name : %s", str2, ingestionStatus3.getFailureStatus(), str);
                            LOG.error(format);
                            completableFuture.completeExceptionally(new RuntimeException(format));
                            return;
                        }
                        if (ingestionStatus3.status == OperationStatus.PartiallySucceeded) {
                            this.ingestPartiallyFailedCounter.inc();
                            LOG.warn(String.format("Ingestion partially succeeded for sourceId: %s with failure reason %s. This will result in duplicates if the error was transient and was retried.Blob name: %s.Operation took %d ms", str2, ingestionStatus3.getFailureStatus(), str, Long.valueOf(Instant.now(Clock.systemUTC()).toEpochMilli() - l.longValue())));
                            completableFuture.complete(ingestionStatus3.status.name());
                        }
                    }
                });
            } catch (URISyntaxException e) {
                String format = String.format("URI syntax exception polling ingestion status for sourceId: %s", str2);
                LOG.warn(format, e);
                completableFuture.completeExceptionally(new RuntimeException(format, e));
            }
        }, 1L, 5L, TimeUnit.SECONDS);
        completableFuture.whenComplete((str3, th) -> {
            scheduleAtFixedRate.cancel(true);
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void close() {
        try {
            if (this.ingestClient != null) {
                this.ingestClient.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
    }
}
