package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.joda.time.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/AsyncPutRecordsHandler.class */
public class AsyncPutRecordsHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncPutRecordsHandler.class);
    private final KinesisAsyncClient kinesis;
    private final Supplier<BackOff> backoff;
    private final int concurrentRequests;
    private final Stats stats;
    private AtomicBoolean hasErrored;
    private AtomicReference<Throwable> asyncFailure;
    private Semaphore pendingRequests;

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/AsyncPutRecordsHandler$RetryHandler.class */
    private class RetryHandler implements BiConsumer<PutRecordsResponse, Throwable> {
        private final int totalRecords;
        private final String stream;
        private final BackOff backoff;
        private final long handlerStartTime = DateTimeUtils.currentTimeMillis();
        private long requestStartTime = 0;
        private int requests = 0;
        private List<PutRecordsRequestEntry> records;

        RetryHandler(String str, List<PutRecordsRequestEntry> list) {
            this.stream = str;
            this.totalRecords = list.size();
            this.records = list;
            this.backoff = (BackOff) AsyncPutRecordsHandler.this.backoff.get();
        }

        void run() {
            if (AsyncPutRecordsHandler.this.hasErrored.get()) {
                return;
            }
            try {
                this.requests++;
                this.requestStartTime = DateTimeUtils.currentTimeMillis();
                AsyncPutRecordsHandler.this.kinesis.putRecords((PutRecordsRequest) PutRecordsRequest.builder().streamName(this.stream).records(this.records).build()).whenComplete((BiConsumer) this);
            } catch (Throwable th) {
                setAsyncFailure(th);
            }
        }

        @Override // java.util.function.BiConsumer
        public void accept(PutRecordsResponse putRecordsResponse, Throwable th) {
            try {
                long currentTimeMillis = DateTimeUtils.currentTimeMillis();
                long j = currentTimeMillis - this.requestStartTime;
                synchronized (AsyncPutRecordsHandler.this.stats) {
                    AsyncPutRecordsHandler.this.stats.addPutRecordsRequest(j, this.requests > 1);
                }
                if (putRecordsResponse != null && !AsyncPutRecordsHandler.this.hasErrored.get()) {
                    if (hasErrors(putRecordsResponse)) {
                        try {
                            if (BackOffUtils.next(Sleeper.DEFAULT, this.backoff)) {
                                AsyncPutRecordsHandler.LOG.info(summarizeErrors("Attempting retry", putRecordsResponse));
                                this.records = failedRecords(putRecordsResponse);
                                run();
                            } else {
                                th = new IOException(summarizeErrors("Exceeded retries", putRecordsResponse));
                            }
                        } catch (Throwable th2) {
                            th = new IOException(summarizeErrors("Aborted retries", putRecordsResponse), th2);
                        }
                    } else {
                        AsyncPutRecordsHandler.this.pendingRequests.release();
                        AsyncPutRecordsHandler.LOG.debug("Done writing {} records [{} ms, {} request(s)]", new Object[]{Integer.valueOf(this.totalRecords), Long.valueOf(currentTimeMillis - this.handlerStartTime), Integer.valueOf(this.requests)});
                    }
                }
            } catch (Throwable th3) {
                th = th3;
            }
            if (th != null) {
                setAsyncFailure(th);
            }
        }

        private void setAsyncFailure(Throwable th) {
            AsyncPutRecordsHandler.LOG.warn("Error when writing to Kinesis.", th);
            AsyncPutRecordsHandler.this.hasErrored.set(true);
            AsyncPutRecordsHandler.this.asyncFailure.updateAndGet(th2 -> {
                if (th2 != null) {
                    th.addSuppressed(th2);
                }
                return th;
            });
            AsyncPutRecordsHandler.this.pendingRequests.release(AsyncPutRecordsHandler.this.concurrentRequests);
        }

        private boolean hasErrors(PutRecordsResponse putRecordsResponse) {
            return putRecordsResponse.records().stream().anyMatch(putRecordsResultEntry -> {
                return putRecordsResultEntry.errorCode() != null;
            });
        }

        private List<PutRecordsRequestEntry> failedRecords(PutRecordsResponse putRecordsResponse) {
            return (List) Streams.zip(this.records.stream(), putRecordsResponse.records().stream(), (v0, v1) -> {
                return Pair.of(v0, v1);
            }).filter(pair -> {
                return ((PutRecordsResultEntry) pair.getRight()).errorCode() != null;
            }).map(pair2 -> {
                return (PutRecordsRequestEntry) pair2.getLeft();
            }).collect(Collectors.toList());
        }

        private String summarizeErrors(String str, PutRecordsResponse putRecordsResponse) {
            return (String) ((Map) putRecordsResponse.records().stream().filter(putRecordsResultEntry -> {
                return putRecordsResultEntry.errorCode() != null;
            }).map(putRecordsResultEntry2 -> {
                return putRecordsResultEntry2.errorCode();
            }).collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))).entrySet().stream().map(entry -> {
                return String.format("%s for %d record(s)", entry.getKey(), entry.getValue());
            }).collect(Collectors.joining(", ", str + " after failure when writing to Kinesis: ", "."));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/AsyncPutRecordsHandler$Stats.class */
    public interface Stats {
        void addPutRecordsRequest(long j, boolean z);
    }

    AsyncPutRecordsHandler(KinesisAsyncClient kinesisAsyncClient, int i, Supplier<BackOff> supplier, Stats stats) {
        this.kinesis = kinesisAsyncClient;
        this.backoff = supplier;
        this.concurrentRequests = i;
        this.hasErrored = new AtomicBoolean(false);
        this.asyncFailure = new AtomicReference<>();
        this.pendingRequests = new Semaphore(this.concurrentRequests);
        this.stats = stats;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AsyncPutRecordsHandler(KinesisAsyncClient kinesisAsyncClient, int i, FluentBackoff fluentBackoff, Stats stats) {
        this(kinesisAsyncClient, i, (Supplier<BackOff>) () -> {
            return fluentBackoff.backoff();
        }, stats);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int pendingRequests() {
        return this.concurrentRequests - this.pendingRequests.availablePermits();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reset() {
        this.hasErrored = new AtomicBoolean(false);
        this.asyncFailure = new AtomicReference<>();
        this.pendingRequests = new Semaphore(this.concurrentRequests);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasErrored() {
        return this.hasErrored.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void checkForAsyncFailure() throws Throwable {
        Throwable andSet = this.asyncFailure.getAndSet(null);
        if (andSet != null) {
            throw andSet;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void waitForCompletion() throws Throwable {
        this.pendingRequests.acquireUninterruptibly(this.concurrentRequests);
        checkForAsyncFailure();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void putRecords(String str, List<PutRecordsRequestEntry> list) throws Throwable {
        this.pendingRequests.acquireUninterruptibly();
        new RetryHandler(str, list).run();
        checkForAsyncFailure();
    }
}
