package org.apache.beam.sdk.io.aws2.common;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.aws2.common.AsyncBatchWriteHandler;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandlerTest.class */
public class AsyncBatchWriteHandlerTest {
    /** Concurrency value passed to every handler created by the factory helpers in this test. */
    private static final int CONCURRENCY = 10;
    /**
     * Future handed out by the spied submit function of the by-position handler. Tests reassign
     * this field to stage the response of the next submission.
     */
    private CompletableFuture<List<Boolean>> resultsByPos = new CompletableFuture<>();
    /**
     * Future handed out by the spied submit function of the by-id handler. Tests reassign this
     * field to stage the response of the next submission.
     */
    private CompletableFuture<List<String>> errorsById = new CompletableFuture<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandlerTest$SubmitFn.class */
    /**
     * Submit function stub: ignores the destination and the records it is called with and simply
     * hands back whatever future the configured supplier currently produces.
     *
     * @param <T> type of the submitted records
     * @param <V> type of the per-request results
     */
    public static class SubmitFn<T, V> implements BiFunction<String, List<T>, CompletableFuture<List<V>>> {
        /** Queried on every invocation, so the produced future may differ between calls. */
        private final Supplier<CompletableFuture<List<V>>> responseSupplier;

        SubmitFn(Supplier<CompletableFuture<List<V>>> supplier) {
            responseSupplier = supplier;
        }

        /**
         * Returns the supplier's current future; both arguments are ignored.
         */
        @Override
        public CompletableFuture<List<V>> apply(String str, List<T> list) {
            CompletableFuture<List<V>> response = responseSupplier.get();
            return response;
        }
    }

    /**
     * Creates a handler through {@code AsyncBatchWriteHandler.byPosition} backed by a Mockito spy
     * of {@link SubmitFn}, so tests can verify how often and with which records it is invoked.
     *
     * <p>The spied function returns whatever {@link #resultsByPos} references at call time. A
     * {@code true} result yields no error reason ({@code null}), a {@code false} result yields
     * {@code "REASON"}.
     *
     * @param fluentBackoff backoff configuration passed to the handler
     * @return the handler under test
     */
    private AsyncBatchWriteHandler<Integer, Boolean> byPositionHandler(FluentBackoff fluentBackoff) {
        // Parameterized (not raw) so the compiler checks record and result types end to end.
        SubmitFn<Integer, Boolean> submitFn =
                Mockito.spy(new SubmitFn<Integer, Boolean>(() -> this.resultsByPos));
        Function<Boolean, String> reasonFn = success -> success ? null : "REASON";
        return AsyncBatchWriteHandler.byPosition(
                CONCURRENCY, fluentBackoff, AsyncBatchWriteHandler.Stats.NONE, submitFn, reasonFn);
    }

    /**
     * Creates a handler through {@code AsyncBatchWriteHandler.byId} backed by a Mockito spy of
     * {@link SubmitFn}, so tests can verify how often and with which records it is invoked.
     *
     * <p>The spied function returns whatever {@link #errorsById} references at call time. Every
     * returned error maps to the reason {@code "REASON"}, and both remaining function arguments
     * are the identity function.
     *
     * @param fluentBackoff backoff configuration passed to the handler
     * @return the handler under test
     */
    private AsyncBatchWriteHandler<String, String> byIdHandler(FluentBackoff fluentBackoff) {
        // Parameterized (not raw) so the compiler checks record and error types end to end.
        SubmitFn<String, String> submitFn =
                Mockito.spy(new SubmitFn<String, String>(() -> this.errorsById));
        Function<String, String> reasonFn = error -> "REASON";
        return AsyncBatchWriteHandler.byId(
                CONCURRENCY,
                fluentBackoff,
                AsyncBatchWriteHandler.Stats.NONE,
                submitFn,
                reasonFn,
                Function.identity(),
                Function.identity());
    }

    /**
     * Checks that the by-position handler resubmits only the records whose result was
     * {@code false}, one retry per partially failed response, until everything succeeded.
     */
    @Test
    public void retryOnPartialSuccessByPosition() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler =
                byPositionHandler(FluentBackoff.DEFAULT.withMaxBackoff(Duration.millis(1L)));
        CompletableFuture<List<Boolean>> firstResponse = new CompletableFuture<>();
        CompletableFuture<List<Boolean>> secondResponse = new CompletableFuture<>();
        CompletableFuture<List<Boolean>> thirdResponse = new CompletableFuture<>();

        this.resultsByPos = firstResponse;
        handler.batchWrite("destination", ImmutableList.of(1, 2, 3, 4));

        // 1st attempt: the complete batch is submitted and remains pending.
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(1))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(1, 2, 3, 4));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // 2nd attempt: stage the next response first, then fail records 3 and 4.
        this.resultsByPos = secondResponse;
        firstResponse.complete(ImmutableList.of(true, true, false, false));
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(2))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(3, 4));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // 3rd attempt: only record 4 is still failing.
        this.resultsByPos = thirdResponse;
        secondResponse.complete(ImmutableList.of(true, false));
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(4));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // Full success: the request finishes and no further submission happens.
        thirdResponse.complete(ImmutableList.of(true));
        eventually(5, () -> Assertions.assertThat(handler.requestsInProgress()).isEqualTo(0));
        Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList());
    }

    /**
     * Checks that the by-id handler resubmits only the records whose ids are reported in the
     * response, one retry per partially failed response, until a response reports no errors.
     */
    @Test
    public void retryOnPartialSuccessById() throws Throwable {
        AsyncBatchWriteHandler<String, String> handler =
                byIdHandler(FluentBackoff.DEFAULT.withMaxBackoff(Duration.millis(1L)));
        CompletableFuture<List<String>> firstResponse = new CompletableFuture<>();
        CompletableFuture<List<String>> secondResponse = new CompletableFuture<>();
        CompletableFuture<List<String>> thirdResponse = new CompletableFuture<>();

        this.errorsById = firstResponse;
        handler.batchWrite("destination", ImmutableList.of("1", "2", "3", "4"));

        // 1st attempt: the complete batch is submitted and remains pending.
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(1))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("1", "2", "3", "4"));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // 2nd attempt: stage the next response first, then report "3" and "4" as failed.
        this.errorsById = secondResponse;
        firstResponse.complete(ImmutableList.of("3", "4"));
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(2))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("3", "4"));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // 3rd attempt: only "4" is still failing.
        this.errorsById = thirdResponse;
        secondResponse.complete(ImmutableList.of("4"));
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList()));
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("4"));
        Assertions.assertThat(handler.requestsInProgress()).isEqualTo(1);

        // No errors reported: the request finishes and no further submission happens.
        thirdResponse.complete(ImmutableList.of());
        eventually(5, () -> Assertions.assertThat(handler.requestsInProgress()).isEqualTo(0));
        Mockito.verify(handler.submitFn, Mockito.times(3))
                .apply(ArgumentMatchers.anyString(), ArgumentMatchers.anyList());
    }

    /**
     * With retries disabled ({@code withMaxRetries(0)}), a partially failed response must surface
     * as an {@link IOException} from {@code waitForCompletion()} and must not be resubmitted.
     */
    @Test
    public void retryLimitOnPartialSuccessByPosition() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler =
                byPositionHandler(FluentBackoff.DEFAULT.withMaxRetries(0));

        handler.batchWrite("destination", ImmutableList.of(1, 2, 3, 4));
        // Records 3 and 4 fail.
        this.resultsByPos.complete(ImmutableList.of(true, true, false, false));

        Assertions.assertThatThrownBy(() -> handler.waitForCompletion())
                .hasMessageContaining("Exceeded retries")
                .hasMessageEndingWith("REASON for 2 record(s).")
                .isInstanceOf(IOException.class);
        // verify() defaults to exactly one invocation, i.e. there was no retry.
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of(1, 2, 3, 4));
    }

    /**
     * With retries disabled ({@code withMaxRetries(0)}), a response reporting failed ids must
     * surface as an {@link IOException} from {@code waitForCompletion()} and must not be
     * resubmitted.
     */
    @Test
    public void retryLimitOnPartialSuccessById() throws Throwable {
        AsyncBatchWriteHandler<String, String> handler =
                byIdHandler(FluentBackoff.DEFAULT.withMaxRetries(0));

        handler.batchWrite("destination", ImmutableList.of("1", "2", "3", "4"));
        // Records "3" and "4" are reported as failed.
        this.errorsById.complete(ImmutableList.of("3", "4"));

        Assertions.assertThatThrownBy(() -> handler.waitForCompletion())
                .hasMessageContaining("Exceeded retries")
                .hasMessageEndingWith("REASON for 2 record(s).")
                .isInstanceOf(IOException.class);
        // verify() defaults to exactly one invocation, i.e. there was no retry.
        Mockito.verify(handler.submitFn).apply("destination", ImmutableList.of("1", "2", "3", "4"));
    }

    /**
     * A failed asynchronous request must be rethrown by the next {@code batchWrite} call, which
     * in turn must not submit anything itself.
     */
    @Test
    public void propagateErrorOnPutRecords() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler = byPositionHandler(FluentBackoff.DEFAULT);

        handler.batchWrite("destination", Collections.emptyList());
        this.resultsByPos.completeExceptionally(new RuntimeException("Request failed"));

        Assertions.assertThatThrownBy(() -> handler.batchWrite("destination", Collections.emptyList()))
                .hasMessage("Request failed");
        Assertions.assertThat(handler.hasErrored()).isTrue();
        // Exactly one submission: only the first batchWrite reached the submit function.
        Mockito.verify(handler.submitFn).apply("destination", Collections.emptyList());
    }

    /**
     * A failed asynchronous request must be rethrown by {@code checkForAsyncFailure()}. The
     * calls before the failure and after it was reported are expected to return normally.
     */
    @Test
    public void propagateErrorWhenPolling() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler = byPositionHandler(FluentBackoff.DEFAULT);

        handler.batchWrite("destination", Collections.emptyList());
        // Nothing has failed yet, so polling must not throw.
        handler.checkForAsyncFailure();

        this.resultsByPos.completeExceptionally(new RuntimeException("Request failed"));
        Assertions.assertThatThrownBy(() -> handler.checkForAsyncFailure()).hasMessage("Request failed");
        Assertions.assertThat(handler.hasErrored()).isTrue();

        // The failure was already reported above; polling again must not throw.
        handler.checkForAsyncFailure();
    }

    /**
     * A failed asynchronous request must be rethrown by {@code waitForCompletion()}.
     */
    @Test
    public void propagateErrorOnWaitForCompletion() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler = byPositionHandler(FluentBackoff.DEFAULT);

        handler.batchWrite("destination", Collections.emptyList());
        RuntimeException failure = new RuntimeException("Request failed");
        this.resultsByPos.completeExceptionally(failure);

        Assertions.assertThatThrownBy(() -> handler.waitForCompletion()).hasMessage("Request failed");
    }

    /**
     * Issues one more batch write than {@link #CONCURRENCY} allows from a background task and
     * checks that only {@code CONCURRENCY} requests are submitted while responses are pending,
     * and that the remaining write goes through once the pending responses complete.
     */
    @Test
    public void correctlyLimitConcurrency() throws Throwable {
        AsyncBatchWriteHandler<Integer, Boolean> handler = byPositionHandler(FluentBackoff.DEFAULT);

        // Exceed the concurrency limit by one; derived from the constant so both stay in sync.
        int totalWrites = CONCURRENCY + 1;
        Runnable writes = repeat(totalWrites, () -> handler.batchWrite("destination", Collections.emptyList()));
        ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(writes);

        eventually(5, () -> Assertions.assertThat(handler.requestsInProgress()).isEqualTo(CONCURRENCY));
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(CONCURRENCY))
                .apply("destination", Collections.emptyList()));
        // The background task still has one write left that has not been submitted.
        Assertions.assertThat(task).isNotDone();

        // Complete the pending responses so the remaining write can be submitted.
        this.resultsByPos.complete(Collections.emptyList());
        eventually(5, () -> Mockito.verify(handler.submitFn, Mockito.times(totalWrites))
                .apply("destination", Collections.emptyList()));
        handler.waitForCompletion();
        Assertions.assertThat(task).isDone();
    }

    /**
     * Runs {@code assertion} repeatedly until it passes or the attempts are used up.
     *
     * <p>Before attempt {@code k} (0-based) the thread sleeps {@code k * 100} ms, so the first
     * attempt runs immediately. An {@link AssertionError} from any attempt but the last triggers
     * another attempt; the last attempt runs outside the retry loop so its failure propagates to
     * the caller.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the
     * final attempt runs right away instead of continuing to sleep.
     *
     * @param attempts maximum number of times {@code assertion} is run; values below 2 result in
     *     a single run
     * @param assertion the check to run, expected to throw {@link AssertionError} while the
     *     condition is not yet met
     */
    private void eventually(int attempts, Runnable assertion) {
        for (int attempt = 0; attempt < attempts - 1; attempt++) {
            try {
                Thread.sleep(attempt * 100L);
                assertion.run();
                return;
            } catch (AssertionError ignored) {
                // Condition not met yet; fall through to the next, longer wait.
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Last chance: let any failure surface to the caller.
        assertion.run();
    }

    /**
     * Wraps {@code throwingRunnable} into a {@link Runnable} that invokes it {@code i} times in
     * sequence. The first {@link Throwable} raised aborts the remaining invocations and is
     * rethrown wrapped in a {@link RuntimeException}.
     */
    private Runnable repeat(int i, ThrowingRunnable throwingRunnable) {
        return () -> {
            try {
                int remaining = i;
                while (remaining-- > 0) {
                    throwingRunnable.run();
                }
            } catch (Throwable failure) {
                throw new RuntimeException(failure);
            }
        };
    }
}
