package org.apache.flink.connector.kinesis.source.reader;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider;
import org.apache.flink.connector.kinesis.source.util.TestUtil;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.assertj.core.api.NotThrownAssert;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.kinesis.model.Record;

/* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.class */
class PollingKinesisShardSplitReaderTest {
    PollingKinesisShardSplitReaderTest() {
    }

    @Test
    void testNoAssignedSplitsHandledGracefully() throws Exception {
        RecordsWithSplitIds fetch = new PollingKinesisShardSplitReader(KinesisStreamProxyProvider.getTestStreamProxy()).fetch();
        AssertionsForInterfaceTypes.assertThat((Record) fetch.nextRecordFromSplit()).isNull();
        AssertionsForInterfaceTypes.assertThat(fetch.nextSplit()).isNull();
        AssertionsForInterfaceTypes.assertThat(fetch.finishedSplits()).isEmpty();
    }

    @Test
    void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception {
        KinesisStreamProxyProvider.TestKinesisStreamProxy testStreamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
        PollingKinesisShardSplitReader pollingKinesisShardSplitReader = new PollingKinesisShardSplitReader(testStreamProxy);
        String generateShardId = TestUtil.generateShardId(1);
        testStreamProxy.addShards(generateShardId);
        pollingKinesisShardSplitReader.handleSplitsChanges(new SplitsAddition(Collections.singletonList(TestUtil.getTestSplit(generateShardId))));
        RecordsWithSplitIds fetch = pollingKinesisShardSplitReader.fetch();
        AssertionsForInterfaceTypes.assertThat((Record) fetch.nextRecordFromSplit()).isNull();
        AssertionsForInterfaceTypes.assertThat(fetch.nextSplit()).isNull();
        AssertionsForInterfaceTypes.assertThat(fetch.finishedSplits()).isEmpty();
    }

    @Test
    void testSingleAssignedSplitAllConsumed() throws Exception {
        KinesisStreamProxyProvider.TestKinesisStreamProxy testStreamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
        PollingKinesisShardSplitReader pollingKinesisShardSplitReader = new PollingKinesisShardSplitReader(testStreamProxy);
        String generateShardId = TestUtil.generateShardId(1);
        testStreamProxy.addShards(generateShardId);
        List list = (List) Stream.of((Object[]) new Record[]{TestUtil.getTestRecord("data-1"), TestUtil.getTestRecord("data-2"), TestUtil.getTestRecord("data-3")}).collect(Collectors.toList());
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, Collections.singletonList((Record) list.get(0)));
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, Collections.singletonList((Record) list.get(1)));
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, Collections.singletonList((Record) list.get(2)));
        pollingKinesisShardSplitReader.handleSplitsChanges(new SplitsAddition(Collections.singletonList(TestUtil.getTestSplit(generateShardId))));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            arrayList.addAll(readAllRecords(pollingKinesisShardSplitReader.fetch()));
        }
        AssertionsForInterfaceTypes.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(list);
    }

    @Test
    void testMultipleAssignedSplitsAllConsumed() throws Exception {
        KinesisStreamProxyProvider.TestKinesisStreamProxy testStreamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
        PollingKinesisShardSplitReader pollingKinesisShardSplitReader = new PollingKinesisShardSplitReader(testStreamProxy);
        String generateShardId = TestUtil.generateShardId(1);
        testStreamProxy.addShards(generateShardId);
        List list = (List) Stream.of((Object[]) new Record[]{TestUtil.getTestRecord("data-1"), TestUtil.getTestRecord("data-2"), TestUtil.getTestRecord("data-3")}).collect(Collectors.toList());
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, Collections.singletonList((Record) list.get(0)));
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, Collections.singletonList((Record) list.get(1)));
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, Collections.singletonList((Record) list.get(2)));
        pollingKinesisShardSplitReader.handleSplitsChanges(new SplitsAddition(Collections.singletonList(TestUtil.getTestSplit(generateShardId))));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            arrayList.addAll(readAllRecords(pollingKinesisShardSplitReader.fetch()));
        }
        AssertionsForInterfaceTypes.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(list);
    }

    @Test
    void testHandleEmptyCompletedShard() throws Exception {
        KinesisStreamProxyProvider.TestKinesisStreamProxy testStreamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
        PollingKinesisShardSplitReader pollingKinesisShardSplitReader = new PollingKinesisShardSplitReader(testStreamProxy);
        String generateShardId = TestUtil.generateShardId(1);
        testStreamProxy.addShards(generateShardId);
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, Collections.emptyList());
        KinesisShardSplit testSplit = TestUtil.getTestSplit(generateShardId);
        pollingKinesisShardSplitReader.handleSplitsChanges(new SplitsAddition(Collections.singletonList(testSplit)));
        testStreamProxy.setShouldCompleteNextShard(true);
        RecordsWithSplitIds fetch = pollingKinesisShardSplitReader.fetch();
        AssertionsForInterfaceTypes.assertThat((Record) fetch.nextRecordFromSplit()).isNull();
        AssertionsForInterfaceTypes.assertThat(fetch.nextSplit()).isNull();
        AssertionsForInterfaceTypes.assertThat(fetch.finishedSplits()).contains(new String[]{testSplit.splitId()});
    }

    @Test
    void testFinishedSplitsReturned() throws Exception {
        KinesisStreamProxyProvider.TestKinesisStreamProxy testStreamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
        PollingKinesisShardSplitReader pollingKinesisShardSplitReader = new PollingKinesisShardSplitReader(testStreamProxy);
        String generateShardId = TestUtil.generateShardId(1);
        testStreamProxy.addShards(generateShardId);
        List<Record> list = (List) Stream.of((Object[]) new Record[]{TestUtil.getTestRecord("data-1"), TestUtil.getTestRecord("data-2"), TestUtil.getTestRecord("data-3")}).collect(Collectors.toList());
        testStreamProxy.addRecords(TestUtil.STREAM_ARN, generateShardId, list);
        KinesisShardSplit testSplit = TestUtil.getTestSplit(generateShardId);
        pollingKinesisShardSplitReader.handleSplitsChanges(new SplitsAddition(Collections.singletonList(testSplit)));
        ArrayList arrayList = new ArrayList();
        testStreamProxy.setShouldCompleteNextShard(true);
        RecordsWithSplitIds fetch = pollingKinesisShardSplitReader.fetch();
        for (int i = 0; i < list.size(); i++) {
            AssertionsForInterfaceTypes.assertThat(fetch.nextSplit()).isEqualTo(testSplit.splitId());
            AssertionsForInterfaceTypes.assertThat(fetch.finishedSplits()).isEmpty();
            arrayList.add((Record) fetch.nextRecordFromSplit());
        }
        AssertionsForInterfaceTypes.assertThat(fetch.nextSplit()).isNull();
        AssertionsForInterfaceTypes.assertThat(fetch.finishedSplits()).contains(new String[]{testSplit.splitId()});
        AssertionsForInterfaceTypes.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(list);
    }

    @Test
    void testWakeUpIsNoOp() {
        PollingKinesisShardSplitReader pollingKinesisShardSplitReader = new PollingKinesisShardSplitReader(KinesisStreamProxyProvider.getTestStreamProxy());
        NotThrownAssert assertThatNoException = AssertionsForClassTypes.assertThatNoException();
        Objects.requireNonNull(pollingKinesisShardSplitReader);
        assertThatNoException.isThrownBy(pollingKinesisShardSplitReader::wakeUp);
    }

    @Test
    void testCloseClosesStreamProxy() {
        KinesisStreamProxyProvider.TestKinesisStreamProxy testStreamProxy = KinesisStreamProxyProvider.getTestStreamProxy();
        PollingKinesisShardSplitReader pollingKinesisShardSplitReader = new PollingKinesisShardSplitReader(testStreamProxy);
        NotThrownAssert assertThatNoException = AssertionsForClassTypes.assertThatNoException();
        Objects.requireNonNull(pollingKinesisShardSplitReader);
        assertThatNoException.isThrownBy(pollingKinesisShardSplitReader::close);
        AssertionsForInterfaceTypes.assertThat(testStreamProxy.isClosed()).isTrue();
    }

    private List<Record> readAllRecords(RecordsWithSplitIds<Record> recordsWithSplitIds) {
        Record record;
        ArrayList arrayList = new ArrayList();
        do {
            record = (Record) recordsWithSplitIds.nextRecordFromSplit();
            if (record != null) {
                arrayList.add(record);
            }
        } while (record != null);
        return arrayList;
    }
}
