package org.apache.iceberg.flink.source.enumerator;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.class */
/**
 * Tests for {@link ContinuousIcebergEnumerator}.
 *
 * <p>Each test drives the enumerator with a {@link ManualContinuousSplitPlanner}, which only
 * hands out the splits explicitly added by the test, and a {@link TestingSplitEnumeratorContext},
 * whose queued actions are run on demand via {@code triggerAllActions()}.
 */
public class TestContinuousIcebergEnumerator {

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Value passed to the {@link TestingSplitEnumeratorContext} constructor in every test. */
    private static final int CONTEXT_PARALLELISM = 4;

    /** Id of the single reader that the tests register with the enumerator context. */
    private static final int READER_ID = 2;

    /**
     * With no reader registered, a split handed out by the planner must stay in the enumerator
     * state as a pending, unassigned split.
     */
    @Test
    public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
                new TestingSplitEnumeratorContext<>(CONTEXT_PARALLELISM);
        ContinuousIcebergEnumerator enumerator =
                createEnumerator(enumeratorContext, streamingScanContext(), splitPlanner);

        // Nothing has been planned yet, so the first snapshot holds no pending splits.
        Collection<IcebergSourceSplitState> pendingBeforeDiscovery =
                enumerator.snapshotState(1L).pendingSplits();
        Assert.assertEquals(0, pendingBeforeDiscovery.size());

        // Make one split available to the planner, then run the actions queued on the context.
        List<IcebergSourceSplit> splits =
                SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
        enumeratorContext.triggerAllActions();

        Collection<IcebergSourceSplitState> pendingSplits =
                enumerator.snapshotState(2L).pendingSplits();
        Assert.assertEquals(1, pendingSplits.size());
        IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
        Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
        Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
    }

    /**
     * With a registered reader that already requested a split, a newly available split must be
     * assigned to that reader and must not remain pending.
     */
    @Test
    public void testDiscoverWhenReaderRegistered() throws Exception {
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
                new TestingSplitEnumeratorContext<>(CONTEXT_PARALLELISM);
        ContinuousIcebergEnumerator enumerator =
                createEnumerator(enumeratorContext, streamingScanContext(), splitPlanner);

        registerReaderAndRequestSplit(enumeratorContext, enumerator);

        // Make one split available to the planner, then run the actions queued on the context.
        List<IcebergSourceSplit> splits =
                SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
        enumeratorContext.triggerAllActions();

        Assert.assertTrue(enumerator.snapshotState(1L).pendingSplits().isEmpty());
        MatcherAssert.assertThat(
                enumeratorContext.getSplitAssignments().get(READER_ID).getAssignedSplits(),
                CoreMatchers.hasItem(splits.get(0)));
    }

    /**
     * When the requesting reader is removed from the context before a split becomes available,
     * the split must stay pending; once the reader is registered again and requests a split, the
     * split must be assigned to it.
     */
    @Test
    public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
        ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
        TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
                new TestingSplitEnumeratorContext<>(CONTEXT_PARALLELISM);
        ContinuousIcebergEnumerator enumerator =
                createEnumerator(enumeratorContext, streamingScanContext(), splitPlanner);

        registerReaderAndRequestSplit(enumeratorContext, enumerator);

        // Drop the reader from the context while its split request is still outstanding.
        enumeratorContext.registeredReaders().remove(READER_ID);

        // Make one split available to the planner, then run the actions queued on the context.
        List<IcebergSourceSplit> splits =
                SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
        Assert.assertEquals(1, splits.size());
        splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
        enumeratorContext.triggerAllActions();

        // The reader is gone, so nothing was assigned and the split is still pending.
        Assert.assertFalse(enumeratorContext.getSplitAssignments().containsKey(READER_ID));
        List<String> pendingSplitIds =
                enumerator.snapshotState(1L).pendingSplits().stream()
                        .map(IcebergSourceSplitState::split)
                        .map(IcebergSourceSplit::splitId)
                        .collect(Collectors.toList());
        Assert.assertEquals(splits.size(), pendingSplitIds.size());
        Assert.assertEquals(splits.get(0).splitId(), pendingSplitIds.get(0));

        // Bring the reader back and let it request a split again.
        registerReaderAndRequestSplit(enumeratorContext, enumerator);

        Assert.assertTrue(enumerator.snapshotState(2L).pendingSplits().isEmpty());
        MatcherAssert.assertThat(
                enumeratorContext.getSplitAssignments().get(READER_ID).getAssignedSplits(),
                CoreMatchers.hasItem(splits.get(0)));
    }

    /**
     * Builds the scan context shared by all tests: streaming enabled, starting with
     * {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL}.
     */
    private static ScanContext streamingScanContext() {
        return ScanContext.builder()
                .streaming(true)
                .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();
    }

    /**
     * Registers reader {@link #READER_ID} on the context, announces it to the enumerator and
     * sends a {@link SplitRequestEvent} on its behalf.
     *
     * @param enumeratorContext context on which the reader is registered
     * @param enumerator enumerator that receives the reader and its split request
     */
    private static void registerReaderAndRequestSplit(
            TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
            ContinuousIcebergEnumerator enumerator) {
        enumeratorContext.registerReader(READER_ID, "localhost");
        enumerator.addReader(READER_ID);
        enumerator.handleSourceEvent(READER_ID, new SplitRequestEvent());
    }

    /**
     * Creates and starts a {@link ContinuousIcebergEnumerator} backed by a
     * {@link SimpleSplitAssigner} with no initial splits and no restored enumerator state.
     *
     * @param splitEnumeratorContext context the enumerator runs against
     * @param scanContext scan configuration for the enumerator
     * @param continuousSplitPlanner planner that supplies the discovered splits
     * @return the started enumerator
     */
    private static ContinuousIcebergEnumerator createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> splitEnumeratorContext, ScanContext scanContext, ContinuousSplitPlanner continuousSplitPlanner) {
        ContinuousIcebergEnumerator enumerator =
                new ContinuousIcebergEnumerator(
                        splitEnumeratorContext,
                        new SimpleSplitAssigner(Collections.emptyList()),
                        scanContext,
                        continuousSplitPlanner,
                        // The explicit cast keeps constructor resolution unambiguous for the null state.
                        (IcebergEnumeratorState) null);
        enumerator.start();
        return enumerator;
    }
}
