package org.apache.flink.connector.base.source.hybrid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.mock.Whitebox;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/** Unit tests for {@link HybridSourceSplitEnumerator}. */
public class HybridSourceSplitEnumeratorTest {
    /** Index of the first reader subtask used by the tests. */
    private static final int SUBTASK0 = 0;

    /** Index of the second reader subtask used by the tests. */
    private static final int SUBTASK1 = 1;

    /**
     * Bounded mock source that is used for both positions of the hybrid source chain.
     *
     * <p>The numeric arguments are written as literals because they are not subtask indices.
     * NOTE(review): presumably (numSplits, numRecordsPerSplit) - confirm against MockBaseSource.
     */
    private static final MockBaseSource MOCK_SOURCE = new MockBaseSource(1, 1, Boundedness.BOUNDED);

    /** Hybrid source under test; created by {@code setupEnumeratorAndTriggerSourceSwitch()}. */
    private HybridSource<Integer> source;

    /** Mock enumerator context that records the split assignments made by the enumerator. */
    private MockSplitEnumeratorContext<HybridSourceSplit> context;

    /** Enumerator under test. */
    private HybridSourceSplitEnumerator enumerator;

    /** Split that was assigned to {@link #SUBTASK0} while source 0 was current. */
    private HybridSourceSplit splitFromSource0;

    /** Split that was assigned to {@link #SUBTASK0} after the switch to source 1. */
    private HybridSourceSplit splitFromSource1;
    /* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest$UnderlyingEnumeratorWrapper.class */
    private static class UnderlyingEnumeratorWrapper implements SplitEnumerator<MockSourceSplit, Object> {
        private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(HybridSourceSplitEnumeratorTest.SUBTASK0, HybridSourceSplitEnumeratorTest.SUBTASK0, HybridSourceSplitEnumeratorTest.SUBTASK1);
        private final List<Integer> handleSplitRequests;
        private final MockSplitEnumerator enumerator;
        private final SplitEnumeratorContext context;

        private UnderlyingEnumeratorWrapper(MockSplitEnumerator mockSplitEnumerator) {
            this.handleSplitRequests = new ArrayList();
            this.enumerator = mockSplitEnumerator;
            this.context = (SplitEnumeratorContext) Whitebox.getInternalState(mockSplitEnumerator, "context");
        }

        public void handleSplitRequest(int i, String str) {
            this.handleSplitRequests.add(Integer.valueOf(i));
            this.context.assignSplits(new SplitsAssignment(SPLIT_1, i));
        }

        public void start() {
            throw new UnsupportedOperationException();
        }

        public void addSplitsBack(List list, int i) {
            this.enumerator.addSplitsBack(list, i);
        }

        public void addReader(int i) {
            this.enumerator.addReader(i);
        }

        public Object snapshotState(long j) throws Exception {
            return this.enumerator.m18snapshotState(j);
        }

        public void close() throws IOException {
            this.enumerator.close();
        }
    }

    /**
     * Creates a hybrid enumerator over two {@code MOCK_SOURCE} instances with two registered
     * readers and drives it through the switch from source 0 to source 1.
     *
     * <p>On return {@code context}, {@code source}, {@code enumerator}, {@code splitFromSource0}
     * and {@code splitFromSource1} are populated, the recorded assignment sequence holds two
     * entries and the current source index is 1.
     */
    private void setupEnumeratorAndTriggerSourceSwitch() {
        this.context = new MockSplitEnumeratorContext<>(2);
        this.source = HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
        // The Source API declares the SplitEnumerator interface as return type, so narrow
        // explicitly to the concrete enumerator type held by the field.
        this.enumerator = (HybridSourceSplitEnumerator) this.source.createEnumerator(this.context);
        this.enumerator.start();

        // Registering readers alone must not produce any assignment.
        registerReader(this.context, this.enumerator, SUBTASK0);
        Assert.assertThat(this.context.getSplitsAssignmentSequence(), Matchers.emptyIterable());
        registerReader(this.context, this.enumerator, SUBTASK1);
        Assert.assertThat(this.context.getSplitsAssignmentSequence(), Matchers.emptyIterable());

        // Both readers report with source index -1; the first assignment is only recorded
        // after the second reader has reported.
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        Assert.assertThat(this.context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
        Assert.assertThat(this.context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(1));
        this.splitFromSource0 =
                this.context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
        Assert.assertEquals(0, this.splitFromSource0.sourceIndex());
        Assert.assertEquals(0, getCurrentSourceIndex(this.enumerator));

        // Trigger the source switch: it happens once both readers have finished source 0.
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
        Assert.assertEquals("one reader finished", 0, getCurrentSourceIndex(this.enumerator));
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
        Assert.assertEquals("both readers finished", 1, getCurrentSourceIndex(this.enumerator));
        Assert.assertThat(
                "switch triggers split assignment",
                this.context.getSplitsAssignmentSequence(),
                Matchers.iterableWithSize(2));
        this.splitFromSource1 =
                this.context.getSplitsAssignmentSequence().get(1).assignment().get(SUBTASK0).get(0);
        Assert.assertEquals(1, this.splitFromSource1.sourceIndex());

        // A finished event for source 1 from SUBTASK1 must not advance the source index.
        this.enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(1));
        Assert.assertEquals("reader without assignment", 1, getCurrentSourceIndex(this.enumerator));
    }

    /**
     * Verifies that a split of source 0 which is added back after the switch is re-assigned
     * once the owning reader is registered and has sent a finished event with source index -1.
     */
    @Test
    public void testRegisterReaderAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();

        // Reader is registered: add the split of the previous source back.
        this.context.getSplitsAssignmentSequence().clear();
        this.enumerator.addReader(SUBTASK0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), SUBTASK0);
        Assert.assertThat(this.context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment(
                "addSplitsBack triggers assignment when reader registered",
                this.context,
                1,
                this.splitFromSource0,
                SUBTASK0);

        // Reader is removed from the context before the split is added back.
        this.context.getSplitsAssignmentSequence().clear();
        this.context.unregisterReader(SUBTASK0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), SUBTASK0);
        Assert.assertThat(
                "addSplitsBack doesn't trigger assignment when reader not registered",
                this.context.getSplitsAssignmentSequence(),
                Matchers.emptyIterable());

        // Re-registering alone assigns nothing; the finished event triggers the assignment.
        registerReader(this.context, this.enumerator, SUBTASK0);
        Assert.assertThat(this.context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        this.enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment(
                "registerReader triggers assignment",
                this.context,
                1,
                this.splitFromSource0,
                SUBTASK0);
    }

    /**
     * Verifies that a split request after the switch is served by the underlying enumerator of
     * source 1, and that a split request is rejected with an {@link IllegalStateException}
     * after a split of source 0 has been added back for the requesting subtask.
     */
    @Test
    @SuppressWarnings("unchecked") // reflective read of the wrapped enumerator's split list
    public void testHandleSplitRequestAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();

        // Replace the current underlying enumerator with a recording wrapper.
        UnderlyingEnumeratorWrapper underlyingEnumeratorWrapper =
                new UnderlyingEnumeratorWrapper(getCurrentEnumerator(this.enumerator));
        Whitebox.setInternalState(this.enumerator, "currentEnumerator", underlyingEnumeratorWrapper);

        List<MockSourceSplit> remainingSplits =
                (List<MockSourceSplit>)
                        Whitebox.getInternalState(underlyingEnumeratorWrapper.enumerator, "splits");
        Assert.assertThat(remainingSplits, Matchers.emptyIterable());

        // Simulate a split request from the reader of the current source.
        this.context.getSplitsAssignmentSequence().clear();
        Assert.assertEquals("current enumerator", 1, getCurrentSourceIndex(this.enumerator));
        Assert.assertThat(underlyingEnumeratorWrapper.handleSplitRequests, Matchers.emptyIterable());
        this.enumerator.handleSplitRequest(SUBTASK0, "fakehostname");

        // The expected split is the wrapper's fixed split wrapped for source index 1.
        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(1, MOCK_SOURCE);
        assertSplitAssignment(
                "handleSplitRequest triggers assignment of split by underlying enumerator",
                this.context,
                1,
                HybridSourceSplit.wrapSplit(UnderlyingEnumeratorWrapper.SPLIT_1, 1, switchedSources),
                SUBTASK0);

        // Add a split of the previous source back, then request a split again.
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), SUBTASK0);
        try {
            this.enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
            Assert.fail("expected exception");
        } catch (IllegalStateException expected) {
            // Expected: the request must be rejected in this state; nothing else to verify.
        }
    }

    /**
     * Verifies that an enumerator restored from a snapshot exposes the same number of pending
     * splits in its underlying enumerator as the freshly created enumerator that was
     * snapshotted.
     *
     * @throws Exception if taking the snapshot or restoring the enumerator fails
     */
    @Test
    @SuppressWarnings("unchecked") // reflective reads of the underlying enumerator's split list
    public void testRestoreEnumerator() throws Exception {
        setupEnumeratorAndTriggerSourceSwitch();

        // Fresh enumerator on the same context; the Source API returns the interface type,
        // so narrow explicitly to the concrete enumerator type held by the field.
        this.enumerator = (HybridSourceSplitEnumerator) this.source.createEnumerator(this.context);
        this.enumerator.start();
        HybridSourceEnumeratorState snapshotState = this.enumerator.snapshotState(0L);
        List<MockSourceSplit> splitsBeforeRestore =
                (List<MockSourceSplit>)
                        Whitebox.getInternalState(getCurrentEnumerator(this.enumerator), "splits");
        Assert.assertThat(splitsBeforeRestore, Matchers.iterableWithSize(1));

        // Restore from the snapshot and check the underlying enumerator's splits again.
        this.enumerator =
                (HybridSourceSplitEnumerator)
                        this.source.restoreEnumerator(this.context, snapshotState);
        this.enumerator.start();
        List<MockSourceSplit> splitsAfterRestore =
                (List<MockSourceSplit>)
                        Whitebox.getInternalState(getCurrentEnumerator(this.enumerator), "splits");
        Assert.assertThat(splitsAfterRestore, Matchers.iterableWithSize(1));
    }

    /**
     * Verifies that checkpoint notifications and a {@link SwitchSourceEvent} received by the
     * hybrid enumerator are passed on to the current underlying enumerator.
     *
     * @throws Exception if a checkpoint notification fails
     */
    @Test
    public void testDefaultMethodDelegation() throws Exception {
        setupEnumeratorAndTriggerSourceSwitch();

        // Swap the current underlying enumerator for a Mockito spy so calls can be verified.
        SplitEnumerator<?, ?> underlyingEnumeratorSpy =
                Mockito.spy(getCurrentEnumerator(this.enumerator));
        Whitebox.setInternalState(this.enumerator, "currentEnumerator", underlyingEnumeratorSpy);

        this.enumerator.notifyCheckpointComplete(1L);
        Mockito.verify(underlyingEnumeratorSpy).notifyCheckpointComplete(1L);

        this.enumerator.notifyCheckpointAborted(2L);
        Mockito.verify(underlyingEnumeratorSpy).notifyCheckpointAborted(2L);

        // Source index 0 with no source attached; sent on behalf of SUBTASK0.
        SwitchSourceEvent switchSourceEvent = new SwitchSourceEvent(0, (Source) null, false);
        this.enumerator.handleSourceEvent(SUBTASK0, switchSourceEvent);
        Mockito.verify(underlyingEnumeratorSpy).handleSourceEvent(SUBTASK0, switchSourceEvent);
    }

    /**
     * Asserts that the context has recorded exactly {@code expectedSequenceSize} assignments
     * and that the first split assigned to {@code subtaskId} in the last recorded assignment
     * equals {@code expectedSplit}.
     *
     * @param reason message attached to both assertions
     * @param context context whose recorded assignment sequence is inspected
     * @param expectedSequenceSize expected number of recorded assignments (must be at least 1)
     * @param expectedSplit split expected at the head of the subtask's last assignment
     * @param subtaskId subtask whose assignment is inspected
     */
    private static void assertSplitAssignment(
            String reason,
            MockSplitEnumeratorContext<HybridSourceSplit> context,
            int expectedSequenceSize,
            HybridSourceSplit expectedSplit,
            int subtaskId) {
        Assert.assertThat(
                reason,
                context.getSplitsAssignmentSequence(),
                Matchers.iterableWithSize(expectedSequenceSize));
        HybridSourceSplit actualSplit =
                context.getSplitsAssignmentSequence()
                        .get(expectedSequenceSize - 1)
                        .assignment()
                        .get(subtaskId)
                        .get(0);
        Assert.assertEquals(reason, expectedSplit, actualSplit);
    }

    /**
     * Registers a reader for the given subtask with the mock context and then announces it to
     * the enumerator.
     *
     * @param context mock context that tracks registered readers
     * @param enumerator enumerator that is notified of the new reader
     * @param subtaskId subtask index of the reader
     */
    private static void registerReader(
            MockSplitEnumeratorContext<HybridSourceSplit> context,
            HybridSourceSplitEnumerator enumerator,
            int subtaskId) {
        final ReaderInfo readerInfo = new ReaderInfo(subtaskId, "location 0");
        context.registerReader(readerInfo);
        enumerator.addReader(subtaskId);
    }

    /**
     * Reads the enumerator's private {@code currentSourceIndex} field via reflection.
     *
     * @param enumerator enumerator to inspect
     * @return the current source index
     */
    private static int getCurrentSourceIndex(HybridSourceSplitEnumerator enumerator) {
        final Integer currentSourceIndex =
                (Integer) Whitebox.getInternalState(enumerator, "currentSourceIndex");
        return currentSourceIndex;
    }

    /**
     * Reads the enumerator's private {@code currentEnumerator} field via reflection.
     *
     * @param enumerator hybrid enumerator to inspect
     * @return the current underlying enumerator, cast to {@link MockSplitEnumerator}
     */
    private static MockSplitEnumerator getCurrentEnumerator(
            HybridSourceSplitEnumerator enumerator) {
        final Object currentEnumerator =
                Whitebox.getInternalState(enumerator, "currentEnumerator");
        return (MockSplitEnumerator) currentEnumerator;
    }
}
