/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kinesis.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.SdkAutoCloseable;

class KinesisStreamsSinkWriter<InputT>
extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSinkWriter.class);
    private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER = FatalExceptionClassifier.withRootCauseOfType(ResourceNotFoundException.class, err -> new KinesisStreamsException("Encountered non-recoverable exception relating to not being able to find the specified resources", (Throwable)err));
    private static final FatalExceptionClassifier KINESIS_FATAL_EXCEPTION_CLASSIFIER = FatalExceptionClassifier.createChain((FatalExceptionClassifier[])new FatalExceptionClassifier[]{AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier(), AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier(), RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER, AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier()});
    private final Counter numRecordsOutErrorsCounter;
    private final String streamName;
    private final SinkWriterMetricGroup metrics;
    private final SdkAsyncHttpClient httpClient;
    private final KinesisAsyncClient kinesisClient;
    private final boolean failOnError;

    KinesisStreamsSinkWriter(ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean failOnError, String streamName, Properties kinesisClientProperties) {
        this(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, failOnError, streamName, kinesisClientProperties, Collections.emptyList());
    }

    KinesisStreamsSinkWriter(ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean failOnError, String streamName, Properties kinesisClientProperties, Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
        super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, states);
        this.failOnError = failOnError;
        this.streamName = streamName;
        this.metrics = context.metricGroup();
        this.numRecordsOutErrorsCounter = this.metrics.getNumRecordsOutErrorsCounter();
        this.httpClient = AWSGeneralUtil.createAsyncHttpClient((Properties)kinesisClientProperties);
        this.kinesisClient = this.buildClient(kinesisClientProperties, this.httpClient);
    }

    private KinesisAsyncClient buildClient(Properties kinesisClientProperties, SdkAsyncHttpClient httpClient) {
        AWSGeneralUtil.validateAwsCredentials((Properties)kinesisClientProperties);
        return (KinesisAsyncClient)AWSAsyncSinkUtil.createAwsAsyncClient((Properties)kinesisClientProperties, (SdkAsyncHttpClient)httpClient, (AwsAsyncClientBuilder)KinesisAsyncClient.builder(), (String)"Apache Flink %s (%s) Kinesis Connector", (String)"aws.kinesis.client.user-agent-prefix");
    }

    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) {
        PutRecordsRequest batchRequest = (PutRecordsRequest)PutRecordsRequest.builder().records(requestEntries).streamName(this.streamName).build();
        CompletableFuture future = this.kinesisClient.putRecords(batchRequest);
        future.whenComplete((response, err) -> {
            if (err != null) {
                this.handleFullyFailedRequest((Throwable)err, requestEntries, requestResult);
            } else if (response.failedRecordCount() > 0) {
                this.handlePartiallyFailedRequest((PutRecordsResponse)response, requestEntries, requestResult);
            } else {
                requestResult.accept(Collections.emptyList());
            }
        });
    }

    protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) {
        return requestEntry.data().asByteArrayUnsafe().length;
    }

    private void handleFullyFailedRequest(Throwable err, List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) {
        LOG.debug("KDS Sink failed to write and will retry {} entries to KDS", (Object)requestEntries.size(), (Object)err);
        this.numRecordsOutErrorsCounter.inc((long)requestEntries.size());
        if (this.isRetryable(err)) {
            requestResult.accept(requestEntries);
        }
    }

    public void close() {
        AWSGeneralUtil.closeResources((SdkAutoCloseable[])new SdkAutoCloseable[]{this.httpClient, this.kinesisClient});
    }

    private void handlePartiallyFailedRequest(PutRecordsResponse response, List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) {
        LOG.debug("KDS Sink failed to write and will retry {} entries to KDS", (Object)response.failedRecordCount());
        this.numRecordsOutErrorsCounter.inc((long)response.failedRecordCount().intValue());
        if (this.failOnError) {
            this.getFatalExceptionCons().accept(new KinesisStreamsException.KinesisStreamsFailFastException());
            return;
        }
        ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<PutRecordsRequestEntry>(response.failedRecordCount());
        List records = response.records();
        for (int i = 0; i < records.size(); ++i) {
            if (((PutRecordsResultEntry)records.get(i)).errorCode() == null) continue;
            failedRequestEntries.add(requestEntries.get(i));
        }
        requestResult.accept(failedRequestEntries);
    }

    private boolean isRetryable(Throwable err) {
        if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, this.getFatalExceptionCons())) {
            return false;
        }
        if (this.failOnError) {
            this.getFatalExceptionCons().accept(new KinesisStreamsException.KinesisStreamsFailFastException(err));
            return false;
        }
        return true;
    }
}

