/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.common;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.aws2.common.AsyncBatchWriteHandler;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class AsyncBatchWriteHandlerTest {
    /** Concurrency limit the tests expect the handlers under test to enforce. */
    private static final int CONCURRENCY = 10;

    /**
     * Response future handed out by the stubbed submit function of the by-position handler. Tests
     * replace or complete it to simulate service responses.
     */
    private CompletableFuture<List<Boolean>> resultsByPos = new CompletableFuture<>();

    /**
     * Response future handed out by the stubbed submit function of the by-id handler. Tests
     * replace or complete it to simulate service responses.
     */
    private CompletableFuture<List<String>> errorsById = new CompletableFuture<>();

    /**
     * Creates a by-position handler whose submit function is a Mockito spy that returns whatever
     * future {@code resultsByPos} references at the moment of the call.
     *
     * <p>A {@code false} result is mapped to the error code {@code "REASON"}; a {@code true}
     * result is mapped to {@code null}.
     *
     * @param backoff backoff configuration passed to the handler
     * @return the handler under test
     */
    private AsyncBatchWriteHandler<Integer, Boolean> byPositionHandler(FluentBackoff backoff) {
        // Fully parameterized so the factory call is type checked instead of going through raw casts.
        SubmitFn<Integer, Boolean> submitFn =
                Mockito.spy(new SubmitFn<Integer, Boolean>(() -> this.resultsByPos));
        Function<Boolean, String> errorFn = success -> success ? null : "REASON";
        return AsyncBatchWriteHandler.byPosition(
                CONCURRENCY, backoff, AsyncBatchWriteHandler.Stats.NONE, submitFn, errorFn);
    }

    /**
     * Creates a by-id handler whose submit function is a Mockito spy that returns whatever future
     * {@code errorsById} references at the moment of the call.
     *
     * <p>Every returned error is mapped to the error code {@code "REASON"}; records and errors
     * are both used as their own ids (identity functions).
     *
     * @param backoff backoff configuration passed to the handler
     * @return the handler under test
     */
    private AsyncBatchWriteHandler<String, String> byIdHandler(FluentBackoff backoff) {
        // Fully parameterized so the factory call is type checked instead of going through raw casts.
        SubmitFn<String, String> submitFn =
                Mockito.spy(new SubmitFn<String, String>(() -> this.errorsById));
        Function<String, String> errorFn = err -> "REASON";
        return AsyncBatchWriteHandler.byId(
                CONCURRENCY,
                backoff,
                AsyncBatchWriteHandler.Stats.NONE,
                submitFn,
                errorFn,
                Function.identity(),
                Function.identity());
    }

    /**
     * Verifies that a by-position handler resubmits only the records whose result was
     * {@code false}, and keeps the request in progress until every record has succeeded.
     */
    @Test
    public void retryOnPartialSuccessByPosition() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler =
                this.byPositionHandler(FluentBackoff.DEFAULT.withMaxBackoff(Duration.millis(1L)));
        // Typed to match the resultsByPos field; a raw ImmutableList element type is not assignable to it.
        CompletableFuture<List<Boolean>> pendingResponse1 = new CompletableFuture<>();
        CompletableFuture<List<Boolean>> pendingResponse2 = new CompletableFuture<>();
        CompletableFuture<List<Boolean>> pendingResponse3 = new CompletableFuture<>();

        this.resultsByPos = pendingResponse1;
        handler.batchWrite("destination", ImmutableList.of(1, 2, 3, 4));

        // 1st attempt: the full batch is submitted.
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(1))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(1, 2, 3, 4));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // Swap in the next response BEFORE completing the current one: the stub reads the field
        // when the retry is submitted.
        this.resultsByPos = pendingResponse2;
        pendingResponse1.complete(ImmutableList.of(true, true, false, false));

        // 2nd attempt: only the two failed records are resubmitted.
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(2))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(3, 4));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        this.resultsByPos = pendingResponse3;
        pendingResponse2.complete(ImmutableList.of(true, false));

        // 3rd attempt: only the last failed record is resubmitted.
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(4));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // Full success: the request completes and nothing further is submitted.
        pendingResponse3.complete(ImmutableList.of(true));
        this.eventually(5, () -> Assertions.assertThat(handler.requestsInProgress()).isEqualTo(0));
        Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList());
    }

    /**
     * Verifies that a by-id handler resubmits only the records whose ids were returned as errors,
     * and keeps the request in progress until a response reports no errors.
     */
    @Test
    public void retryOnPartialSuccessById() throws Throwable {
        AsyncBatchWriteHandler<String, String> handler =
                this.byIdHandler(FluentBackoff.DEFAULT.withMaxBackoff(Duration.millis(1L)));
        // Typed to match the errorsById field; a raw ImmutableList element type is not assignable to it.
        CompletableFuture<List<String>> pendingResponse1 = new CompletableFuture<>();
        CompletableFuture<List<String>> pendingResponse2 = new CompletableFuture<>();
        CompletableFuture<List<String>> pendingResponse3 = new CompletableFuture<>();

        this.errorsById = pendingResponse1;
        handler.batchWrite("destination", ImmutableList.of("1", "2", "3", "4"));

        // 1st attempt: the full batch is submitted.
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(1))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("1", "2", "3", "4"));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // Swap in the next response BEFORE completing the current one: the stub reads the field
        // when the retry is submitted.
        this.errorsById = pendingResponse2;
        pendingResponse1.complete(ImmutableList.of("3", "4"));

        // 2nd attempt: only the records reported as errors are resubmitted.
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(2))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("3", "4"));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        this.errorsById = pendingResponse3;
        pendingResponse2.complete(ImmutableList.of("4"));

        // 3rd attempt: only the remaining failed record is resubmitted.
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("4"));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // Empty error list: the request completes and nothing further is submitted.
        pendingResponse3.complete(ImmutableList.of());
        this.eventually(5, () -> Assertions.assertThat(handler.requestsInProgress()).isEqualTo(0));
        Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList());
    }

    /**
     * Verifies that, with retries disabled, a partially failed by-position batch makes
     * {@code waitForCompletion()} throw an {@link IOException} naming the error code and the
     * number of failed records, after exactly one submission.
     */
    @Test
    public void retryLimitOnPartialSuccessByPosition() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler =
                this.byPositionHandler(FluentBackoff.DEFAULT.withMaxRetries(0));

        handler.batchWrite("destination", ImmutableList.of(1, 2, 3, 4));

        // Two of four records fail. Passing a properly typed list avoids the invalid
        // ImmutableList<Object> -> List<Boolean> cast.
        this.resultsByPos.complete(ImmutableList.of(true, true, false, false));

        Assertions.assertThatThrownBy(() -> handler.waitForCompletion())
                .hasMessageContaining("Exceeded retries")
                .hasMessageEndingWith("REASON for 2 record(s).")
                .isInstanceOf(IOException.class);
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(1, 2, 3, 4));
    }

    /**
     * Verifies that, with retries disabled, a partially failed by-id batch makes
     * {@code waitForCompletion()} throw an {@link IOException} naming the error code and the
     * number of failed records, after exactly one submission.
     */
    @Test
    public void retryLimitOnPartialSuccessById() throws Throwable {
        AsyncBatchWriteHandler<String, String> handler =
                this.byIdHandler(FluentBackoff.DEFAULT.withMaxRetries(0));

        handler.batchWrite("destination", ImmutableList.of("1", "2", "3", "4"));

        // Records "3" and "4" are reported as errors. Passing a properly typed list avoids the
        // invalid ImmutableList<Object> -> List<String> cast.
        this.errorsById.complete(ImmutableList.of("3", "4"));

        Assertions.assertThatThrownBy(() -> handler.waitForCompletion())
                .hasMessageContaining("Exceeded retries")
                .hasMessageEndingWith("REASON for 2 record(s).")
                .isInstanceOf(IOException.class);
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("1", "2", "3", "4"));
    }

    /**
     * Checks that a failure of an earlier asynchronous request is rethrown by the next
     * {@code batchWrite} call and leaves the handler in the errored state.
     */
    @Test
    public void propagateErrorOnPutRecords() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> writer = this.byPositionHandler(FluentBackoff.DEFAULT);

        // First write is accepted; its response is still pending.
        writer.batchWrite("destination", Collections.emptyList());

        // Fail the pending response.
        RuntimeException failure = new RuntimeException("Request failed");
        this.resultsByPos.completeExceptionally(failure);

        // The following write surfaces the failure.
        Assertions.assertThatThrownBy(() -> writer.batchWrite("destination", Collections.emptyList()))
                .hasMessage("Request failed");
        Assertions.assertThat(writer.hasErrored()).isTrue();

        // Only the first write reached the submit function.
        Mockito.verify(writer.submitFn).apply("destination", Collections.emptyList());
    }

    /**
     * Checks that {@code checkForAsyncFailure()} is silent while the request is pending, rethrows
     * the failure once the response has failed, and is silent again on the call after that.
     */
    @Test
    public void propagateErrorWhenPolling() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> writer = this.byPositionHandler(FluentBackoff.DEFAULT);
        writer.batchWrite("destination", Collections.emptyList());

        // Nothing has failed yet, so polling must not throw.
        writer.checkForAsyncFailure();

        RuntimeException failure = new RuntimeException("Request failed");
        this.resultsByPos.completeExceptionally(failure);

        Assertions.assertThatThrownBy(() -> writer.checkForAsyncFailure())
                .hasMessage("Request failed");
        Assertions.assertThat(writer.hasErrored()).isTrue();

        // A second poll after the failure was reported must not throw again.
        writer.checkForAsyncFailure();
    }

    /**
     * Checks that {@code waitForCompletion()} rethrows the failure of a pending request.
     */
    @Test
    public void propagateErrorOnWaitForCompletion() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> writer = this.byPositionHandler(FluentBackoff.DEFAULT);
        writer.batchWrite("destination", Collections.emptyList());

        RuntimeException failure = new RuntimeException("Request failed");
        this.resultsByPos.completeExceptionally(failure);

        Assertions.assertThatThrownBy(() -> writer.waitForCompletion())
                .hasMessage("Request failed");
    }

    /**
     * Verifies that the handler admits at most {@code CONCURRENCY} requests at a time: one more
     * write than the limit is issued from a background task, which must stay blocked until
     * pending responses complete.
     */
    @Test
    public void correctlyLimitConcurrency() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler = this.byPositionHandler(FluentBackoff.DEFAULT);

        // Issue one write more than the limit on another thread; the last write is expected to block.
        Runnable task = this.repeat(
                CONCURRENCY + 1, () -> handler.batchWrite("destination", Collections.emptyList()));
        Future<?> future = ForkJoinPool.commonPool().submit(task);

        this.eventually(5, () -> Assertions.assertThat(handler.requestsInProgress()).isEqualTo(CONCURRENCY));
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(CONCURRENCY))
                .apply("destination", Collections.emptyList()));
        Assertions.assertThat(future).isNotDone();

        // Completing the shared response frees capacity, so the remaining write can be submitted.
        this.resultsByPos.complete(Collections.emptyList());
        this.eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(CONCURRENCY + 1))
                .apply("destination", Collections.emptyList()));

        handler.waitForCompletion();
        Assertions.assertThat(future).isDone();
    }

    /**
     * Runs {@code fun} until it stops throwing {@link AssertionError}, using at most
     * {@code attempts} tries.
     *
     * <p>Before try {@code i} (0-based) the thread sleeps {@code i * 100} ms, so the first try is
     * immediate. The final try runs without a preceding sleep and its {@link AssertionError}, if
     * any, propagates to the caller. If the thread is interrupted while sleeping, the interrupt
     * flag is restored and polling stops; {@code fun} is then run one last time.
     *
     * @param attempts maximum number of tries; {@code fun} runs at least once even if this is
     *     less than 1
     * @param fun check to run, typically an assertion or a Mockito verification
     */
    private void eventually(int attempts, Runnable fun) {
        for (int i = 0; i < attempts - 1; ++i) {
            try {
                Thread.sleep(i * 100L);
                fun.run();
                return;
            } catch (AssertionError ignored) {
                // Condition not met yet; retry after a longer pause.
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers instead of swallowing it, and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Last try: let any failure reach the caller.
        fun.run();
    }

    /**
     * Wraps {@code fun} in a {@link Runnable} that invokes it {@code times} times in sequence.
     *
     * <p>The first {@link Throwable} thrown by {@code fun} stops the repetition and is rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param times number of invocations
     * @param fun action to invoke
     * @return a runnable performing the repeated invocation
     */
    private Runnable repeat(int times, ThrowingRunnable fun) {
        return () -> {
            try {
                int remaining = times;
                while (remaining > 0) {
                    fun.run();
                    remaining--;
                }
            } catch (Throwable failure) {
                throw new RuntimeException(failure);
            }
        };
    }

    /**
     * Stub submit function for the handlers under test.
     *
     * <p>The destination and the records are ignored; every call returns the future produced by
     * the supplier given at construction time. The supplier is queried on each call.
     *
     * @param <T> record type
     * @param <V> element type of the response list
     */
    static class SubmitFn<T, V>
            implements BiFunction<String, List<T>, CompletableFuture<List<V>>> {
        /** Source of the response future returned by {@link #apply}. */
        private final Supplier<CompletableFuture<List<V>>> responseSupplier;

        /**
         * @param resp supplier of the future to return from each {@link #apply} call
         */
        SubmitFn(Supplier<CompletableFuture<List<V>>> resp) {
            responseSupplier = resp;
        }

        /**
         * Returns the future currently provided by the supplier; both arguments are unused.
         */
        @Override
        public CompletableFuture<List<V>> apply(String destination, List<T> input) {
            CompletableFuture<List<V>> response = responseSupplier.get();
            return response;
        }
    }
}

