package org.apache.iceberg.flink.source.reader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.class */
public class TestIcebergSourceReader {

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private final GenericAppenderFactory appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/flink/source/reader/TestIcebergSourceReader$IdBasedComparator.class */
    public static class IdBasedComparator implements SerializableComparator<IcebergSourceSplit> {
        private IdBasedComparator() {
        }

        public int compare(IcebergSourceSplit icebergSourceSplit, IcebergSourceSplit icebergSourceSplit2) {
            return icebergSourceSplit.splitId().compareTo(icebergSourceSplit2.splitId());
        }
    }

    @Test
    public void testReaderMetrics() throws Exception {
        TestingReaderOutput<RowData> testingReaderOutput = new TestingReaderOutput<>();
        TestingMetricGroup testingMetricGroup = new TestingMetricGroup();
        IcebergSourceReader createReader = createReader(testingMetricGroup, new TestingReaderContext(new Configuration(), testingMetricGroup), null);
        createReader.start();
        testOneSplitFetcher(createReader, testingReaderOutput, testingMetricGroup, 1);
        testOneSplitFetcher(createReader, testingReaderOutput, testingMetricGroup, 2);
    }

    @Test
    public void testReaderOrder() throws Exception {
        CombinedScanTask createCombinedScanTask = ReaderUtil.createCombinedScanTask(ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1), TEMPORARY_FOLDER, FileFormat.PARQUET, this.appenderFactory);
        CombinedScanTask createCombinedScanTask2 = ReaderUtil.createCombinedScanTask(ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1), TEMPORARY_FOLDER, FileFormat.PARQUET, this.appenderFactory);
        List<RowData> read = read(Arrays.asList(IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask), IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask2)), 2L);
        List<RowData> read2 = read(Arrays.asList(IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask2), IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask)), 2L);
        Assert.assertEquals(read.get(0), read2.get(0));
        Assert.assertEquals(read.get(1), read2.get(1));
    }

    private List<RowData> read(List<IcebergSourceSplit> list, long j) throws Exception {
        TestingMetricGroup testingMetricGroup = new TestingMetricGroup();
        IcebergSourceReader createReader = createReader(testingMetricGroup, new TestingReaderContext(new Configuration(), testingMetricGroup), new IdBasedComparator());
        createReader.start();
        createReader.addSplits(list);
        TestingReaderOutput testingReaderOutput = new TestingReaderOutput();
        while (testingReaderOutput.getEmittedRecords().size() < j) {
            createReader.pollNext(testingReaderOutput);
        }
        createReader.pollNext(testingReaderOutput);
        Assert.assertEquals(j, testingReaderOutput.getEmittedRecords().size());
        return testingReaderOutput.getEmittedRecords();
    }

    private void testOneSplitFetcher(IcebergSourceReader icebergSourceReader, TestingReaderOutput<RowData> testingReaderOutput, TestingMetricGroup testingMetricGroup, int i) throws Exception {
        List<List<Record>> createRecordBatchList = ReaderUtil.createRecordBatchList(i, TestFixtures.SCHEMA, 1, 1);
        icebergSourceReader.addSplits(Arrays.asList(IcebergSourceSplit.fromCombinedScanTask(ReaderUtil.createCombinedScanTask(createRecordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, this.appenderFactory))));
        while (testingReaderOutput.getEmittedRecords().size() < i) {
            icebergSourceReader.pollNext(testingReaderOutput);
        }
        Assert.assertEquals(i, testingReaderOutput.getEmittedRecords().size());
        TestHelpers.assertRowData(TestFixtures.SCHEMA, createRecordBatchList.get(0).get(0), (RowData) testingReaderOutput.getEmittedRecords().get(i - 1));
        Assert.assertEquals(i, testingMetricGroup.counters().get("assignedSplits").getCount());
        icebergSourceReader.pollNext(testingReaderOutput);
    }

    private IcebergSourceReader createReader(MetricGroup metricGroup, SourceReaderContext sourceReaderContext, SerializableComparator<IcebergSourceSplit> serializableComparator) {
        return new IcebergSourceReader(SerializableRecordEmitter.defaultEmitter(), new IcebergSourceReaderMetrics(metricGroup, "db.tbl"), new RowDataReaderFunction(new Configuration(), TestFixtures.SCHEMA, TestFixtures.SCHEMA, (String) null, true, new HadoopFileIO(new org.apache.hadoop.conf.Configuration()), new PlaintextEncryptionManager(), Collections.emptyList()), serializableComparator, sourceReaderContext);
    }
}
