package org.apache.beam.sdk.io.aws2.common;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes batches of records asynchronously while bounding the number of requests in flight and
 * retrying the records of a batch that were reported as failed.
 *
 * <p>Every call to {@code batchWrite} takes one permit from a semaphore sized by the configured
 * number of concurrent requests, so the caller blocks once that many requests are pending. A
 * request whose response contains failed records is re-submitted with just those records until the
 * backoff is exhausted. Any throwable delivered to the completion callback, or an exhausted /
 * aborted retry, is recorded as the asynchronous failure; from then on new writes are skipped until
 * {@code reset()} is called.
 *
 * @param <RecT> type of the records written
 * @param <ResT> type of the per-record results (or errors) returned for a batch
 */
@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandler.class */
public abstract class AsyncBatchWriteHandler<RecT, ResT> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchWriteHandler.class);
    // Backoff template; each batch write derives its own BackOff instance from it.
    private final FluentBackoff backoff;
    // Number of permits of requestPermits, i.e. the maximum number of batch writes in flight.
    private final int concurrentRequests;
    // Receives the latency of every completed request; invoked while synchronized on this object.
    private final Stats stats;
    // Submits the given records to the given destination and returns the future batch results.
    protected final BiFunction<String, List<RecT>, CompletableFuture<List<ResT>>> submitFn;
    // Extracts the error code of a result; null codes are ignored when summarizing errors.
    protected final Function<ResT, String> errorCodeFn;
    // Set to true as soon as an asynchronous failure is recorded; replaced by reset().
    private AtomicBoolean hasErrored = new AtomicBoolean(false);
    // Latest recorded failure (earlier ones are attached as suppressed); replaced by reset().
    private AtomicReference<Throwable> asyncFailure = new AtomicReference<>();
    // One permit per batch write in flight; replaced by reset().
    private Semaphore requestPermits;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandler$RetryHandler.class */
    public class RetryHandler implements BiConsumer<List<ResT>, Throwable> {
        private final String destination;
        private final int totalRecords;
        private final BackOff backoff;
        private final long handlerStartTime = DateTimeUtils.currentTimeMillis();
        private long requestStartTime = 0;
        private int requests = 0;
        private List<RecT> records;

        RetryHandler(String str, List<RecT> list) {
            this.destination = str;
            this.totalRecords = list.size();
            this.records = list;
            this.backoff = AsyncBatchWriteHandler.this.backoff.backoff();
        }

        void run() {
            if (AsyncBatchWriteHandler.this.hasErrored.get()) {
                return;
            }
            try {
                this.requests++;
                this.requestStartTime = DateTimeUtils.currentTimeMillis();
                AsyncBatchWriteHandler.this.submitFn.apply(this.destination, this.records).whenComplete((BiConsumer<? super List<ResT>, ? super Throwable>) this);
            } catch (Throwable th) {
                setAsyncFailure(th);
            }
        }

        @Override // java.util.function.BiConsumer
        public void accept(List<ResT> list, Throwable th) {
            try {
                long currentTimeMillis = DateTimeUtils.currentTimeMillis();
                long j = currentTimeMillis - this.requestStartTime;
                synchronized (AsyncBatchWriteHandler.this.stats) {
                    AsyncBatchWriteHandler.this.stats.addBatchWriteRequest(j, this.requests > 1);
                }
                if (list != null && !AsyncBatchWriteHandler.this.hasErrored.get()) {
                    if (AsyncBatchWriteHandler.this.hasFailedRecords(list)) {
                        try {
                            if (BackOffUtils.next(Sleeper.DEFAULT, this.backoff)) {
                                AsyncBatchWriteHandler.LOG.info(summarizeErrors("Attempting retry", list));
                                this.records = AsyncBatchWriteHandler.this.failedRecords(this.records, list);
                                run();
                            } else {
                                th = new IOException(summarizeErrors("Exceeded retries", list));
                            }
                        } catch (Throwable th2) {
                            th = new IOException(summarizeErrors("Aborted retries", list), th2);
                        }
                    } else {
                        AsyncBatchWriteHandler.this.requestPermits.release();
                        AsyncBatchWriteHandler.LOG.debug("Done writing {} records [{} ms, {} request(s)]", new Object[]{Integer.valueOf(this.totalRecords), Long.valueOf(currentTimeMillis - this.handlerStartTime), Integer.valueOf(this.requests)});
                    }
                }
            } catch (Throwable th3) {
                th = th3;
            }
            if (th != null) {
                setAsyncFailure(th);
            }
        }

        private void setAsyncFailure(Throwable th) {
            AsyncBatchWriteHandler.LOG.warn("Error when writing batch.", th);
            AsyncBatchWriteHandler.this.hasErrored.set(true);
            AsyncBatchWriteHandler.this.asyncFailure.updateAndGet(th2 -> {
                if (th2 != null) {
                    th.addSuppressed(th2);
                }
                return th;
            });
            AsyncBatchWriteHandler.this.requestPermits.release(AsyncBatchWriteHandler.this.concurrentRequests);
        }

        private String summarizeErrors(String str, List<ResT> list) {
            return (String) ((Map) list.stream().map(AsyncBatchWriteHandler.this.errorCodeFn).filter(Predicates.notNull()).collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))).entrySet().stream().map(entry -> {
                return String.format("code %s for %d record(s)", entry.getKey(), entry.getValue());
            }).collect(Collectors.joining(", ", str + " after partial failure: ", "."));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandler$Stats.class */
    /**
     * Sink for statistics about batch write requests. All methods default to doing nothing, so
     * implementations only override what they are interested in.
     */
    public interface Stats {
        /** Instance that ignores everything reported to it. */
        Stats NONE = new Stats() {};

        /**
         * Reports a completed batch write request.
         *
         * @param latencyMillis time between submitting the request and its completion, in ms
         * @param isRetry whether the request was a re-submission of previously failed records
         */
        default void addBatchWriteRequest(long latencyMillis, boolean isRetry) {
            // no-op by default
        }
    }

    /**
     * Creates a handler that allows at most {@code concurrentRequests} batch writes in flight.
     *
     * @param concurrentRequests number of permits of the request semaphore
     * @param backoff backoff template used to derive the retry backoff of each batch write
     * @param stats receiver of per-request statistics
     * @param errorCodeFn extracts the error code of a result
     * @param submitFn submits records to a destination and returns the future results
     */
    protected AsyncBatchWriteHandler(int concurrentRequests, FluentBackoff backoff, Stats stats, Function<ResT, String> errorCodeFn, BiFunction<String, List<RecT>, CompletableFuture<List<ResT>>> submitFn) {
        this.concurrentRequests = concurrentRequests;
        this.requestPermits = new Semaphore(concurrentRequests);
        this.backoff = backoff;
        this.stats = stats;
        this.submitFn = submitFn;
        this.errorCodeFn = errorCodeFn;
    }

    /**
     * Returns the number of request permits currently taken, computed as the configured number of
     * concurrent requests minus the permits available right now.
     *
     * <p>Recording a failure releases additional permits, so the value is not meaningful (and may
     * be negative) once the handler has errored.
     */
    public final int requestsInProgress() {
        int available = requestPermits.availablePermits();
        return concurrentRequests - available;
    }

    /**
     * Discards the state of previous writes: installs a fresh set of request permits and clears
     * both the error flag and any recorded asynchronous failure.
     */
    public final void reset() {
        requestPermits = new Semaphore(concurrentRequests);
        asyncFailure = new AtomicReference<>();
        hasErrored = new AtomicBoolean(false);
    }

    /**
     * Tells whether an asynchronous failure has been recorded since construction or the last
     * {@code reset()}. The flag stays set even after the failure itself was consumed.
     */
    public final boolean hasErrored() {
        final boolean errored = hasErrored.get();
        return errored;
    }

    /**
     * Throws the recorded asynchronous failure, if any, and clears it so that it is thrown only
     * once.
     *
     * @throws Throwable the recorded failure
     */
    public final void checkForAsyncFailure() throws Throwable {
        Throwable failure = asyncFailure.getAndSet(null);
        if (failure == null) {
            return;
        }
        throw failure;
    }

    /**
     * Blocks, ignoring interrupts, until all request permits can be acquired at once, then throws
     * the recorded asynchronous failure, if any.
     *
     * <p>The acquired permits are not handed back by this method.
     *
     * @throws Throwable the recorded asynchronous failure
     */
    public final void waitForCompletion() throws Throwable {
        final int allPermits = concurrentRequests;
        requestPermits.acquireUninterruptibly(allPermits);
        checkForAsyncFailure();
    }

    /**
     * Asynchronously writes {@code records} to {@code destination} and throws a previously
     * recorded asynchronous failure, if any.
     *
     * @param destination destination passed to the submit function
     * @param records records to write
     * @throws Throwable the recorded asynchronous failure
     */
    public final void batchWrite(String destination, List<RecT> records) throws Throwable {
        final boolean throwAsyncFailures = true;
        batchWrite(destination, records, throwAsyncFailures);
    }

    /**
     * Asynchronously writes {@code records} to {@code destination}.
     *
     * <p>Unless a failure was already recorded, this blocks (ignoring interrupts) until a request
     * permit is available and then submits the records. If a failure was recorded, nothing is
     * submitted.
     *
     * @param destination destination passed to the submit function
     * @param records records to write
     * @param throwAsyncFailures whether to throw a recorded asynchronous failure before returning
     * @throws Throwable the recorded asynchronous failure, if {@code throwAsyncFailures} is set
     */
    public final void batchWrite(String destination, List<RecT> records, boolean throwAsyncFailures) throws Throwable {
        if (!hasErrored.get()) {
            requestPermits.acquireUninterruptibly();
            RetryHandler handler = new RetryHandler(destination, records);
            handler.run();
        }
        if (!throwAsyncFailures) {
            return;
        }
        checkForAsyncFailure();
    }

    /**
     * Selects the records to re-submit after a batch response reported failed records.
     *
     * @param list records of the request that was just answered
     * @param list2 results returned for that request
     * @return the records that are submitted again by the next retry
     */
    protected abstract List<RecT> failedRecords(List<RecT> list, List<ResT> list2);

    /**
     * Decides whether a batch response requires a retry.
     *
     * @param list results returned for a request
     * @return {@code true} if a retry has to be attempted, {@code false} if the batch is done
     */
    protected abstract boolean hasFailedRecords(List<ResT> list);

    /**
     * Creates a handler that matches results to records by their position in the batch, using a
     * backoff built from the given retry settings.
     *
     * @param concurrency maximum number of batch writes in flight
     * @param partialRetries maximum number of retries of the backoff
     * @param retry optional configuration providing the initial and maximum backoff
     * @param stats receiver of per-request statistics
     * @param submitFn submits records to a destination and returns the future results
     * @param errorCodeFn returns the error code of a result, or {@code null} if it succeeded
     */
    public static <RecT, ResT> AsyncBatchWriteHandler<RecT, ResT> byPosition(int concurrency, int partialRetries, @Nullable RetryConfiguration retry, Stats stats, BiFunction<String, List<RecT>, CompletableFuture<List<ResT>>> submitFn, Function<ResT, String> errorCodeFn) {
        FluentBackoff backoff = retryBackoff(partialRetries, retry);
        return byPosition(concurrency, backoff, stats, submitFn, errorCodeFn);
    }

    /**
     * Creates a handler that matches results to records by their position in the batch: the result
     * at index {@code i} belongs to the record at index {@code i}, and a non-null error code marks
     * that record as failed.
     *
     * @param concurrency maximum number of batch writes in flight
     * @param backoff backoff template for retries
     * @param stats receiver of per-request statistics
     * @param submitFn submits records to a destination and returns the future results
     * @param errorCodeFn returns the error code of a result, or {@code null} if it succeeded
     */
    public static <RecT, ResT> AsyncBatchWriteHandler<RecT, ResT> byPosition(int concurrency, FluentBackoff backoff, Stats stats, BiFunction<String, List<RecT>, CompletableFuture<List<ResT>>> submitFn, Function<ResT, String> errorCodeFn) {
        return new AsyncBatchWriteHandler<RecT, ResT>(concurrency, backoff, stats, errorCodeFn, submitFn) {
            /** A batch has failed records if any of its results carries an error code. */
            @Override
            protected boolean hasFailedRecords(List<ResT> results) {
                for (ResT result : results) {
                    if (this.errorCodeFn.apply(result) != null) {
                        return true;
                    }
                }
                return false;
            }

            /** Collects the records whose result at the same index carries an error code. */
            @Override
            protected List<RecT> failedRecords(List<RecT> records, List<ResT> results) {
                // Only positions present in both lists can be matched.
                int limit = Math.min(records.size(), results.size());
                List<RecT> failed = new ArrayList<>();
                for (int idx = 0; idx < limit; idx++) {
                    boolean hasError = this.errorCodeFn.apply(results.get(idx)) != null;
                    if (hasError) {
                        failed.add(records.get(idx));
                    }
                }
                return failed;
            }
        };
    }

    /**
     * Creates a handler that matches returned errors to records by id, using a backoff built from
     * the given retry settings.
     *
     * @param concurrency maximum number of batch writes in flight
     * @param partialRetries maximum number of retries of the backoff
     * @param retry optional configuration providing the initial and maximum backoff
     * @param stats receiver of per-request statistics
     * @param submitFn submits records to a destination and returns the future errors
     * @param errorCodeFn returns the error code of an error
     * @param recordIdFn returns the id of a record
     * @param errorIdFn returns the id of the record an error refers to
     */
    public static <RecT, ErrT> AsyncBatchWriteHandler<RecT, ErrT> byId(int concurrency, int partialRetries, @Nullable RetryConfiguration retry, Stats stats, BiFunction<String, List<RecT>, CompletableFuture<List<ErrT>>> submitFn, Function<ErrT, String> errorCodeFn, Function<RecT, String> recordIdFn, Function<ErrT, String> errorIdFn) {
        FluentBackoff backoff = retryBackoff(partialRetries, retry);
        return byId(concurrency, backoff, stats, submitFn, errorCodeFn, recordIdFn, errorIdFn);
    }

    /**
     * Creates a handler for batch operations that return only the failed entries: any returned
     * error triggers a retry, and the records to retry are those whose id matches the id of one of
     * the errors.
     *
     * @param concurrency maximum number of batch writes in flight
     * @param backoff backoff template for retries
     * @param stats receiver of per-request statistics
     * @param submitFn submits records to a destination and returns the future errors
     * @param errorCodeFn returns the error code of an error
     * @param recordIdFn returns the id of a record
     * @param errorIdFn returns the id of the record an error refers to
     */
    public static <RecT, ErrT> AsyncBatchWriteHandler<RecT, ErrT> byId(int concurrency, FluentBackoff backoff, Stats stats, BiFunction<String, List<RecT>, CompletableFuture<List<ErrT>>> submitFn, Function<ErrT, String> errorCodeFn, final Function<RecT, String> recordIdFn, final Function<ErrT, String> errorIdFn) {
        return new AsyncBatchWriteHandler<RecT, ErrT>(concurrency, backoff, stats, errorCodeFn, submitFn) {
            /** Every returned error stands for a failed record. */
            @Override
            protected boolean hasFailedRecords(List<ErrT> errors) {
                return !errors.isEmpty();
            }

            /** Collects, in request order, the records whose id is referenced by an error. */
            @Override
            protected List<RecT> failedRecords(List<RecT> records, List<ErrT> errors) {
                int errorCount = errors.size();
                HashSet<String> failedIds = Sets.newHashSetWithExpectedSize(errorCount);
                for (ErrT error : errors) {
                    failedIds.add(errorIdFn.apply(error));
                }
                List<RecT> failed = new ArrayList<>(errorCount);
                for (RecT rec : records) {
                    if (!failedIds.contains(recordIdFn.apply(rec))) {
                        continue;
                    }
                    failed.add(rec);
                    if (failed.size() == errorCount) {
                        // One record found per error, no need to scan the remaining records.
                        break;
                    }
                }
                return failed;
            }
        };
    }

    /**
     * Builds the retry backoff: the default backoff limited to {@code maxRetries} retries, with the
     * initial and maximum backoff taken from {@code retry} where it provides them.
     *
     * @param maxRetries maximum number of retries
     * @param retry optional retry configuration; {@code null} keeps the defaults
     */
    private static FluentBackoff retryBackoff(int maxRetries, @Nullable RetryConfiguration retry) {
        FluentBackoff result = FluentBackoff.DEFAULT.withMaxRetries(maxRetries);
        if (retry == null) {
            return result;
        }
        if (retry.throttledBaseBackoff() != null) {
            result = result.withInitialBackoff(retry.throttledBaseBackoff());
        }
        if (retry.maxBackoff() != null) {
            result = result.withMaxBackoff(retry.maxBackoff());
        }
        return result;
    }
}
