package org.apache.flink.connector.base.source.hybrid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.mock.Whitebox;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.class */
public class HybridSourceSplitEnumeratorTest {
    private static final int SUBTASK0 = 0;
    private static final int SUBTASK1 = 1;
    private static final MockBaseSource MOCK_SOURCE = new MockBaseSource(SUBTASK1, SUBTASK1, Boundedness.BOUNDED);
    private HybridSource<Integer> source;
    private MockSplitEnumeratorContext<HybridSourceSplit> context;
    private HybridSourceSplitEnumerator enumerator;
    private HybridSourceSplit splitFromSource0;
    private HybridSourceSplit splitFromSource1;

    /* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest$UnderlyingEnumeratorWrapper.class */
    private static class UnderlyingEnumeratorWrapper implements SplitEnumerator<MockSourceSplit, Object> {
        private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(HybridSourceSplitEnumeratorTest.SUBTASK0, HybridSourceSplitEnumeratorTest.SUBTASK0, HybridSourceSplitEnumeratorTest.SUBTASK1);
        private final List<Integer> handleSplitRequests;
        private final MockSplitEnumerator enumerator;
        private final SplitEnumeratorContext context;

        private UnderlyingEnumeratorWrapper(MockSplitEnumerator mockSplitEnumerator) {
            this.handleSplitRequests = new ArrayList();
            this.enumerator = mockSplitEnumerator;
            this.context = (SplitEnumeratorContext) Whitebox.getInternalState(mockSplitEnumerator, "context");
        }

        public void handleSplitRequest(int i, String str) {
            this.handleSplitRequests.add(Integer.valueOf(i));
            this.context.assignSplits(new SplitsAssignment(SPLIT_1, i));
        }

        public void start() {
            throw new UnsupportedOperationException();
        }

        public void addSplitsBack(List list, int i) {
            this.enumerator.addSplitsBack(list, i);
        }

        public void addReader(int i) {
            this.enumerator.addReader(i);
        }

        public Object snapshotState(long j) throws Exception {
            return this.enumerator.m20snapshotState(j);
        }

        public void close() throws IOException {
            this.enumerator.close();
        }
    }

    private void setupEnumeratorAndTriggerSourceSwitch() {
        this.context = new MockSplitEnumeratorContext<>(2);
        this.source = HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
        this.enumerator = this.source.createEnumerator(this.context);
        this.enumerator.start();
        registerReader(this.context, this.enumerator, SUBTASK0);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        registerReader(this.context, this.enumerator, SUBTASK1);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).hasSize(SUBTASK1);
        this.splitFromSource0 = (HybridSourceSplit) ((List) ((SplitsAssignment) this.context.getSplitsAssignmentSequence().get(SUBTASK0)).assignment().get(Integer.valueOf(SUBTASK0))).get(SUBTASK0);
        Assertions.assertThat(this.splitFromSource0.sourceIndex()).isEqualTo(SUBTASK0);
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).isEqualTo(SUBTASK0);
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(SUBTASK0));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("one reader finished", new Object[SUBTASK0]).isEqualTo(SUBTASK0);
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK0));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("both readers finished", new Object[SUBTASK0]).isEqualTo(SUBTASK1);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).as("switch triggers split assignment", new Object[SUBTASK0]).hasSize(2);
        this.splitFromSource1 = (HybridSourceSplit) ((List) ((SplitsAssignment) this.context.getSplitsAssignmentSequence().get(SUBTASK1)).assignment().get(Integer.valueOf(SUBTASK0))).get(SUBTASK0);
        Assertions.assertThat(this.splitFromSource1.sourceIndex()).isEqualTo(SUBTASK1);
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK1));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("reader without assignment", new Object[SUBTASK0]).isEqualTo(SUBTASK1);
    }

    @Test
    public void testHighCardinalitySources() {
        this.context = new MockSplitEnumeratorContext<>(2);
        HybridSource.HybridSourceBuilder builder = HybridSource.builder(MOCK_SOURCE);
        for (int i = SUBTASK1; i < 130; i += SUBTASK1) {
            builder = builder.addSource(MOCK_SOURCE);
        }
        this.source = builder.build();
        this.enumerator = this.source.createEnumerator(this.context);
        this.enumerator.start();
        registerReader(this.context, this.enumerator, SUBTASK0);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        registerReader(this.context, this.enumerator, SUBTASK1);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).hasSize(SUBTASK1);
        this.splitFromSource0 = (HybridSourceSplit) ((List) ((SplitsAssignment) this.context.getSplitsAssignmentSequence().get(SUBTASK0)).assignment().get(Integer.valueOf(SUBTASK0))).get(SUBTASK0);
        Assertions.assertThat(this.splitFromSource0.sourceIndex()).isEqualTo(SUBTASK0);
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).isEqualTo(SUBTASK0);
        for (int i2 = SUBTASK0; i2 < 130; i2 += SUBTASK1) {
            this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(i2));
            Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("one reader finished", new Object[SUBTASK0]).isEqualTo(i2);
            this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(i2));
            if (i2 < 129) {
                Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("both readers finished", new Object[SUBTASK0]).isEqualTo(i2 + SUBTASK1);
                Assertions.assertThat(this.context.getSplitsAssignmentSequence()).as("switch triggers split assignment", new Object[SUBTASK0]).hasSize(i2 + 2);
                this.splitFromSource1 = (HybridSourceSplit) ((List) ((SplitsAssignment) this.context.getSplitsAssignmentSequence().get(i2)).assignment().get(Integer.valueOf(SUBTASK0))).get(SUBTASK0);
                Assertions.assertThat(this.splitFromSource1.sourceIndex()).isEqualTo(i2);
            } else {
                Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("both readers finished", new Object[SUBTASK0]).isEqualTo(129);
            }
        }
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK1));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("reader without assignment", new Object[SUBTASK0]).isEqualTo(129);
    }

    @Test
    public void testRegisterReaderAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();
        this.context.getSplitsAssignmentSequence().clear();
        this.enumerator.addReader(SUBTASK0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), SUBTASK0);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment("addSplitsBack triggers assignment when reader registered", this.context, SUBTASK1, this.splitFromSource0, SUBTASK0);
        this.context.getSplitsAssignmentSequence().clear();
        this.context.unregisterReader(SUBTASK0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), SUBTASK0);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).as("addSplitsBack doesn't trigger assignment when reader not registered", new Object[SUBTASK0]).isEmpty();
        registerReader(this.context, this.enumerator, SUBTASK0);
        Assertions.assertThat(this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment("registerReader triggers assignment", this.context, SUBTASK1, this.splitFromSource0, SUBTASK0);
    }

    @Test
    public void testHandleSplitRequestAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();
        UnderlyingEnumeratorWrapper underlyingEnumeratorWrapper = new UnderlyingEnumeratorWrapper(getCurrentEnumerator(this.enumerator));
        Whitebox.setInternalState(this.enumerator, "currentEnumerator", underlyingEnumeratorWrapper);
        Assertions.assertThat((List) Whitebox.getInternalState(underlyingEnumeratorWrapper.enumerator, "splits")).isEmpty();
        this.context.getSplitsAssignmentSequence().clear();
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("current enumerator", new Object[SUBTASK0]).isEqualTo(SUBTASK1);
        Assertions.assertThat(underlyingEnumeratorWrapper.handleSplitRequests).isEmpty();
        this.enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(SUBTASK1, MOCK_SOURCE);
        assertSplitAssignment("handleSplitRequest triggers assignment of split by underlying enumerator", this.context, SUBTASK1, HybridSourceSplit.wrapSplit(UnderlyingEnumeratorWrapper.SPLIT_1, SUBTASK1, switchedSources), SUBTASK0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), SUBTASK0);
        Assertions.assertThatThrownBy(() -> {
            this.enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    public void testRestoreEnumerator() throws Exception {
        setupEnumeratorAndTriggerSourceSwitch();
        this.enumerator = this.source.createEnumerator(this.context);
        this.enumerator.start();
        HybridSourceEnumeratorState snapshotState = this.enumerator.snapshotState(0L);
        Assertions.assertThat((List) Whitebox.getInternalState(getCurrentEnumerator(this.enumerator), "splits")).hasSize(SUBTASK1);
        this.enumerator = this.source.restoreEnumerator(this.context, snapshotState);
        this.enumerator.start();
        Assertions.assertThat((List) Whitebox.getInternalState(getCurrentEnumerator(this.enumerator), "splits")).hasSize(SUBTASK1);
    }

    @Test
    public void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits() throws Exception {
        setupEnumeratorAndTriggerSourceSwitch();
        HybridSourceEnumeratorState snapshotState = this.enumerator.snapshotState(0L);
        Assertions.assertThat((List) Whitebox.getInternalState(getCurrentEnumerator(this.enumerator), "splits")).hasSize(SUBTASK0);
        this.enumerator = this.source.restoreEnumerator(this.context, snapshotState);
        this.enumerator.start();
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((List) Whitebox.getInternalState(getCurrentEnumerator(this.enumerator), "splits")).hasSize(SUBTASK0);
    }

    @Test
    public void testDefaultMethodDelegation() throws Exception {
        setupEnumeratorAndTriggerSourceSwitch();
        SplitEnumerator splitEnumerator = (SplitEnumerator) Mockito.spy(getCurrentEnumerator(this.enumerator));
        Whitebox.setInternalState(this.enumerator, "currentEnumerator", splitEnumerator);
        this.enumerator.notifyCheckpointComplete(1L);
        ((SplitEnumerator) Mockito.verify(splitEnumerator)).notifyCheckpointComplete(1L);
        this.enumerator.notifyCheckpointAborted(2L);
        ((SplitEnumerator) Mockito.verify(splitEnumerator)).notifyCheckpointAborted(2L);
        SwitchSourceEvent switchSourceEvent = new SwitchSourceEvent(SUBTASK0, (Source) null, false);
        this.enumerator.handleSourceEvent(SUBTASK0, switchSourceEvent);
        ((SplitEnumerator) Mockito.verify(splitEnumerator)).handleSourceEvent(SUBTASK0, switchSourceEvent);
    }

    @Test
    public void testInterceptNoMoreSplitEvent() {
        this.context = new MockSplitEnumeratorContext<>(2);
        this.source = HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
        this.enumerator = this.source.createEnumerator(this.context);
        this.enumerator.start();
        registerReader(this.context, this.enumerator, SUBTASK0);
        registerReader(this.context, this.enumerator, SUBTASK1);
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat(this.context.hasNoMoreSplits(SUBTASK0)).isFalse();
        Assertions.assertThat(this.context.hasNoMoreSplits(SUBTASK1)).isFalse();
        this.splitFromSource0 = (HybridSourceSplit) ((List) ((SplitsAssignment) this.context.getSplitsAssignmentSequence().get(SUBTASK0)).assignment().get(Integer.valueOf(SUBTASK0))).get(SUBTASK0);
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(SUBTASK0));
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK0));
        Assertions.assertThat(this.context.hasNoMoreSplits(SUBTASK0)).isTrue();
        Assertions.assertThat(this.context.hasNoMoreSplits(SUBTASK1)).isTrue();
        this.context.getSplitsAssignmentSequence().clear();
        this.context.resetNoMoreSplits(SUBTASK0);
        this.enumerator.addReader(SUBTASK0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), SUBTASK0);
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat(this.context.hasNoMoreSplits(SUBTASK0)).isFalse();
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(SUBTASK0));
        Assertions.assertThat(this.context.hasNoMoreSplits(SUBTASK0)).isTrue();
    }

    @Test
    public void testMultiSubtaskSwitchEnumerator() {
        this.context = new MockSplitEnumeratorContext<>(2);
        this.source = HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
        this.enumerator = this.source.createEnumerator(this.context);
        this.enumerator.start();
        registerReader(this.context, this.enumerator, SUBTASK0);
        registerReader(this.context, this.enumerator, SUBTASK1);
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).isEqualTo(SUBTASK0);
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(SUBTASK0));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).isEqualTo(SUBTASK0);
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK0));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("all reader finished source-0", new Object[SUBTASK0]).isEqualTo(SUBTASK1);
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(SUBTASK1));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("only reader-0 has finished reading, reader-1 is not yet done, so do not switch to the next source", new Object[SUBTASK0]).isEqualTo(SUBTASK1);
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK1));
        Assertions.assertThat(getCurrentSourceIndex(this.enumerator)).as("all reader finished source-1", new Object[SUBTASK0]).isEqualTo(2);
    }

    private static void assertSplitAssignment(String str, MockSplitEnumeratorContext<HybridSourceSplit> mockSplitEnumeratorContext, int i, HybridSourceSplit hybridSourceSplit, int i2) {
        Assertions.assertThat(mockSplitEnumeratorContext.getSplitsAssignmentSequence()).as(str, new Object[SUBTASK0]).hasSize(i);
        Assertions.assertThat(((List) ((SplitsAssignment) mockSplitEnumeratorContext.getSplitsAssignmentSequence().get(i - SUBTASK1)).assignment().get(Integer.valueOf(i2))).get(SUBTASK0)).as(str, new Object[SUBTASK0]).isEqualTo(hybridSourceSplit);
    }

    private static void registerReader(MockSplitEnumeratorContext<HybridSourceSplit> mockSplitEnumeratorContext, HybridSourceSplitEnumerator hybridSourceSplitEnumerator, int i) {
        mockSplitEnumeratorContext.registerReader(new ReaderInfo(i, "location 0"));
        hybridSourceSplitEnumerator.addReader(i);
    }

    private static int getCurrentSourceIndex(HybridSourceSplitEnumerator hybridSourceSplitEnumerator) {
        return ((Integer) Whitebox.getInternalState(hybridSourceSplitEnumerator, "currentSourceIndex")).intValue();
    }

    private static MockSplitEnumerator getCurrentEnumerator(HybridSourceSplitEnumerator hybridSourceSplitEnumerator) {
        return (MockSplitEnumerator) Whitebox.getInternalState(hybridSourceSplitEnumerator, "currentEnumerator");
    }
}
