package org.apache.flink.connector.kinesis.source.enumerator;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider;
import org.apache.flink.connector.kinesis.source.util.TestUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.assertj.core.api.NotThrownAssert;
import org.assertj.core.api.ThrowableTypeAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/* loaded from: input_file:org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.class */
/**
 * Tests for {@link KinesisStreamsSourceEnumerator}.
 *
 * <p>Each test drives the enumerator through a {@link MockSplitEnumeratorContext} and the test
 * stream proxy obtained from {@link KinesisStreamProxyProvider#getTestStreamProxy()}, then inspects
 * the split assignments recorded by the context. Every context is opened in a try-with-resources
 * statement so that it is closed even when an assertion fails.
 */
class KinesisStreamsSourceEnumeratorTest {
    /** Parallelism handed to the mock enumerator context in the single-reader tests. */
    private static final int NUM_SUBTASKS = 1;

    /** Stream ARN passed to every enumerator created by these tests. */
    private static final String STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/keenesesStream";

    /**
     * Starts an enumerator without restored state and checks that shards found by the initial
     * (one-time) discovery start at the expected iterator type, that a periodic discovery with no
     * new shards produces an empty assignment, and that shards found by a later periodic discovery
     * start at {@code TRIM_HORIZON}.
     *
     * @param initialPosition configured initial position of the stream
     * @param str configured initial timestamp (empty when not applicable)
     * @param shardIteratorType iterator type expected for the initially discovered shards
     */
    @MethodSource({"provideInitialPositions"})
    @ParameterizedTest
    void testStartWithoutStateDiscoversAndAssignsShards(KinesisStreamsSourceConfigConstants.InitialPosition initialPosition, String str, ShardIteratorType shardIteratorType) throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            final int subtaskId = 1;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            Configuration sourceConfig = new Configuration();
            sourceConfig.set(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, initialPosition);
            sourceConfig.set(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP, str);

            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            sourceConfig,
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);
            enumerator.start();

            // Without state, start() schedules one initial discovery plus one periodic discovery.
            AssertionsForInterfaceTypes.assertThat(context.getOneTimeCallables()).hasSize(1);
            AssertionsForInterfaceTypes.assertThat(context.getPeriodicCallables()).hasSize(1);

            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            enumerator.addReader(subtaskId);

            // Initial discovery: all four shards go to the only reader.
            String[] initialShardIds = {
                TestUtil.generateShardId(0),
                TestUtil.generateShardId(1),
                TestUtil.generateShardId(2),
                TestUtil.generateShardId(3)
            };
            streamProxy.addShards(initialShardIds);
            context.runNextOneTimeCallable();

            SplitsAssignment<KinesisShardSplit> initialAssignment = context.getSplitsAssignmentSequence().get(0);
            AssertionsForInterfaceTypes.assertThat(initialAssignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            initialAssignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactly(initialShardIds);
            AssertionsForInterfaceTypes.assertThat(
                            initialAssignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getStartingPosition))
                    .allSatisfy(
                            position ->
                                    AssertionsForInterfaceTypes.assertThat(position.getShardIteratorType())
                                            .isEqualTo(shardIteratorType));

            // Periodic discovery with nothing new: an empty assignment is recorded.
            context.runPeriodicCallable(0);
            AssertionsForInterfaceTypes.assertThat(context.getSplitsAssignmentSequence().get(1).assignment()).isEmpty();

            // Periodic discovery with two new shards: they are assigned from TRIM_HORIZON.
            String[] laterShardIds = {TestUtil.generateShardId(4), TestUtil.generateShardId(5)};
            streamProxy.addShards(laterShardIds);
            context.runPeriodicCallable(0);

            SplitsAssignment<KinesisShardSplit> laterAssignment = context.getSplitsAssignmentSequence().get(2);
            AssertionsForInterfaceTypes.assertThat(laterAssignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            laterAssignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactly(laterShardIds);
            AssertionsForInterfaceTypes.assertThat(
                            laterAssignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getStartingPosition))
                    .allSatisfy(
                            position ->
                                    AssertionsForInterfaceTypes.assertThat(position.getShardIteratorType())
                                            .isEqualTo(ShardIteratorType.TRIM_HORIZON));
        }
    }

    /**
     * Starts an enumerator from a state whose last-seen shard id is shard 1 and checks that only
     * the shards after it (2 and 3) are assigned, starting at {@code TRIM_HORIZON}.
     *
     * @param initialPosition configured initial position of the stream
     * @param str configured initial timestamp (empty when not applicable)
     * @param shardIteratorType supplied by the method source; not asserted here because the
     *     assertion below always expects {@code TRIM_HORIZON}
     */
    @MethodSource({"provideInitialPositions"})
    @ParameterizedTest
    void testStartWithStateDoesNotAssignCompletedShards(KinesisStreamsSourceConfigConstants.InitialPosition initialPosition, String str, ShardIteratorType shardIteratorType) throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            final int subtaskId = 1;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            String completedShardId = TestUtil.generateShardId(0);
            String lastSeenShardId = TestUtil.generateShardId(1);
            KinesisStreamsSourceEnumeratorState restoredState =
                    new KinesisStreamsSourceEnumeratorState(Collections.emptySet(), lastSeenShardId);

            Configuration sourceConfig = new Configuration();
            sourceConfig.set(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, initialPosition);
            sourceConfig.set(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP, str);

            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            sourceConfig,
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            restoredState);
            enumerator.start();

            // With restored state only the periodic discovery is scheduled.
            AssertionsForInterfaceTypes.assertThat(context.getOneTimeCallables()).isEmpty();
            AssertionsForInterfaceTypes.assertThat(context.getPeriodicCallables()).hasSize(1);

            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            enumerator.addReader(subtaskId);

            streamProxy.addShards(
                    completedShardId,
                    lastSeenShardId,
                    TestUtil.generateShardId(2),
                    TestUtil.generateShardId(3));
            context.runPeriodicCallable(0);

            SplitsAssignment<KinesisShardSplit> assignment = context.getSplitsAssignmentSequence().get(0);
            AssertionsForInterfaceTypes.assertThat(assignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            assignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactly(TestUtil.generateShardId(2), TestUtil.generateShardId(3));
            AssertionsForInterfaceTypes.assertThat(
                            assignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getStartingPosition))
                    .allSatisfy(
                            position ->
                                    AssertionsForInterfaceTypes.assertThat(position.getShardIteratorType())
                                            .isEqualTo(ShardIteratorType.TRIM_HORIZON));
        }
    }

    /**
     * Checks that a split handed back via {@code addSplitsBack} shows up as the sole content of
     * the next recorded assignment after a periodic discovery run.
     */
    @Test
    void testReturnedSplitsWillBeReassigned() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            final int subtaskId = 1;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            KinesisStreamsSourceEnumerator enumerator = getSimpleEnumeratorWithNoState(context, streamProxy);

            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            enumerator.addReader(subtaskId);

            String[] shardIds = {
                TestUtil.generateShardId(0),
                TestUtil.generateShardId(1),
                TestUtil.generateShardId(2),
                TestUtil.generateShardId(3)
            };
            streamProxy.addShards(shardIds);
            context.runNextOneTimeCallable();

            SplitsAssignment<KinesisShardSplit> initialAssignment = context.getSplitsAssignmentSequence().get(0);
            AssertionsForInterfaceTypes.assertThat(initialAssignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            initialAssignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactly(shardIds);

            // Hand the first split back, then let the periodic discovery run.
            KinesisShardSplit returnedSplit = initialAssignment.assignment().get(subtaskId).get(0);
            enumerator.addSplitsBack(Collections.singletonList(returnedSplit), subtaskId);
            context.runPeriodicCallable(0);

            SplitsAssignment<KinesisShardSplit> reassignment = context.getSplitsAssignmentSequence().get(1);
            AssertionsForInterfaceTypes.assertThat(reassignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(reassignment.assignment().get(subtaskId)).containsExactly(returnedSplit);
        }
    }

    /** Checks that {@code addSplitsBack} with a split obtained from {@code TestUtil} does not throw. */
    @Test
    void testAddSplitsBackWithoutSplitIsNoOp() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            KinesisStreamsSourceEnumerator enumerator =
                    getSimpleEnumeratorWithNoState(context, KinesisStreamProxyProvider.getTestStreamProxy());
            List<KinesisShardSplit> splits = Collections.singletonList(TestUtil.getTestSplit());

            AssertionsForClassTypes.assertThatNoException().isThrownBy(() -> enumerator.addSplitsBack(splits, 1));
        }
    }

    /**
     * Checks that {@code addSplitsBack} itself records a new assignment containing the returned
     * split, without any further discovery run being triggered.
     */
    @Test
    void testAddSplitsBackAssignsUnassignedSplits() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            final int subtaskId = 1;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            KinesisStreamsSourceEnumerator enumerator = getSimpleEnumeratorWithNoState(context, streamProxy);

            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            enumerator.addReader(subtaskId);

            String[] shardIds = {
                TestUtil.generateShardId(0),
                TestUtil.generateShardId(1),
                TestUtil.generateShardId(2),
                TestUtil.generateShardId(3)
            };
            streamProxy.addShards(shardIds);
            context.runNextOneTimeCallable();

            SplitsAssignment<KinesisShardSplit> initialAssignment = context.getSplitsAssignmentSequence().get(0);
            AssertionsForInterfaceTypes.assertThat(initialAssignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            initialAssignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactly(shardIds);

            KinesisShardSplit returnedSplit = initialAssignment.assignment().get(subtaskId).get(0);
            enumerator.addSplitsBack(Collections.singletonList(returnedSplit), subtaskId);

            // No callable is run here: the second assignment must come from addSplitsBack alone.
            AssertionsForInterfaceTypes.assertThat(context.getSplitsAssignmentSequence()).hasSizeGreaterThan(1);
            SplitsAssignment<KinesisShardSplit> reassignment = context.getSplitsAssignmentSequence().get(1);
            AssertionsForInterfaceTypes.assertThat(reassignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(reassignment.assignment().get(subtaskId)).containsExactly(returnedSplit);
        }
    }

    /** Checks that {@code handleSplitRequest} does not throw. */
    @Test
    void testHandleSplitRequestIsNoOp() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            KinesisStreamsSourceEnumerator enumerator =
                    getSimpleEnumeratorWithNoState(context, KinesisStreamProxyProvider.getTestStreamProxy());

            AssertionsForClassTypes.assertThatNoException()
                    .isThrownBy(() -> enumerator.handleSplitRequest(1, "some-hostname"));
        }
    }

    /**
     * Checks that a failure while listing shards surfaces from the initial discovery as a
     * {@link FlinkRuntimeException} whose stack trace still contains the underlying error text.
     */
    @Test
    void testAssignSplitsSurfacesThrowableIfUnableToListShards() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            new Configuration(),
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);
            enumerator.start();

            streamProxy.setListShardsExceptionSupplier(
                    () -> AwsServiceException.create("Internal Service Error", null));

            AssertionsForClassTypes.assertThatExceptionOfType(FlinkRuntimeException.class)
                    .isThrownBy(context::runNextOneTimeCallable)
                    .withMessage("Failed to list shards.")
                    .withStackTraceContaining("Internal Service Error");
        }
    }

    /**
     * Checks that when the proxy stops honouring the last-seen shard id (so already-assigned
     * shards are listed again) the periodic discovery neither throws nor re-assigns them.
     */
    @Test
    void testAssignSplitsHandlesRepeatSplitsGracefully() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            final int subtaskId = 1;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            new Configuration(),
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);
            enumerator.start();

            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            enumerator.addReader(subtaskId);

            String[] shardIds = {
                TestUtil.generateShardId(0),
                TestUtil.generateShardId(1),
                TestUtil.generateShardId(2),
                TestUtil.generateShardId(3)
            };
            streamProxy.addShards(shardIds);
            context.runNextOneTimeCallable();

            SplitsAssignment<KinesisShardSplit> initialAssignment = context.getSplitsAssignmentSequence().get(0);
            AssertionsForInterfaceTypes.assertThat(initialAssignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            initialAssignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactly(shardIds);

            // Make the proxy ignore the last-seen shard id for the next listing.
            streamProxy.setShouldRespectLastSeenShardId(false);
            AssertionsForClassTypes.assertThatNoException().isThrownBy(() -> context.runPeriodicCallable(0));
            AssertionsForInterfaceTypes.assertThat(context.getSplitsAssignmentSequence().get(1).assignment()).isEmpty();
        }
    }

    /**
     * Checks that nothing is assigned while no reader is registered, and that the discovered
     * shards are assigned by the next periodic run once the reader has registered.
     */
    @Test
    void testAssignSplitWithoutRegisteredReaders() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            final int subtaskId = 1;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            new Configuration(),
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);
            enumerator.start();

            String[] shardIds = {
                TestUtil.generateShardId(0),
                TestUtil.generateShardId(1),
                TestUtil.generateShardId(2),
                TestUtil.generateShardId(3)
            };
            streamProxy.addShards(shardIds);

            // Discovery runs before any reader exists: no assignment may be recorded.
            context.runNextOneTimeCallable();
            AssertionsForInterfaceTypes.assertThat(context.getSplitsAssignmentSequence()).isEmpty();

            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            enumerator.addReader(subtaskId);
            context.runPeriodicCallable(0);

            SplitsAssignment<KinesisShardSplit> assignment = context.getSplitsAssignmentSequence().get(0);
            AssertionsForInterfaceTypes.assertThat(assignment.assignment()).containsOnlyKeys(subtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            assignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactlyInAnyOrder(shardIds);
            AssertionsForInterfaceTypes.assertThat(
                            assignment.assignment().get(subtaskId).stream()
                                    .map(KinesisShardSplit::getStartingPosition))
                    .allSatisfy(
                            position ->
                                    AssertionsForInterfaceTypes.assertThat(position.getShardIteratorType())
                                            .isEqualTo(ShardIteratorType.AT_TIMESTAMP));
        }
    }

    /**
     * Checks, with a parallelism of two, that nothing is assigned while only one reader is
     * registered, and that the shards are spread over both readers once the second one registers.
     */
    @Test
    void testAssignSplitWithInsufficientRegisteredReaders() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(2)) {
            final int firstSubtaskId = 1;
            final int secondSubtaskId = 2;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            new Configuration(),
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);
            enumerator.start();

            context.registerReader(TestUtil.getTestReaderInfo(firstSubtaskId));
            enumerator.addReader(firstSubtaskId);

            String[] shardIds = {
                TestUtil.generateShardId(0),
                TestUtil.generateShardId(1),
                TestUtil.generateShardId(2),
                TestUtil.generateShardId(3)
            };
            streamProxy.addShards(shardIds);

            // Only one of the two readers is registered: no assignment may be recorded yet.
            context.runNextOneTimeCallable();
            AssertionsForInterfaceTypes.assertThat(context.getSplitsAssignmentSequence()).isEmpty();

            context.registerReader(TestUtil.getTestReaderInfo(secondSubtaskId));
            enumerator.addReader(secondSubtaskId);
            context.runPeriodicCallable(0);

            SplitsAssignment<KinesisShardSplit> assignment = context.getSplitsAssignmentSequence().get(0);
            AssertionsForInterfaceTypes.assertThat(assignment.assignment()).containsOnlyKeys(firstSubtaskId, secondSubtaskId);
            AssertionsForInterfaceTypes.assertThat(
                            assignment.assignment().values().stream()
                                    .flatMap(List::stream)
                                    .map(KinesisShardSplit::getShardId))
                    .containsExactlyInAnyOrder(shardIds);
            AssertionsForInterfaceTypes.assertThat(
                            assignment.assignment().values().stream()
                                    .flatMap(List::stream)
                                    .map(KinesisShardSplit::getStartingPosition))
                    .allSatisfy(
                            position ->
                                    AssertionsForInterfaceTypes.assertThat(position.getShardIteratorType())
                                            .isEqualTo(ShardIteratorType.AT_TIMESTAMP));
        }
    }

    /**
     * Checks that an enumerator restored from a snapshot passes the last shard id seen before the
     * snapshot to the stream proxy on its next discovery.
     */
    @Test
    void testRestoreFromStateRemembersLastSeenShardId() throws Throwable {
        // Both contexts live in one try-with-resources so the restored one is closed on failure too.
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                MockSplitEnumeratorContext<KinesisShardSplit> restoredContext = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            final int subtaskId = 1;
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            Configuration sourceConfig = new Configuration();

            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            sourceConfig,
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);
            enumerator.start();
            context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            enumerator.addReader(subtaskId);

            String[] shardIds = {
                TestUtil.generateShardId(0),
                TestUtil.generateShardId(1),
                TestUtil.generateShardId(2),
                TestUtil.generateShardId(3)
            };
            streamProxy.addShards(shardIds);
            context.runNextOneTimeCallable();

            // Restore a second enumerator from a snapshot of the first one.
            KinesisStreamsSourceEnumeratorState snapshot = enumerator.snapshotState(1L);
            KinesisStreamsSourceEnumerator restoredEnumerator =
                    new KinesisStreamsSourceEnumerator(
                            restoredContext,
                            STREAM_ARN,
                            sourceConfig,
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            snapshot);
            restoredEnumerator.start();
            restoredContext.registerReader(TestUtil.getTestReaderInfo(subtaskId));
            restoredEnumerator.addReader(subtaskId);
            restoredContext.runPeriodicCallable(0);

            AssertionsForInterfaceTypes.assertThat(streamProxy.getLastProvidedLastSeenShardId())
                    .isEqualTo(shardIds[shardIds.length - 1]);
        }
    }

    /** Checks that {@code handleSourceEvent} does not throw for an anonymous {@link SourceEvent}. */
    @Test
    void testHandleUnrecognisedSourceEventIsNoOp() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            new Configuration(),
                            KinesisStreamProxyProvider.getTestStreamProxy(),
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);

            AssertionsForClassTypes.assertThatNoException()
                    .isThrownBy(() -> enumerator.handleSourceEvent(1, new SourceEvent() {}));
        }
    }

    /** Checks that closing the enumerator does not throw and leaves the stream proxy closed. */
    @Test
    void testCloseClosesStreamProxy() throws Throwable {
        try (MockSplitEnumeratorContext<KinesisShardSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) {
            KinesisStreamProxyProvider.TestKinesisStreamProxy streamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
            KinesisStreamsSourceEnumerator enumerator =
                    new KinesisStreamsSourceEnumerator(
                            context,
                            STREAM_ARN,
                            new Configuration(),
                            streamProxy,
                            ShardAssignerFactory.uniformShardAssigner(),
                            null);
            enumerator.start();

            AssertionsForClassTypes.assertThatNoException().isThrownBy(enumerator::close);
            AssertionsForInterfaceTypes.assertThat(streamProxy.isClosed()).isTrue();
        }
    }

    /**
     * Creates and starts an enumerator with a default configuration and no restored state, and
     * verifies that starting it scheduled exactly one one-time and one periodic callable.
     *
     * @param mockSplitEnumeratorContext context the enumerator is attached to
     * @param streamProxy stream proxy the enumerator uses
     * @return the started enumerator
     */
    private KinesisStreamsSourceEnumerator getSimpleEnumeratorWithNoState(MockSplitEnumeratorContext<KinesisShardSplit> mockSplitEnumeratorContext, StreamProxy streamProxy) {
        KinesisStreamsSourceEnumerator enumerator =
                new KinesisStreamsSourceEnumerator(
                        mockSplitEnumeratorContext,
                        STREAM_ARN,
                        new Configuration(),
                        streamProxy,
                        ShardAssignerFactory.uniformShardAssigner(),
                        null);
        enumerator.start();
        AssertionsForInterfaceTypes.assertThat(mockSplitEnumeratorContext.getOneTimeCallables()).hasSize(1);
        AssertionsForInterfaceTypes.assertThat(mockSplitEnumeratorContext.getPeriodicCallables()).hasSize(1);
        return enumerator;
    }

    /**
     * Supplies the parameterized tests with (initial position, initial timestamp, expected shard
     * iterator type) triples. Note that {@code LATEST} is paired with {@code AT_TIMESTAMP}.
     */
    private static Stream<Arguments> provideInitialPositions() {
        return Stream.of(
                Arguments.of(
                        KinesisStreamsSourceConfigConstants.InitialPosition.LATEST,
                        "",
                        ShardIteratorType.AT_TIMESTAMP),
                Arguments.of(
                        KinesisStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON,
                        "",
                        ShardIteratorType.TRIM_HORIZON),
                Arguments.of(
                        KinesisStreamsSourceConfigConstants.InitialPosition.AT_TIMESTAMP,
                        "2023-04-13T09:18:00.0+01:00",
                        ShardIteratorType.AT_TIMESTAMP));
    }
}
