package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Base class for tests of {@link ReaderFunction} implementations.
 *
 * <p>Each test writes three data files of {@link #RECORDS_PER_FILE} random records each, combines
 * them into a single {@link CombinedScanTask}, wraps that task in an {@link IcebergSourceSplit}
 * (optionally with a starting file offset and record offset) and then checks the batches returned
 * by the reader function: one batch per remaining file, each carrying the expected records, file
 * offset and record offsets.
 *
 * <p>The suite runs once per file format returned by {@link #parameters()}.
 *
 * @param <T> the record type produced by the reader function under test
 */
@RunWith(Parameterized.class)
public abstract class ReaderFunctionTestBase<T> {

    /** Number of records written to every generated data file. */
    private static final int RECORDS_PER_FILE = 2;

    /** Number of data files combined into the scan task of every test. */
    private static final int FILE_COUNT = 3;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private final FileFormat fileFormat;
    private final GenericAppenderFactory appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);

    /**
     * Supplies the constructor arguments for the parameterized runner.
     *
     * @return one single-element row per file format: AVRO, ORC and PARQUET
     */
    @Parameterized.Parameters(name = "fileFormat={0}")
    public static Object[][] parameters() {
        return new Object[][] {
            new Object[] {FileFormat.AVRO},
            new Object[] {FileFormat.ORC},
            new Object[] {FileFormat.PARQUET}
        };
    }

    /**
     * Creates the reader function under test.
     *
     * @return the reader function that the tests apply to the generated split
     */
    protected abstract ReaderFunction<T> readerFunction();

    /**
     * Verifies that the records read by the reader function match the generic records that were
     * written to the data file.
     *
     * @param expected the generic records that were written
     * @param actual the records returned by the reader function, in read order
     * @param schema the schema of the records
     */
    protected abstract void assertRecords(List<Record> expected, List<T> actual, Schema schema);

    /**
     * Creates the test base for one file format.
     *
     * @param fileFormat the format used for the data files written by each test
     */
    public ReaderFunctionTestBase(FileFormat fileFormat) {
        this.fileFormat = fileFormat;
    }

    /**
     * Generates one list of random records per data file. The file index is used as the random
     * seed.
     *
     * @param batchCount the number of record lists (one per file) to generate
     * @return {@code batchCount} lists of {@link #RECORDS_PER_FILE} records each
     */
    private List<List<Record>> createRecordBatchList(int batchCount) {
        List<List<Record>> recordBatchList = Lists.newArrayListWithCapacity(batchCount);
        for (int seed = 0; seed < batchCount; seed++) {
            recordBatchList.add(RandomGenericData.generate(TestFixtures.SCHEMA, RECORDS_PER_FILE, seed));
        }
        return recordBatchList;
    }

    /**
     * Writes every record list to its own new temporary file and combines the resulting file tasks,
     * in list order, into one scan task.
     *
     * @param recordBatchList the records to write, one inner list per data file
     * @return a combined scan task with one file task per inner list
     * @throws IOException if a temporary file cannot be created or written
     */
    private CombinedScanTask createCombinedScanTask(List<List<Record>> recordBatchList) throws IOException {
        List<FileScanTask> fileTasks = Lists.newArrayListWithCapacity(recordBatchList.size());
        for (List<Record> recordBatch : recordBatchList) {
            fileTasks.add(
                ReaderUtil.createFileTask(
                    recordBatch, TEMPORARY_FOLDER.newFile(), this.fileFormat, this.appenderFactory));
        }
        return new BaseCombinedScanTask(fileTasks);
    }

    /**
     * Drains one batch and checks its records and their positions.
     *
     * <p>Every record must report {@code expectedFileOffset}. The reported record offset is compared
     * after subtracting one: the first record of the batch must report
     * {@code startRecordOffset + 1}, the next {@code startRecordOffset + 2}, and so on.
     *
     * @param expectedRecords the records the batch must contain
     * @param expectedFileOffset the file offset every record in the batch must report
     * @param startRecordOffset the number of records of this file skipped before the batch starts
     * @param batch the batch to drain
     */
    private void assertRecordsAndPosition(
            List<Record> expectedRecords,
            int expectedFileOffset,
            long startRecordOffset,
            RecordsWithSplitIds<RecordAndPosition<T>> batch) {
        batch.nextSplit();
        List<T> actualRecords = Lists.newArrayListWithCapacity(expectedRecords.size());
        long expectedRecordOffset = startRecordOffset;
        RecordAndPosition<T> recordAndPosition;
        while ((recordAndPosition = batch.nextRecordFromSplit()) != null) {
            actualRecords.add(recordAndPosition.record());
            Assert.assertEquals("expected file offset", expectedFileOffset, recordAndPosition.fileOffset());
            Assert.assertEquals(
                "expected record offset", expectedRecordOffset, recordAndPosition.recordOffset() - 1);
            expectedRecordOffset++;
        }

        Assert.assertEquals("expected record count", expectedRecords.size(), actualRecords.size());
        assertRecords(expectedRecords, actualRecords, TestFixtures.SCHEMA);
    }

    /**
     * Pulls the next batch from the reader, verifies it and recycles it.
     *
     * @param reader the iterator returned by the reader function
     * @param expectedRecords the records the batch must contain
     * @param expectedFileOffset the file offset every record in the batch must report
     * @param startRecordOffset the number of records of this file skipped before the batch starts
     */
    private void assertNextBatch(
            CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader,
            List<Record> expectedRecords,
            int expectedFileOffset,
            long startRecordOffset) {
        RecordsWithSplitIds<RecordAndPosition<T>> batch = reader.next();
        assertRecordsAndPosition(expectedRecords, expectedFileOffset, startRecordOffset, batch);
        batch.recycle();
    }

    /** A split without a checkpointed position yields all three files from their first record. */
    @Test
    public void testNoCheckpointedPosition() throws IOException {
        List<List<Record>> recordBatchList = createRecordBatchList(FILE_COUNT);
        IcebergSourceSplit split =
            IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask(recordBatchList));

        // try-with-resources so the reader is closed even when an assertion fails
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader =
                readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(0), 0, 0L);
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Position (file 0, record 0) yields all three files from their first record. */
    @Test
    public void testCheckpointedPositionBeforeFirstFile() throws IOException {
        List<List<Record>> recordBatchList = createRecordBatchList(FILE_COUNT);
        IcebergSourceSplit split =
            IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask(recordBatchList), 0, 0L);

        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader =
                readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(0), 0, 0L);
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Position (file 0, record 1) skips the first record of the first file only. */
    @Test
    public void testCheckpointedPositionMiddleFirstFile() throws IOException {
        List<List<Record>> recordBatchList = createRecordBatchList(FILE_COUNT);
        IcebergSourceSplit split =
            IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask(recordBatchList), 0, 1L);

        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader =
                readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(0).subList(1, RECORDS_PER_FILE), 0, 1L);
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /**
     * Position (file 0, record 2) is past the last record of the first file, so the first batch
     * comes from the second file.
     */
    @Test
    public void testCheckpointedPositionAfterFirstFile() throws IOException {
        List<List<Record>> recordBatchList = createRecordBatchList(FILE_COUNT);
        IcebergSourceSplit split =
            IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask(recordBatchList), 0, 2L);

        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader =
                readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Position (file 1, record 0) skips the first file entirely. */
    @Test
    public void testCheckpointedPositionBeforeSecondFile() throws IOException {
        List<List<Record>> recordBatchList = createRecordBatchList(FILE_COUNT);
        IcebergSourceSplit split =
            IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask(recordBatchList), 1, 0L);

        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader =
                readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Position (file 1, record 1) skips the first file and the first record of the second file. */
    @Test
    public void testCheckpointedPositionMidSecondFile() throws IOException {
        List<List<Record>> recordBatchList = createRecordBatchList(FILE_COUNT);
        IcebergSourceSplit split =
            IcebergSourceSplit.fromCombinedScanTask(createCombinedScanTask(recordBatchList), 1, 1L);

        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader =
                readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(1).subList(1, RECORDS_PER_FILE), 1, 1L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }
}
