package org.apache.flink.connector.kinesis.source.proxy;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.connector.kinesis.source.util.KinesisClientProvider;
import org.apache.flink.connector.kinesis.source.util.TestUtil;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Shard;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.NotThrownAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.class */
class KinesisStreamProxyTest {
    private static final SdkHttpClient HTTP_CLIENT = ApacheHttpClient.builder().build();

    KinesisStreamProxyTest() {
    }

    @NullAndEmptySource
    @ValueSource(strings = {TestUtil.SHARD_ID})
    @ParameterizedTest
    void testListShardsSingleCall(String str) {
        List<Shard> testShards = getTestShards(0, 3);
        List<KinesisClientProvider.ListShardItem> singletonList = Collections.singletonList(KinesisClientProvider.ListShardItem.builder().validation(getListShardRequestValidation("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", str, null)).shards(testShards).nextToken(null).build());
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        testingKinesisClient.setListShardsResponses(singletonList);
        AssertionsForClassTypes.assertThat(new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT).listShards("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", str)).isEqualTo(testShards);
    }

    @Test
    void testListShardsMultipleCalls() {
        List<Shard> testShards = getTestShards(0, 3);
        List<KinesisClientProvider.ListShardItem> list = (List) Stream.of((Object[]) new KinesisClientProvider.ListShardItem[]{KinesisClientProvider.ListShardItem.builder().validation(getListShardRequestValidation("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", "shardId-000000000000", null)).shards(testShards.subList(0, 1)).nextToken("next-token-1").build(), KinesisClientProvider.ListShardItem.builder().validation(getListShardRequestValidation("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", null, "next-token-1")).shards(testShards.subList(1, 2)).nextToken("next-token-2").build(), KinesisClientProvider.ListShardItem.builder().validation(getListShardRequestValidation("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", null, "next-token-2")).shards(testShards.subList(2, 4)).nextToken(null).build()}).collect(Collectors.toList());
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        testingKinesisClient.setListShardsResponses(list);
        AssertionsForClassTypes.assertThat(new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT).listShards("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", "shardId-000000000000")).isEqualTo(testShards);
    }

    @Test
    void testGetRecordsInitialReadFromTrimHorizon() {
        StartingPosition fromStart = StartingPosition.fromStart();
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse) GetRecordsResponse.builder().records(new Record[]{(Record) Record.builder().build()}).nextShardIterator("next-iterator").build();
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        testingKinesisClient.setNextShardIterator("some-shard-iterator");
        testingKinesisClient.setShardIteratorValidation(validateEqual((GetShardIteratorRequest) GetShardIteratorRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardId(TestUtil.SHARD_ID).shardIteratorType(ShardIteratorType.TRIM_HORIZON).build()));
        testingKinesisClient.setGetRecordsResponse(getRecordsResponse);
        testingKinesisClient.setGetRecordsValidation(validateEqual((GetRecordsRequest) GetRecordsRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardIterator("some-shard-iterator").build()));
        AssertionsForClassTypes.assertThat(new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT).getRecords("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", TestUtil.SHARD_ID, fromStart)).isEqualTo(getRecordsResponse);
    }

    @Test
    void testGetRecordsInitialReadFromTimestamp() {
        Instant now = Instant.now();
        StartingPosition fromTimestamp = StartingPosition.fromTimestamp(now);
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse) GetRecordsResponse.builder().records(new Record[]{(Record) Record.builder().build()}).nextShardIterator("next-iterator").build();
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        testingKinesisClient.setNextShardIterator("some-shard-iterator");
        testingKinesisClient.setShardIteratorValidation(validateEqual((GetShardIteratorRequest) GetShardIteratorRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardId(TestUtil.SHARD_ID).shardIteratorType(ShardIteratorType.AT_TIMESTAMP).timestamp(now).build()));
        testingKinesisClient.setGetRecordsResponse(getRecordsResponse);
        testingKinesisClient.setGetRecordsValidation(validateEqual((GetRecordsRequest) GetRecordsRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardIterator("some-shard-iterator").build()));
        AssertionsForClassTypes.assertThat(new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT).getRecords("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", TestUtil.SHARD_ID, fromTimestamp)).isEqualTo(getRecordsResponse);
    }

    @Test
    void testGetRecordsInitialReadFromSequenceNumber() {
        StartingPosition continueFromSequenceNumber = StartingPosition.continueFromSequenceNumber("some-sequence-number");
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse) GetRecordsResponse.builder().records(new Record[]{(Record) Record.builder().build()}).nextShardIterator("next-iterator").build();
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        testingKinesisClient.setNextShardIterator("some-shard-iterator");
        testingKinesisClient.setShardIteratorValidation(validateEqual((GetShardIteratorRequest) GetShardIteratorRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardId(TestUtil.SHARD_ID).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber("some-sequence-number").build()));
        testingKinesisClient.setGetRecordsResponse(getRecordsResponse);
        testingKinesisClient.setGetRecordsValidation(validateEqual((GetRecordsRequest) GetRecordsRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardIterator("some-shard-iterator").build()));
        AssertionsForClassTypes.assertThat(new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT).getRecords("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", TestUtil.SHARD_ID, continueFromSequenceNumber)).isEqualTo(getRecordsResponse);
    }

    @Test
    void testConsecutiveGetRecordsUsesShardIteratorFromResponse() {
        StartingPosition fromStart = StartingPosition.fromStart();
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse) GetRecordsResponse.builder().records(new Record[]{(Record) Record.builder().build()}).nextShardIterator("second-shard-iterator").build();
        GetRecordsResponse getRecordsResponse2 = (GetRecordsResponse) GetRecordsResponse.builder().records(new Record[]{(Record) Record.builder().build()}).nextShardIterator("third-shard-iterator").build();
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        KinesisStreamProxy kinesisStreamProxy = new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT);
        testingKinesisClient.setNextShardIterator("first-shard-iterator");
        testingKinesisClient.setShardIteratorValidation(validateEqual((GetShardIteratorRequest) GetShardIteratorRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardId(TestUtil.SHARD_ID).shardIteratorType(ShardIteratorType.TRIM_HORIZON).build()));
        testingKinesisClient.setGetRecordsResponse(getRecordsResponse);
        testingKinesisClient.setGetRecordsValidation(validateEqual((GetRecordsRequest) GetRecordsRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardIterator("first-shard-iterator").build()));
        AssertionsForClassTypes.assertThat(kinesisStreamProxy.getRecords("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", TestUtil.SHARD_ID, fromStart)).isEqualTo(getRecordsResponse);
        testingKinesisClient.setShardIteratorValidation(getShardIteratorRequest -> {
            throw new RuntimeException("Call to GetShardIterator not expected on subsequent get records call");
        });
        testingKinesisClient.setGetRecordsResponse(getRecordsResponse2);
        testingKinesisClient.setGetRecordsValidation(validateEqual((GetRecordsRequest) GetRecordsRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardIterator("second-shard-iterator").build()));
        AssertionsForClassTypes.assertThat(kinesisStreamProxy.getRecords("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", TestUtil.SHARD_ID, fromStart)).isEqualTo(getRecordsResponse2);
    }

    @Test
    void testGetRecordsEagerlyRetriesExpiredIterators() {
        StartingPosition fromStart = StartingPosition.fromStart();
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse) GetRecordsResponse.builder().records(new Record[]{(Record) Record.builder().build()}).nextShardIterator("second-shard-iterator").build();
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        KinesisStreamProxy kinesisStreamProxy = new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        testingKinesisClient.setNextShardIterator("first-shard-iterator");
        testingKinesisClient.setShardIteratorValidation(getShardIteratorRequest -> {
        });
        testingKinesisClient.setNextShardIterator("second-shard-iterator");
        testingKinesisClient.setGetRecordsResponse(getRecordsResponse);
        testingKinesisClient.setGetRecordsValidation(getRecordsRequest -> {
            if (atomicBoolean.get()) {
                atomicBoolean.set(false);
                throw ((ExpiredIteratorException) ExpiredIteratorException.builder().build());
            }
            validateEqual((GetRecordsRequest) GetRecordsRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardIterator("second-shard-iterator").build());
        });
        AssertionsForClassTypes.assertThat(kinesisStreamProxy.getRecords("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", TestUtil.SHARD_ID, fromStart)).isEqualTo(getRecordsResponse);
        AssertionsForClassTypes.assertThat(atomicBoolean.get()).isFalse();
    }

    @Test
    void testGetRecordsHandlesCompletedShard() {
        StartingPosition continueFromSequenceNumber = StartingPosition.continueFromSequenceNumber("some-sequence-number");
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse) GetRecordsResponse.builder().records(new Record[]{(Record) Record.builder().build()}).build();
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        testingKinesisClient.setNextShardIterator("some-shard-iterator");
        testingKinesisClient.setShardIteratorValidation(validateEqual((GetShardIteratorRequest) GetShardIteratorRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardId(TestUtil.SHARD_ID).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber("some-sequence-number").build()));
        testingKinesisClient.setGetRecordsResponse(getRecordsResponse);
        testingKinesisClient.setGetRecordsValidation(validateEqual((GetRecordsRequest) GetRecordsRequest.builder().streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0").shardIterator("some-shard-iterator").build()));
        KinesisStreamProxy kinesisStreamProxy = new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT);
        AssertionsForClassTypes.assertThatNoException().isThrownBy(() -> {
            kinesisStreamProxy.getRecords("arn:aws:kinesis:us-east-1:123456789012:stream/LoadTestBeta_Input_0", TestUtil.SHARD_ID, continueFromSequenceNumber);
        });
    }

    @Test
    void testCloseClosesKinesisClient() {
        KinesisClientProvider.TestingKinesisClient testingKinesisClient = new KinesisClientProvider.TestingKinesisClient();
        KinesisStreamProxy kinesisStreamProxy = new KinesisStreamProxy(testingKinesisClient, HTTP_CLIENT);
        NotThrownAssert assertThatNoException = AssertionsForClassTypes.assertThatNoException();
        Objects.requireNonNull(kinesisStreamProxy);
        assertThatNoException.isThrownBy(kinesisStreamProxy::close);
        AssertionsForClassTypes.assertThat(testingKinesisClient.isClosed()).isTrue();
    }

    private List<Shard> getTestShards(int i, int i2) {
        ArrayList arrayList = new ArrayList();
        for (int i3 = i; i3 <= i2; i3++) {
            arrayList.add((Shard) Shard.builder().shardId(TestUtil.generateShardId(i3)).build());
        }
        return arrayList;
    }

    private Consumer<ListShardsRequest> getListShardRequestValidation(String str, String str2, String str3) {
        return listShardsRequest -> {
            AssertionsForClassTypes.assertThat(listShardsRequest).isEqualTo((ListShardsRequest) ListShardsRequest.builder().streamARN(str).exclusiveStartShardId(str2).nextToken(str3).build());
        };
    }

    private <R extends KinesisRequest> Consumer<R> validateEqual(R r) {
        return kinesisRequest -> {
            AssertionsForClassTypes.assertThat(kinesisRequest).isEqualTo(r);
        };
    }
}
