package org.apache.flink.connector.kinesis.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsException;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * Sink writer that ships batches of {@link PutRecordsRequestEntry} to a Kinesis data stream via
 * {@link KinesisAsyncClient#putRecords} and reports back to the {@link AsyncSinkWriter} base class
 * which entries have to be re-queued.
 *
 * <p>Outcome handling per batch:
 *
 * <ul>
 *   <li>the call completed exceptionally: see {@link #handleFullyFailedRequest};
 *   <li>the call succeeded but {@code failedRecordCount() > 0}: see
 *       {@link #handlePartiallyFailedRequest};
 *   <li>otherwise an empty list is handed to the result consumer (nothing to re-queue).
 * </ul>
 *
 * <p>The writer owns the HTTP client and the Kinesis client it creates in the constructor and
 * releases both in {@link #close()}.
 *
 * @param <InputT> type of the elements accepted by the element converter
 */
@Internal
/* loaded from: input_file:org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.class */
public class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSinkWriter.class);

    /** Maps any throwable whose root cause is a {@link ResourceNotFoundException} to a {@link KinesisStreamsException}. */
    private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER =
            FatalExceptionClassifier.withRootCauseOfType(
                    ResourceNotFoundException.class,
                    th -> new KinesisStreamsException("Encountered non-recoverable exception relating to not being able to find the specified resources", th));

    /** Chain consulted by {@link #isRetryable(Throwable)}; classifiers are listed in the order they are passed to {@code createChain}. */
    private static final FatalExceptionClassifier KINESIS_FATAL_EXCEPTION_CLASSIFIER =
            FatalExceptionClassifier.createChain(
                    new FatalExceptionClassifier[] {
                        AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier(),
                        AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier(),
                        RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
                        AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier()
                    });

    /** Increase rate handed to the AIMD scaling strategy. */
    private static final int AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE = 10;

    /** Decrease factor handed to the AIMD scaling strategy. */
    private static final double AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR = 0.99d;

    /** Incremented by the number of entries that failed in a request (fully or partially). */
    private final Counter numRecordsOutErrorsCounter;

    /** Stream name set on every {@code PutRecordsRequest}. */
    private final String streamName;

    /** Stream ARN set on every {@code PutRecordsRequest}. */
    private final String streamArn;

    /** Metric group obtained from the init context; source of {@link #numRecordsOutErrorsCounter}. */
    private final SinkWriterMetricGroup metrics;

    /** HTTP client backing {@link #kinesisClient}; closed in {@link #close()}. */
    private final SdkAsyncHttpClient httpClient;

    /** Client used to issue {@code putRecords} calls; closed in {@link #close()}. */
    private final KinesisAsyncClient kinesisClient;

    /** When true, any failed request is escalated through the fatal-exception consumer instead of being retried. */
    private final boolean failOnError;

    /**
     * Creates a writer with no previously buffered request state; delegates to the full
     * constructor with an empty state collection.
     *
     * @param elementConverter converts incoming elements to {@link PutRecordsRequestEntry}
     * @param initContext sink init context, also the source of the metric group
     * @param maxBatchSize passed to {@code setMaxBatchSize}
     * @param maxInFlightRequests passed to {@code setMaxInFlightRequests}
     * @param maxBufferedRequests passed to {@code setMaxBufferedRequests}
     * @param maxBatchSizeInBytes passed to {@code setMaxBatchSizeInBytes}
     * @param maxTimeInBufferMS passed to {@code setMaxTimeInBufferMS}
     * @param maxRecordSizeInBytes passed to {@code setMaxRecordSizeInBytes}
     * @param failOnError whether failed requests are escalated as fatal instead of retried
     * @param streamName stream name set on each request
     * @param streamArn stream ARN set on each request
     * @param kinesisClientProperties properties used to create the HTTP and Kinesis clients
     */
    KinesisStreamsSinkWriter(
            ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
            Sink.InitContext initContext,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            boolean failOnError,
            String streamName,
            String streamArn,
            Properties kinesisClientProperties) {
        this(
                elementConverter,
                initContext,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes,
                failOnError,
                streamName,
                streamArn,
                kinesisClientProperties,
                Collections.emptyList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a writer, configures the {@link AsyncSinkWriter} base class and builds the HTTP and
     * Kinesis clients from {@code kinesisClientProperties}.
     *
     * @param elementConverter converts incoming elements to {@link PutRecordsRequestEntry}
     * @param initContext sink init context, also the source of the metric group
     * @param maxBatchSize passed to {@code setMaxBatchSize}; also the initial max in-flight
     *     messages of the rate limiting strategy
     * @param maxInFlightRequests passed to {@code setMaxInFlightRequests}
     * @param maxBufferedRequests passed to {@code setMaxBufferedRequests}
     * @param maxBatchSizeInBytes passed to {@code setMaxBatchSizeInBytes}
     * @param maxTimeInBufferMS passed to {@code setMaxTimeInBufferMS}
     * @param maxRecordSizeInBytes passed to {@code setMaxRecordSizeInBytes}
     * @param failOnError whether failed requests are escalated as fatal instead of retried
     * @param streamName stream name set on each request
     * @param streamArn stream ARN set on each request
     * @param kinesisClientProperties properties used to create the HTTP and Kinesis clients
     * @param states buffered request states forwarded to the base class constructor (presumably
     *     state restored from a checkpoint; confirm against {@code AsyncSinkWriter})
     */
    public KinesisStreamsSinkWriter(
            ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
            Sink.InitContext initContext,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            boolean failOnError,
            String streamName,
            String streamArn,
            Properties kinesisClientProperties,
            Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
        super(
                elementConverter,
                initContext,
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(maxBatchSize)
                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
                        .setMaxInFlightRequests(maxInFlightRequests)
                        .setMaxBufferedRequests(maxBufferedRequests)
                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
                        .setRateLimitingStrategy(buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize))
                        .build(),
                states);
        this.failOnError = failOnError;
        this.streamName = streamName;
        this.streamArn = streamArn;
        this.metrics = initContext.metricGroup();
        this.numRecordsOutErrorsCounter = this.metrics.getNumRecordsOutErrorsCounter();
        // The HTTP client is created first because the Kinesis client is built on top of it.
        this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
        this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
    }

    /**
     * Validates the AWS credentials in {@code kinesisClientProperties} and builds the async
     * Kinesis client on top of the given HTTP client.
     */
    private KinesisAsyncClient buildClient(Properties kinesisClientProperties, SdkAsyncHttpClient asyncHttpClient) {
        AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
        return (KinesisAsyncClient)
                AWSClientUtil.createAwsAsyncClient(
                        kinesisClientProperties,
                        asyncHttpClient,
                        KinesisAsyncClient.builder(),
                        KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
                        KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
    }

    /**
     * Builds the congestion-control rate limiting strategy: starts at {@code maxBatchSize}
     * in-flight messages and scales with an AIMD strategy whose builder argument is
     * {@code maxBatchSize * maxInFlightRequests}.
     */
    private static RateLimitingStrategy buildRateLimitingStrategy(int maxInFlightRequests, int maxBatchSize) {
        return CongestionControlRateLimitingStrategy.builder()
                .setMaxInFlightRequests(maxInFlightRequests)
                .setInitialMaxInFlightMessages(maxBatchSize)
                .setScalingStrategy(
                        AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests)
                                .setIncreaseRate(AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE)
                                .setDecreaseFactor(AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR)
                                .build())
                .build();
    }

    /**
     * Sends the given entries in one {@code PutRecords} call and, when the call completes, passes
     * the entries that must be re-queued to {@code requestResult}.
     *
     * @param requestEntries entries to send in this batch
     * @param requestResult receives the entries to retry; an empty list when everything succeeded
     */
    @Override
    protected void submitRequestEntries(
            List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) {
        PutRecordsRequest batchRequest =
                PutRecordsRequest.builder()
                        .records(requestEntries)
                        .streamName(this.streamName)
                        .streamARN(this.streamArn)
                        .build();

        this.kinesisClient
                .putRecords(batchRequest)
                .whenComplete(
                        (response, err) -> {
                            if (err != null) {
                                handleFullyFailedRequest(err, requestEntries, requestResult);
                            } else if (response.failedRecordCount().intValue() > 0) {
                                handlePartiallyFailedRequest(response, requestEntries, requestResult);
                            } else {
                                requestResult.accept(Collections.emptyList());
                            }
                        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the size of an entry as the length of its data payload in bytes.
     *
     * @param putRecordsRequestEntry entry to measure
     * @return number of bytes in the entry's data
     */
    @Override
    public long getSizeInBytes(PutRecordsRequestEntry putRecordsRequestEntry) {
        return putRecordsRequestEntry.data().asByteArrayUnsafe().length;
    }

    /**
     * Handles a request that completed exceptionally: counts every entry as an error and re-queues
     * the whole batch only if {@link #isRetryable(Throwable)} allows it.
     */
    private void handleFullyFailedRequest(
            Throwable err, List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) {
        LOG.debug("KDS Sink failed to write and will retry {} entries to KDS", requestEntries.size(), err);
        this.numRecordsOutErrorsCounter.inc(requestEntries.size());
        if (isRetryable(err)) {
            requestResult.accept(requestEntries);
        }
    }

    /** Closes the HTTP client and the Kinesis client created by this writer. */
    @Override
    public void close() {
        AWSGeneralUtil.closeResources(this.httpClient, this.kinesisClient);
    }

    /**
     * Handles a response in which some records failed: counts the failures and either escalates
     * (when {@code failOnError} is set) or re-queues exactly the entries whose result carries an
     * error code.
     */
    private void handlePartiallyFailedRequest(
            PutRecordsResponse response, List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) {
        LOG.debug("KDS Sink failed to write and will retry {} entries to KDS", response.failedRecordCount());
        this.numRecordsOutErrorsCounter.inc(response.failedRecordCount().intValue());
        if (this.failOnError) {
            getFatalExceptionCons().accept(new KinesisStreamsException.KinesisStreamsFailFastException());
            return;
        }

        List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount().intValue());
        List<PutRecordsResultEntry> resultEntries = response.records();
        // Result entries are matched to request entries by index: result i describes request i.
        for (int idx = 0; idx < resultEntries.size(); idx++) {
            if (resultEntries.get(idx).errorCode() != null) {
                failedRequestEntries.add(requestEntries.get(idx));
            }
        }
        requestResult.accept(failedRequestEntries);
    }

    /**
     * Decides whether a fully failed request may be retried.
     *
     * <p>NOTE(review): the first check is intentionally written as {@code !isFatal(...)} leading
     * to "not retryable". The caller only re-queues when this method returns true, so the
     * classifier's {@code isFatal} apparently returns {@code false} when it recognised the
     * throwable as fatal (and forwarded it to the fatal-exception consumer). Confirm against
     * {@code FatalExceptionClassifier} before "fixing" the apparent inversion.
     *
     * @param err failure reported by the {@code putRecords} call
     * @return true if the batch should be re-queued
     */
    private boolean isRetryable(Throwable err) {
        if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
            return false;
        }
        if (!this.failOnError) {
            return true;
        }
        // Fail-fast mode: escalate instead of retrying.
        getFatalExceptionCons().accept(new KinesisStreamsException.KinesisStreamsFailFastException(err));
        return false;
    }
}
