package org.apache.paimon.flink.source;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkTestBase;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/source/FileStoreSourceReaderTest.class */
public class FileStoreSourceReaderTest {

    @TempDir
    protected Path tempDir;

    /* loaded from: input_file:org/apache/paimon/flink/source/FileStoreSourceReaderTest$DummyMetricGroup.class */
    public static class DummyMetricGroup implements MetricGroup {
        public Counter counter(String str) {
            return null;
        }

        public <C extends Counter> C counter(String str, C c) {
            return null;
        }

        public <T, G extends Gauge<T>> G gauge(String str, G g) {
            return null;
        }

        public <H extends Histogram> H histogram(String str, H h) {
            return null;
        }

        public <M extends Meter> M meter(String str, M m) {
            return null;
        }

        public MetricGroup addGroup(String str) {
            return null;
        }

        public MetricGroup addGroup(String str, String str2) {
            return null;
        }

        public String[] getScopeComponents() {
            return new String[0];
        }

        public Map<String, String> getAllVariables() {
            return null;
        }

        public String getMetricIdentifier(String str) {
            return null;
        }

        public String getMetricIdentifier(String str, CharacterFilter characterFilter) {
            return null;
        }
    }

    @BeforeEach
    public void beforeEach() throws Exception {
        new SchemaManager(LocalFileIO.create(), new org.apache.paimon.fs.Path(this.tempDir.toUri())).createTable(new Schema(new RowType(Arrays.asList(new DataField(0, "k", new BigIntType()), new DataField(1, "v", new BigIntType()), new DataField(2, FlinkTestBase.CURRENT_DATABASE, new IntType()))).getFields(), Collections.singletonList(FlinkTestBase.CURRENT_DATABASE), Arrays.asList("k", FlinkTestBase.CURRENT_DATABASE), Collections.emptyMap(), (String) null));
    }

    @Test
    public void testRequestSplitWhenNoSplitRestored() throws Exception {
        TestingReaderContext testingReaderContext = new TestingReaderContext();
        FileStoreSourceReader createReader = createReader(testingReaderContext);
        createReader.start();
        createReader.close();
        Assertions.assertThat(testingReaderContext.getNumSplitRequests()).isEqualTo(1);
    }

    @Test
    public void testNoSplitRequestWhenSplitRestored() throws Exception {
        TestingReaderContext testingReaderContext = new TestingReaderContext();
        FileStoreSourceReader createReader = createReader(testingReaderContext);
        createReader.addSplits(Collections.singletonList(createTestFileSplit("id1")));
        createReader.start();
        createReader.close();
        Assertions.assertThat(testingReaderContext.getNumSplitRequests()).isEqualTo(0);
    }

    @Test
    public void testAddMultipleSplits() throws Exception {
        TestingReaderContext testingReaderContext = new TestingReaderContext();
        FileStoreSourceReader createReader = createReader(testingReaderContext);
        createReader.start();
        Assertions.assertThat(testingReaderContext.getNumSplitRequests()).isEqualTo(1);
        createReader.addSplits(Arrays.asList(createTestFileSplit("id1"), createTestFileSplit("id2")));
        TestingReaderOutput testingReaderOutput = new TestingReaderOutput();
        while (createReader.getNumberOfCurrentlyAssignedSplits() > 0) {
            createReader.pollNext(testingReaderOutput);
            Thread.sleep(10L);
        }
        Assertions.assertThat(testingReaderContext.getNumSplitRequests()).isEqualTo(2);
    }

    @Test
    public void testReaderOnSplitFinished() throws Exception {
        TestingReaderContext testingReaderContext = new TestingReaderContext();
        FileStoreSourceReader createReader = createReader(testingReaderContext);
        createReader.start();
        createReader.addSplits(Collections.singletonList(createTestFileSplit("id1")));
        TestingReaderOutput testingReaderOutput = new TestingReaderOutput();
        while (createReader.getNumberOfCurrentlyAssignedSplits() > 0) {
            createReader.pollNext(testingReaderOutput);
            Thread.sleep(10L);
        }
        List sentEvents = testingReaderContext.getSentEvents();
        Assertions.assertThat(sentEvents.size()).isEqualTo(1);
        Assertions.assertThat(sentEvents.get(0)).isExactlyInstanceOf(ReaderConsumeProgressEvent.class);
        Assertions.assertThat((ReaderConsumeProgressEvent) sentEvents.get(0)).matches(readerConsumeProgressEvent -> {
            return readerConsumeProgressEvent.lastConsumeSnapshotId() == 1;
        });
    }

    protected FileStoreSourceReader createReader(TestingReaderContext testingReaderContext) {
        return new FileStoreSourceReader(testingReaderContext, new TestChangelogDataReadWrite(this.tempDir.toString()).createReadWithKey(), new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(this.tempDir.toString()), (Long) null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static FileStoreSourceSplit createTestFileSplit(String str) {
        return FileStoreSourceSplitSerializerTest.newSourceSplit(str, MergeTreeCompactManagerTest.row(1), 0, Collections.emptyList());
    }
}
