package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.aws2.kinesis.AsyncPutRecordsHandler;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.util.BackOff;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

/**
 * Tests for {@link AsyncPutRecordsHandler} against a mocked {@link KinesisAsyncClient}.
 *
 * <p>By default every {@code putRecords} call on the mocked client returns the shared
 * {@link #pendingResponse} future, so each test decides when (and how) the request completes.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class AsyncPutRecordsHandlerTest extends PutRecordsHelpers {
    private static final String STREAM = "streamName";
    private static final int CONCURRENCY = 10;

    /** Future returned by the mocked client; completed manually by the individual tests. */
    private final CompletableFuture<PutRecordsResponse> pendingResponse = new CompletableFuture<>();

    @Mock
    private KinesisAsyncClient client;

    @Mock
    private Supplier<BackOff> backoff;

    private AsyncPutRecordsHandler handler;

    /** Creates the handler under test and stubs the client to return {@link #pendingResponse}. */
    @Before
    public void init() {
        AsyncPutRecordsHandler.Stats stats = Mockito.mock(AsyncPutRecordsHandler.Stats.class);
        this.handler = new AsyncPutRecordsHandler(this.client, CONCURRENCY, this.backoff, stats);
        Mockito.when(this.client.putRecords(anyRequest())).thenReturn(this.pendingResponse);
    }

    /**
     * Completes three consecutive requests one after another and verifies that after each partial
     * success a follow-up request is issued for the expected tail of the original records, while
     * exactly one request stays pending until the last response completes.
     */
    @Test
    public void retryOnPartialSuccess() throws Throwable {
        Mockito.when(this.backoff.get()).thenReturn(BackOff.ZERO_BACKOFF);
        CompletableFuture<PutRecordsResponse> secondResponse = new CompletableFuture<>();
        CompletableFuture<PutRecordsResponse> thirdResponse = new CompletableFuture<>();
        // Consecutive stubbing: 1st call -> pendingResponse, 2nd -> secondResponse, 3rd -> thirdResponse.
        Mockito.when(this.client.putRecords(anyRequest()))
                .thenReturn(this.pendingResponse)
                .thenReturn(secondResponse)
                .thenReturn(thirdResponse);

        List<PutRecordsRequestEntry> records = fromTestRows(TestRow.getExpectedValues(0, 100));
        this.handler.putRecords(STREAM, records);

        // 1st request: all records.
        eventually(5, () -> Mockito.verify(this.client, Mockito.times(1)).putRecords(anyRequest()));
        Mockito.verify(this.client).putRecords(request(records));
        Assertions.assertThat(this.handler.pendingRequests()).isEqualTo(1);

        // 2nd request: expected to carry only records [50, 100).
        this.pendingResponse.complete(partialSuccessResponse(50, 50));
        eventually(5, () -> Mockito.verify(this.client, Mockito.times(2)).putRecords(anyRequest()));
        Mockito.verify(this.client).putRecords(request(records.subList(50, 100)));
        Assertions.assertThat(this.handler.pendingRequests()).isEqualTo(1);

        // 3rd request: expected to carry only records [75, 100).
        secondResponse.complete(partialSuccessResponse(25, 25));
        eventually(5, () -> Mockito.verify(this.client, Mockito.times(3)).putRecords(anyRequest()));
        Mockito.verify(this.client).putRecords(request(records.subList(75, 100)));
        Assertions.assertThat(this.handler.pendingRequests()).isEqualTo(1);

        // An empty response completes the last request; no further client calls are expected.
        thirdResponse.complete(PutRecordsResponse.builder().build());
        Mockito.verifyNoMoreInteractions(this.client);
        eventually(5, () -> Assertions.assertThat(this.handler.pendingRequests()).isEqualTo(0));
    }

    /**
     * With {@link BackOff#STOP_BACKOFF} a partial success must surface as an {@link IOException}
     * from {@code waitForCompletion()} and the client must have been called exactly once.
     */
    @Test
    public void retryLimitOnPartialSuccess() throws Throwable {
        Mockito.when(this.backoff.get()).thenReturn(BackOff.STOP_BACKOFF);
        this.handler.putRecords(STREAM, fromTestRows(TestRow.getExpectedValues(0, 100)));
        this.pendingResponse.complete(partialSuccessResponse(98, 2));

        Assertions.assertThatThrownBy(() -> this.handler.waitForCompletion())
                .hasMessageContaining("Exceeded retries")
                .hasMessageEndingWith("ProvisionedThroughputExceededException for 2 record(s).")
                .isInstanceOf(IOException.class);
        Mockito.verify(this.client).putRecords(anyRequest());
    }

    /** A failed request must be rethrown by the next {@code putRecords} call without a new client call. */
    @Test
    public void propagateErrorOnPutRecords() throws Throwable {
        this.handler.putRecords(STREAM, Collections.emptyList());
        this.pendingResponse.completeExceptionally(new RuntimeException("Request failed"));

        Assertions.assertThatThrownBy(() -> this.handler.putRecords(STREAM, Collections.emptyList()))
                .hasMessage("Request failed");
        Assertions.assertThat(this.handler.hasErrored()).isTrue();
        // Only the first putRecords reached the client.
        Mockito.verify(this.client).putRecords(anyRequest());
    }

    /**
     * {@code checkForAsyncFailure()} must not throw before the failure, throw once after it, and
     * not throw again on the following call, while {@code hasErrored()} reports {@code true}.
     */
    @Test
    public void propagateErrorWhenPolling() throws Throwable {
        this.handler.putRecords(STREAM, Collections.emptyList());
        this.handler.checkForAsyncFailure(); // nothing failed yet
        this.pendingResponse.completeExceptionally(new RuntimeException("Request failed"));

        Assertions.assertThatThrownBy(() -> this.handler.checkForAsyncFailure())
                .hasMessage("Request failed");
        Assertions.assertThat(this.handler.hasErrored()).isTrue();
        this.handler.checkForAsyncFailure(); // must not throw a second time
    }

    /** A failed request must be rethrown by {@code waitForCompletion()}. */
    @Test
    public void propagateErrorOnWaitForCompletion() throws Throwable {
        this.handler.putRecords(STREAM, Collections.emptyList());
        this.pendingResponse.completeExceptionally(new RuntimeException("Request failed"));

        Assertions.assertThatThrownBy(() -> this.handler.waitForCompletion())
                .hasMessage("Request failed");
    }

    /**
     * Submits {@code CONCURRENCY + 1} requests from a background task and verifies that only
     * {@code CONCURRENCY} of them reach the client until the pending response is completed.
     */
    @Test
    public void correctlyLimitConcurrency() throws Throwable {
        Runnable producer =
                repeat(CONCURRENCY + 1, () -> this.handler.putRecords(STREAM, Collections.emptyList()));
        ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(producer);

        eventually(5, () -> Assertions.assertThat(this.handler.pendingRequests()).isEqualTo(CONCURRENCY));
        eventually(5, () -> Mockito.verify(this.client, Mockito.times(CONCURRENCY)).putRecords(anyRequest()));
        // The producer still has one request it could not submit.
        Assertions.assertThat(task).isNotDone();

        // All stubbed calls share pendingResponse, so completing it completes every request.
        this.pendingResponse.complete(PutRecordsResponse.builder().build());
        eventually(5, () -> Mockito.verify(this.client, Mockito.times(CONCURRENCY + 1)).putRecords(anyRequest()));
        this.handler.waitForCompletion();
        Assertions.assertThat(task).isDone();
    }

    /** Builds the request expected for the given entries on {@link #STREAM}. */
    private PutRecordsRequest request(List<PutRecordsRequestEntry> list) {
        return PutRecordsRequest.builder().streamName(STREAM).records(list).build();
    }

    /**
     * Runs {@code check} until it stops throwing {@link AssertionError}, making at most
     * {@code attempts} attempts. Before attempt {@code k} (0-based) the thread sleeps
     * {@code k * 100} ms; the final attempt runs without sleeping and lets any error propagate.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt status is restored, retrying
     * stops, and {@code check} is run one last time.
     */
    private void eventually(int attempts, Runnable check) {
        for (int attempt = 0; attempt < attempts - 1; attempt++) {
            try {
                Thread.sleep(attempt * 100L);
                check.run();
                return;
            } catch (AssertionError ignored) {
                // Condition not met yet; retry after a longer pause.
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        check.run();
    }

    /**
     * Returns a {@link Runnable} that invokes {@code action} {@code times} times sequentially,
     * wrapping anything it throws in a {@link RuntimeException}.
     */
    private Runnable repeat(int times, ThrowingRunnable action) {
        return () -> {
            for (int run = 0; run < times; run++) {
                try {
                    action.run();
                } catch (Throwable th) {
                    throw new RuntimeException(th);
                }
            }
        };
    }
}
