package org.apache.flink.connector.firehose.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.firehose.sink.KinesisFirehoseException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
import software.amazon.awssdk.services.firehose.model.Record;
import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.class */
public class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkWriter.class);
    private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER = FatalExceptionClassifier.withRootCauseOfType(ResourceNotFoundException.class, th -> {
        return new KinesisFirehoseException("Encountered non-recoverable exception relating to not being able to find the specified resources", th);
    });
    private static final FatalExceptionClassifier FIREHOSE_FATAL_EXCEPTION_CLASSIFIER = FatalExceptionClassifier.createChain(new FatalExceptionClassifier[]{AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier(), AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier(), RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER, AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier()});
    private final Counter numRecordsOutErrorsCounter;
    private final String deliveryStreamName;
    private final SinkWriterMetricGroup metrics;
    private final SdkAsyncHttpClient httpClient;
    private final FirehoseAsyncClient firehoseClient;
    private final boolean failOnError;

    private static SdkAsyncHttpClient createHttpClient(Properties properties) {
        return AWSGeneralUtil.createAsyncHttpClient(properties);
    }

    private static FirehoseAsyncClient createFirehoseClient(Properties properties, SdkAsyncHttpClient sdkAsyncHttpClient) {
        AWSGeneralUtil.validateAwsCredentials(properties);
        return AWSAsyncSinkUtil.createAwsAsyncClient(properties, sdkAsyncHttpClient, FirehoseAsyncClient.builder(), KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT, KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
    }

    KinesisFirehoseSinkWriter(ElementConverter<InputT, Record> elementConverter, Sink.InitContext initContext, int i, int i2, int i3, long j, long j2, long j3, boolean z, String str, Properties properties) {
        this(elementConverter, initContext, i, i2, i3, j, j2, j3, z, str, properties, Collections.emptyList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisFirehoseSinkWriter(ElementConverter<InputT, Record> elementConverter, Sink.InitContext initContext, int i, int i2, int i3, long j, long j2, long j3, boolean z, String str, Properties properties, Collection<BufferedRequestState<Record>> collection) {
        super(elementConverter, initContext, i, i2, i3, j, j2, j3, collection);
        this.failOnError = z;
        this.deliveryStreamName = str;
        this.metrics = initContext.metricGroup();
        this.numRecordsOutErrorsCounter = this.metrics.getNumRecordsOutErrorsCounter();
        this.httpClient = createHttpClient(properties);
        this.firehoseClient = createFirehoseClient(properties, this.httpClient);
    }

    protected void submitRequestEntries(List<Record> list, Consumer<List<Record>> consumer) {
        this.firehoseClient.putRecordBatch((PutRecordBatchRequest) PutRecordBatchRequest.builder().records(list).deliveryStreamName(this.deliveryStreamName).build()).whenComplete((putRecordBatchResponse, th) -> {
            if (th != null) {
                handleFullyFailedRequest(th, list, consumer);
            } else if (putRecordBatchResponse.failedPutCount().intValue() > 0) {
                handlePartiallyFailedRequest(putRecordBatchResponse, list, consumer);
            } else {
                consumer.accept(Collections.emptyList());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getSizeInBytes(Record record) {
        return record.data().asByteArrayUnsafe().length;
    }

    public void close() {
        AWSGeneralUtil.closeResources(new SdkAutoCloseable[]{this.httpClient, this.firehoseClient});
    }

    private void handleFullyFailedRequest(Throwable th, List<Record> list, Consumer<List<Record>> consumer) {
        LOG.debug("KDF Sink failed to write and will retry {} entries to KDF first request was {}", new Object[]{Integer.valueOf(list.size()), list.get(0).toString(), th});
        this.numRecordsOutErrorsCounter.inc(list.size());
        if (isRetryable(th)) {
            consumer.accept(list);
        }
    }

    private void handlePartiallyFailedRequest(PutRecordBatchResponse putRecordBatchResponse, List<Record> list, Consumer<List<Record>> consumer) {
        LOG.debug("KDF Sink failed to write and will retry {} entries to KDF first request was {}", Integer.valueOf(list.size()), list.get(0).toString());
        this.numRecordsOutErrorsCounter.inc(putRecordBatchResponse.failedPutCount().intValue());
        if (this.failOnError) {
            getFatalExceptionCons().accept(new KinesisFirehoseException.KinesisFirehoseFailFastException());
            return;
        }
        ArrayList arrayList = new ArrayList(putRecordBatchResponse.failedPutCount().intValue());
        List requestResponses = putRecordBatchResponse.requestResponses();
        for (int i = 0; i < requestResponses.size(); i++) {
            if (((PutRecordBatchResponseEntry) requestResponses.get(i)).errorCode() != null) {
                arrayList.add(list.get(i));
            }
        }
        consumer.accept(arrayList);
    }

    private boolean isRetryable(Throwable th) {
        if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(th, getFatalExceptionCons())) {
            return false;
        }
        if (!this.failOnError) {
            return true;
        }
        getFatalExceptionCons().accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(th));
        return false;
    }
}
