package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.class */
public class ContinuousFileSplitEnumeratorTest {

    /* loaded from: input_file:org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest$Builder.class */
    private static class Builder {
        private SplitEnumeratorContext<FileStoreSourceSplit> context;
        private Collection<FileStoreSourceSplit> initialSplits;
        private long discoveryInterval;
        private StreamTableScan scan;
        private BucketMode bucketMode;

        private Builder() {
            this.initialSplits = Collections.emptyList();
            this.discoveryInterval = Long.MAX_VALUE;
            this.bucketMode = BucketMode.FIXED;
        }

        public Builder setSplitEnumeratorContext(SplitEnumeratorContext<FileStoreSourceSplit> splitEnumeratorContext) {
            this.context = splitEnumeratorContext;
            return this;
        }

        public Builder setInitialSplits(Collection<FileStoreSourceSplit> collection) {
            this.initialSplits = collection;
            return this;
        }

        public Builder setDiscoveryInterval(long j) {
            this.discoveryInterval = j;
            return this;
        }

        public Builder setScan(StreamTableScan streamTableScan) {
            this.scan = streamTableScan;
            return this;
        }

        public Builder withBucketMode(BucketMode bucketMode) {
            this.bucketMode = bucketMode;
            return this;
        }

        public ContinuousFileSplitEnumerator build() {
            return new ContinuousFileSplitEnumerator(this.context, this.initialSplits, (Long) null, this.discoveryInterval, this.scan, this.bucketMode);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest$MockScan.class */
    private static class MockScan implements StreamTableScan {
        private final TreeMap<Long, TableScan.Plan> results;
        private boolean allowEnd = true;

        @Nullable
        private Long nextSnapshotId = null;

        public MockScan(TreeMap<Long, TableScan.Plan> treeMap) {
            this.results = treeMap;
        }

        public TableScan.Plan plan() {
            Map.Entry<Long, TableScan.Plan> pollFirstEntry = this.results.pollFirstEntry();
            if (pollFirstEntry != null) {
                this.nextSnapshotId = Long.valueOf(pollFirstEntry.getKey().longValue() + 1);
                return pollFirstEntry.getValue();
            }
            if (this.allowEnd) {
                throw new EndOfScanException();
            }
            return SnapshotNotExistPlan.INSTANCE;
        }

        public List<BinaryRow> listPartitions() {
            throw new UnsupportedOperationException();
        }

        /* renamed from: checkpoint, reason: merged with bridge method [inline-methods] */
        public Long m76checkpoint() {
            return this.nextSnapshotId;
        }

        public void notifyCheckpointComplete(@Nullable Long l) {
        }

        @Nullable
        public Long watermark() {
            return null;
        }

        public void restore(Long l) {
        }

        public void allowEnd(boolean z) {
            this.allowEnd = z;
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest$TestingAsyncSplitEnumeratorContext.class */
    private static class TestingAsyncSplitEnumeratorContext<SplitT extends SourceSplit> extends TestingSplitEnumeratorContext<SplitT> {
        private final ManuallyTriggeredScheduledExecutorService workerExecutor;
        private final ExecutorNotifier notifier;

        public TestingAsyncSplitEnumeratorContext(int i) {
            super(i);
            this.workerExecutor = new ManuallyTriggeredScheduledExecutorService();
            this.notifier = new ExecutorNotifier(this.workerExecutor, super.getExecutorService());
        }

        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
            this.notifier.notifyReadyAsync(callable, biConsumer);
        }

        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
            this.notifier.notifyReadyAsync(callable, biConsumer, j, j2);
        }

        public void triggerAllWorkerAction() {
            this.workerExecutor.triggerPeriodicScheduledTasks();
            this.workerExecutor.triggerAll();
        }

        public void triggerAlCoordinatorAction() {
            super.triggerAllActions();
        }

        public void triggerNextCoordinatorAction() {
            super.getExecutorService().trigger();
        }
    }

    @Test
    public void testSplitAllocationIsOrdered() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(1);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i <= 4; i++) {
            arrayList.add(createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        ArrayList arrayList2 = new ArrayList(arrayList);
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(arrayList).setDiscoveryInterval(3L).build();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        List assignedSplits = ((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits();
        Assertions.assertThat(assignedSplits).hasSameElementsAs(arrayList2.subList(0, 2));
        build.addSplitsBack(assignedSplits, 0);
        testingSplitEnumeratorContext.getSplitAssignments().clear();
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments()).isEmpty();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(0)).getAssignedSplits()).hasSameElementsAs(arrayList2.subList(0, 2));
        testingSplitEnumeratorContext.getSplitAssignments().clear();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments3 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments3).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments3.get(0)).getAssignedSplits()).hasSameElementsAs(arrayList2.subList(2, 4));
    }

    @Test
    public void testSplitWithBatch() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(1);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i <= 18; i++) {
            arrayList.add(createSnapshotSplit(i, i, Collections.emptyList()));
        }
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(arrayList).setDiscoveryInterval(3L).build();
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).hasSize(1);
        build.handleSplitRequest(0, "test-host");
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).hasSize(2);
        build.handleSplitRequest(0, "test-host");
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).hasSize(3);
    }

    @Test
    public void testSplitAllocationIsFair() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(1);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i <= 2; i++) {
            arrayList.add(createSnapshotSplit(i, 0, Collections.emptyList()));
            arrayList.add(createSnapshotSplit(i, 1, Collections.emptyList()));
        }
        ArrayList arrayList2 = new ArrayList(arrayList);
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(arrayList).setDiscoveryInterval(3L).build();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).hasSameElementsAs(arrayList2.subList(0, 2));
        testingSplitEnumeratorContext.getSplitAssignments().clear();
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments()).isEmpty();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(0)).getAssignedSplits()).hasSameElementsAs(arrayList2.subList(2, 4));
    }

    @Test
    public void testSnapshotEnumerator() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(2);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        TreeMap treeMap = new TreeMap();
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(new MockScan(treeMap)).build();
        build.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(createDataSplit(0L, i, Collections.emptyList()));
        }
        treeMap.put(1L, new DataFilePlan(arrayList));
        testingSplitEnumeratorContext.triggerAllActions();
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(0)});
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(0)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(0), (DataSplit) arrayList.get(2)});
        build.handleSplitRequest(0, "test-host");
        testingSplitEnumeratorContext.triggerAllActions();
        Map splitAssignments3 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments3).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments3.get(0)).hasReceivedNoMoreSplitsSignal()).isTrue();
        splitAssignments3.clear();
        build.handleSplitRequest(1, "test-host");
        Map splitAssignments4 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments4).containsOnlyKeys(new Integer[]{1});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments4.get(1)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(1)});
        build.handleSplitRequest(1, "test-host");
        Map splitAssignments5 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments5).containsOnlyKeys(new Integer[]{1});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments5.get(1)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(1), (DataSplit) arrayList.get(3)});
        build.handleSplitRequest(1, "test-host");
        Map splitAssignments6 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments6).containsOnlyKeys(new Integer[]{1});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments6.get(1)).hasReceivedNoMoreSplitsSignal()).isTrue();
    }

    @Test
    public void testUnawareBucketEnumeratorWithBucket() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(3);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        TreeMap treeMap = new TreeMap();
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(new MockScan(treeMap)).withBucketMode(BucketMode.UNAWARE).build();
        build.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add(createDataSplit(0L, 1, Collections.emptyList()));
        treeMap.put(1L, new DataFilePlan(arrayList));
        testingSplitEnumeratorContext.triggerAllActions();
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).size()).isEqualTo(1);
        arrayList.clear();
        arrayList.add(createDataSplit(0L, 2, Collections.emptyList()));
        treeMap.put(2L, new DataFilePlan(arrayList));
        testingSplitEnumeratorContext.triggerAllActions();
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(0)).getAssignedSplits()).size()).isEqualTo(2);
    }

    @Test
    public void testUnawareBucketEnumeratorLot() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(4);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        testingSplitEnumeratorContext.registerReader(2, "test-host");
        testingSplitEnumeratorContext.registerReader(3, "test-host");
        TreeMap treeMap = new TreeMap();
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(new MockScan(treeMap)).withBucketMode(BucketMode.UNAWARE).build();
        build.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(createDataSplit(0L, 0, Collections.emptyList()));
        }
        treeMap.put(1L, new DataFilePlan(arrayList));
        testingSplitEnumeratorContext.triggerAllActions();
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).size()).isEqualTo(1);
        build.handleSplitRequest(1, "test-host");
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0, 1});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(1)).getAssignedSplits()).size()).isEqualTo(1);
        build.handleSplitRequest(2, "test-host");
        Map splitAssignments3 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments3).containsOnlyKeys(new Integer[]{0, 1, 2});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments3.get(2)).getAssignedSplits()).size()).isEqualTo(1);
        for (int i2 = 0; i2 < 97; i2++) {
            build.handleSplitRequest(3, "test-host");
            Map splitAssignments4 = testingSplitEnumeratorContext.getSplitAssignments();
            Assertions.assertThat(splitAssignments4).containsOnlyKeys(new Integer[]{0, 1, 2, 3});
            Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments4.get(3)).getAssignedSplits()).size()).isEqualTo(i2 + 1);
        }
        build.handleSplitRequest(3, "test-host");
        testingSplitEnumeratorContext.triggerAllActions();
        Map splitAssignments5 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments5).containsOnlyKeys(new Integer[]{0, 1, 2, 3});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments5.get(3)).hasReceivedNoMoreSplitsSignal()).isTrue();
    }

    @Test
    public void testUnawareBucketEnumeratorAssignLater() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(4);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        testingSplitEnumeratorContext.registerReader(2, "test-host");
        testingSplitEnumeratorContext.registerReader(3, "test-host");
        TreeMap treeMap = new TreeMap();
        MockScan mockScan = new MockScan(treeMap);
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(mockScan).withBucketMode(BucketMode.UNAWARE).build();
        build.start();
        mockScan.allowEnd(false);
        build.handleSplitRequest(0, "test-host");
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments().size()).isEqualTo(0);
        build.handleSplitRequest(1, "test-host");
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments().size()).isEqualTo(0);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(createDataSplit(0L, 0, Collections.emptyList()));
        }
        treeMap.put(1L, new DataFilePlan(arrayList));
        testingSplitEnumeratorContext.triggerAllActions();
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0, 1});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits().size()).isEqualTo(1);
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(1)).getAssignedSplits().size()).isEqualTo(1);
        build.handleSplitRequest(2, "test-host");
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0, 1, 2});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(2)).getAssignedSplits()).size()).isEqualTo(1);
        build.handleSplitRequest(3, "test-host");
        Map splitAssignments3 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments3).containsOnlyKeys(new Integer[]{0, 1, 2, 3});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments3.get(3)).getAssignedSplits()).size()).isEqualTo(1);
    }

    @Test
    public void testEnumeratorDeregisteredByContext() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(2);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        TreeMap treeMap = new TreeMap();
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(new MockScan(treeMap)).withBucketMode(BucketMode.UNAWARE).build();
        build.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(createDataSplit(0L, i, Collections.emptyList()));
        }
        treeMap.put(1L, new DataFilePlan(arrayList));
        testingSplitEnumeratorContext.triggerAllActions();
        testingSplitEnumeratorContext.registeredReaders().remove(0);
        build.handleSplitRequest(0, "test-host");
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments().size()).isEqualTo(0);
        build.handleSplitRequest(1, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{1});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(1)).getAssignedSplits()).size()).isEqualTo(1);
    }

    @Test
    public void testRemoveReadersAwaitSuccessful() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(2);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        TreeMap treeMap = new TreeMap();
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(new MockScan(treeMap)).withBucketMode(BucketMode.UNAWARE).build();
        build.start();
        build.handleSplitRequest(1, "test-host");
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(createDataSplit(0L, i, Collections.emptyList()));
        }
        treeMap.put(1L, new DataFilePlan(arrayList));
        testingSplitEnumeratorContext.registeredReaders().remove(1);
        Assertions.assertThatCode(() -> {
            build.handleSplitRequest(0, "test-host");
        }).doesNotThrowAnyException();
    }

    @Test
    public void testTriggerScanByTaskRequest() throws Exception {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(2);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        TreeMap treeMap = new TreeMap();
        MockScan mockScan = new MockScan(treeMap);
        mockScan.allowEnd(false);
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(mockScan).build();
        build.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(createDataSplit(0L, i, Collections.emptyList()));
        }
        treeMap.put(1L, new DataFilePlan(arrayList));
        build.handleSplitRequest(0, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(0)});
        build.handleSplitRequest(1, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0, 1});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(1)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(1)});
    }

    @Test
    public void testNoTriggerWhenReadLatest() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(4);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        testingSplitEnumeratorContext.registerReader(2, "test-host");
        testingSplitEnumeratorContext.registerReader(3, "test-host");
        TreeMap treeMap = new TreeMap();
        MockScan mockScan = new MockScan(treeMap);
        mockScan.allowEnd(false);
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(mockScan).build();
        build.start();
        build.handleSplitRequest(0, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.add(createDataSplit(0L, i, Collections.emptyList()));
        }
        treeMap.put(1L, new DataFilePlan(arrayList));
        build.handleSplitRequest(0, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments()).isEmpty();
        build.handleSplitRequest(1, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments()).isEmpty();
        testingSplitEnumeratorContext.triggerAllActions();
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0, 1});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(0)});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(1)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(1)});
        arrayList.clear();
        for (int i2 = 2; i2 < 4; i2++) {
            arrayList.add(createDataSplit(0L, i2, Collections.emptyList()));
        }
        treeMap.put(2L, new DataFilePlan(arrayList));
        build.handleSplitRequest(2, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        build.handleSplitRequest(3, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0, 1, 2, 3});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(2)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(0)});
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(3)).getAssignedSplits())).containsExactly(new DataSplit[]{(DataSplit) arrayList.get(1)});
        build.handleSplitRequest(3, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        arrayList.clear();
        arrayList.add(createDataSplit(0L, 7, Collections.emptyList()));
        treeMap.put(3L, new DataFilePlan(arrayList));
        build.handleSplitRequest(3, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) testingSplitEnumeratorContext.getSplitAssignments().get(3)).getAssignedSplits())).doesNotContain(new DataSplit[]{(DataSplit) arrayList.get(0)});
        build.enableTriggerScan();
        build.handleSplitRequest(3, "test-host");
        testingSplitEnumeratorContext.getExecutorService().triggerAllNonPeriodicTasks();
        Assertions.assertThat(toDataSplits(((TestingSplitEnumeratorContext.SplitAssignmentState) testingSplitEnumeratorContext.getSplitAssignments().get(3)).getAssignedSplits())).contains(new DataSplit[]{(DataSplit) arrayList.get(0)});
    }

    @Test
    public void testEnumeratorWithCheckpoint() {
        SplitEnumeratorContext<FileStoreSourceSplit> testingAsyncSplitEnumeratorContext = new TestingAsyncSplitEnumeratorContext<>(1);
        testingAsyncSplitEnumeratorContext.registerReader(0, "test-host");
        TreeMap treeMap = new TreeMap();
        HashMap hashMap = new HashMap(4);
        MockScan mockScan = new MockScan(treeMap);
        for (int i = 1; i <= 4; i++) {
            List singletonList = Collections.singletonList(createDataSplit(i, 0, Collections.emptyList()));
            treeMap.put(Long.valueOf(i), new DataFilePlan(singletonList));
            hashMap.put(Long.valueOf(i), singletonList);
        }
        ContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingAsyncSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(1L).setScan(mockScan).build();
        build.start();
        AtomicReference atomicReference = new AtomicReference();
        testingAsyncSplitEnumeratorContext.runInCoordinatorThread(() -> {
            atomicReference.set(checkpointWithoutException(build, 1L));
        });
        testingAsyncSplitEnumeratorContext.triggerAlCoordinatorAction();
        PendingSplitsCheckpoint pendingSplitsCheckpoint = (PendingSplitsCheckpoint) atomicReference.getAndSet(null);
        Assertions.assertThat(pendingSplitsCheckpoint).isNotNull();
        Assertions.assertThat(pendingSplitsCheckpoint.currentSnapshotId()).isNull();
        Assertions.assertThat(pendingSplitsCheckpoint.splits()).isEmpty();
        testingAsyncSplitEnumeratorContext.triggerAllWorkerAction();
        testingAsyncSplitEnumeratorContext.triggerAlCoordinatorAction();
        testingAsyncSplitEnumeratorContext.runInCoordinatorThread(() -> {
            atomicReference.set(checkpointWithoutException(build, 2L));
        });
        testingAsyncSplitEnumeratorContext.triggerAlCoordinatorAction();
        PendingSplitsCheckpoint pendingSplitsCheckpoint2 = (PendingSplitsCheckpoint) atomicReference.getAndSet(null);
        Assertions.assertThat(pendingSplitsCheckpoint2).isNotNull();
        Assertions.assertThat(pendingSplitsCheckpoint2.currentSnapshotId()).isEqualTo(2L);
        Assertions.assertThat(toDataSplits(pendingSplitsCheckpoint2.splits())).containsExactlyElementsOf((Iterable) hashMap.get(1L));
        build.handleSplitRequest(0, "test");
        testingAsyncSplitEnumeratorContext.triggerAlCoordinatorAction();
        testingAsyncSplitEnumeratorContext.triggerAllWorkerAction();
        testingAsyncSplitEnumeratorContext.runInCoordinatorThread(() -> {
            atomicReference.set(checkpointWithoutException(build, 3L));
        });
        testingAsyncSplitEnumeratorContext.triggerAllWorkerAction();
        testingAsyncSplitEnumeratorContext.triggerNextCoordinatorAction();
        testingAsyncSplitEnumeratorContext.triggerNextCoordinatorAction();
        PendingSplitsCheckpoint pendingSplitsCheckpoint3 = (PendingSplitsCheckpoint) atomicReference.getAndSet(null);
        Assertions.assertThat(pendingSplitsCheckpoint3).isNotNull();
        Assertions.assertThat(pendingSplitsCheckpoint3.currentSnapshotId()).isEqualTo(3L);
        Assertions.assertThat(toDataSplits(pendingSplitsCheckpoint3.splits())).containsExactlyElementsOf((Iterable) hashMap.get(2L));
    }

    private static PendingSplitsCheckpoint checkpointWithoutException(ContinuousFileSplitEnumerator continuousFileSplitEnumerator, long j) {
        try {
            return continuousFileSplitEnumerator.snapshotState(j);
        } catch (Exception e) {
            return null;
        }
    }

    private static List<DataSplit> toDataSplits(Collection<FileStoreSourceSplit> collection) {
        return (List) collection.stream().map((v0) -> {
            return v0.split();
        }).map(split -> {
            return (DataSplit) split;
        }).collect(Collectors.toList());
    }

    public static FileStoreSourceSplit createSnapshotSplit(int i, int i2, List<DataFileMeta> list) {
        return new FileStoreSourceSplit(UUID.randomUUID().toString(), DataSplit.builder().withSnapshot(i).withPartition(MergeTreeCompactManagerTest.row(1)).withBucket(i2).withDataFiles(list).isStreaming(true).build(), 0L);
    }

    private static DataSplit createDataSplit(long j, int i, List<DataFileMeta> list) {
        return DataSplit.builder().withSnapshot(j).withPartition(MergeTreeCompactManagerTest.row(1)).withBucket(i).withDataFiles(list).isStreaming(true).build();
    }
}
