package org.apache.iceberg.flink.source;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/source/TestFlinkScan.class */
public abstract class TestFlinkScan {

    // Class-level Flink mini cluster, obtained from the project helper
    // MiniClusterResource.createWithClassloaderCheckDisabled().
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();

    // Class-level temporary folder; handed to the catalog resource below and to the
    // GenericAppenderHelper instances the tests create.
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    // Per-test catalog resource built for TestFixtures.DATABASE / TestFixtures.TABLE;
    // the tests use its catalog() and tableLoader() accessors.
    @Rule
    public final HadoopCatalogResource catalogResource = new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
    // File format under test; set once in the constructor from the parameterized format name.
    protected final FileFormat fileFormat;

    /**
     * Supplies the file-format names this suite is parameterized over.
     *
     * @return the names {@code avro}, {@code parquet} and {@code orc}
     */
    @Parameterized.Parameters(name = "format={0}")
    public static Object[] parameters() {
        Object[] formatNames = {"avro", "parquet", "orc"};
        return formatNames;
    }

    /**
     * Creates the suite for a single file format.
     *
     * <p>Note: the decompiler reports that this constructor was originally package-private.
     *
     * @param formatName format name, resolved through {@code FileFormat.fromString}
     */
    public TestFlinkScan(String formatName) {
        fileFormat = FileFormat.fromString(formatName);
    }

    /**
     * Returns the table loader exposed by the per-test catalog resource.
     *
     * <p>Note: the decompiler reports that this method was originally protected.
     *
     * @return the result of {@code catalogResource.tableLoader()}
     */
    public TableLoader tableLoader() {
        TableLoader loader = catalogResource.tableLoader();
        return loader;
    }

    /**
     * Runs a scan of the test table restricted to the given column names.
     *
     * <p>The tests in this class compare field {@code i} of each returned row against the
     * {@code i}-th requested name, so implementations are expected to return the columns in
     * the requested order.
     *
     * @param strArr names of the columns to project
     * @return the rows produced by the scan
     * @throws Exception if the scan fails
     */
    protected abstract List<Row> runWithProjection(String... strArr) throws Exception;

    /**
     * Runs a scan of the test table with a row filter.
     *
     * <p>Callers in this class pass the same predicate twice: as an Iceberg expression and as a
     * SQL {@code where ...} fragment. Which form is used is up to the implementation.
     *
     * @param expression the filter as an Iceberg expression
     * @param str the filter as a SQL fragment, e.g. {@code where data>='b'}
     * @param z presumably a case-sensitivity switch for column-name resolution (the
     *     case-insensitive test passes {@code false}) -- TODO confirm against implementations
     * @return the rows produced by the scan
     * @throws Exception if the scan fails
     */
    protected abstract List<Row> runWithFilter(Expression expression, String str, boolean z) throws Exception;

    /**
     * Convenience overload of {@link #runWithFilter(Expression, String, boolean)} that always
     * passes {@code true} as the boolean flag.
     *
     * @param filter the filter as an Iceberg expression
     * @param sqlFilter the filter as a SQL fragment
     * @return the rows produced by the scan
     * @throws Exception if the scan fails
     */
    protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
        final boolean defaultFlag = true;
        return runWithFilter(filter, sqlFilter, defaultFlag);
    }

    /**
     * Runs a scan of the test table configured through string key/value options.
     *
     * <p>Keys used by the tests in this class: {@code snapshot-id}, {@code as-of-timestamp},
     * {@code tag}, {@code branch}, {@code start-tag}, {@code end-tag},
     * {@code start-snapshot-id} and {@code end-snapshot-id}.
     *
     * @param map the scan options
     * @return the rows produced by the scan
     * @throws Exception if the scan fails or the options are rejected
     */
    protected abstract List<Row> runWithOptions(Map<String, String> map) throws Exception;

    /**
     * Runs a scan of the test table with no projection, filter or options.
     *
     * @return the rows produced by the scan
     * @throws Exception if the scan fails
     */
    protected abstract List<Row> run() throws Exception;

    /**
     * Appends two random records to an unpartitioned table and expects a plain scan to return
     * exactly those records.
     */
    @Test
    public void testUnpartitionedTable() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords);
        // Fully qualified on purpose: two classes named TestHelpers are imported by this file, and
        // assertRecords is the Flink one (the same call every other test in this class makes).
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
    }

    /**
     * Appends one record to a single partition of a partitioned table and expects a plain scan
     * to return it.
     */
    @Test
    public void testPartitionedTable() throws Exception {
        Table table = this.catalogResource.catalog()
                .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
        List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        // Position 2 is set to the same value as the first entry of the partition tuple below
        // (presumably the partition source column -- defined in TestFixtures, not visible here).
        expectedRecords.get(0).set(2, "2020-03-20");
        // TestHelpers is qualified explicitly because two classes with that simple name are imported.
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER)
                .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
    }

    /**
     * Appends one record to a partitioned table, projects only the {@code data} column and
     * expects a single one-field row holding the record's value at position 0.
     */
    @Test
    public void testProjection() throws Exception {
        Table table = this.catalogResource.catalog()
                .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
        List<Record> inputRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        // TestHelpers is qualified explicitly because two classes with that simple name are imported.
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER)
                .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), inputRecords);
        Object expectedData = inputRecords.get(0).get(0);
        assertRows(runWithProjection("data"), Row.of(expectedData));
    }

    /**
     * Writes ten records, each into its own {@code (dt, level)} identity partition, then checks
     * a series of projections mixing partition columns ({@code dt}, {@code level}) and data
     * columns ({@code id}, {@code message}) in different orders.
     */
    @Test
    public void testIdentityPartitionProjections() throws Exception {
        Schema logSchema = new Schema(
                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "dt", Types.StringType.get()),
                Types.NestedField.optional(3, "level", Types.StringType.get()),
                Types.NestedField.optional(4, "message", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(logSchema).identity("dt").identity("level").build();
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, logSchema, spec);

        List<Record> inputRecords = RandomGenericData.generate(logSchema, 10, 0L);
        AppendFiles append = table.newAppend();
        int idx = 0;
        for (Record record : inputRecords) {
            // Compute the partition values once so the record and its partition tuple cannot drift apart.
            String dt = "2020-03-2" + idx;
            String level = Integer.toString(idx);
            record.set(1, dt);
            record.set(2, level);
            // TestHelpers is qualified explicitly because two classes with that simple name are imported.
            DataFile dataFile = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER)
                    .writeFile(org.apache.iceberg.TestHelpers.Row.of(dt, level), ImmutableList.of(record));
            append.appendFile(dataFile);
            idx++;
        }
        // All ten files are committed together in a single append.
        append.commit();

        // Projections are validated in this exact order: single columns, pairs, then triples.
        List<List<String>> projections = Arrays.asList(
                Collections.singletonList("dt"),
                Collections.singletonList("level"),
                Collections.singletonList("message"),
                Collections.singletonList("id"),
                Arrays.asList("dt", "message"),
                Arrays.asList("level", "message"),
                Arrays.asList("dt", "level"),
                Arrays.asList("message", "dt"),
                Arrays.asList("message", "level"),
                Arrays.asList("level", "dt"),
                Arrays.asList("dt", "level", "message"),
                Arrays.asList("level", "dt", "message"),
                Arrays.asList("dt", "message", "level"),
                Arrays.asList("level", "message", "dt"),
                Arrays.asList("message", "dt", "level"),
                Arrays.asList("message", "level", "dt"));
        for (List<String> projection : projections) {
            validateIdentityPartitionProjections(table, projection, inputRecords);
        }
    }

    /**
     * Runs a projection and checks, row by row and field by field, that the result matches the
     * input records positionally.
     *
     * @param table the table under test (not used by this method; kept for call-site compatibility)
     * @param projectedFields column names to project, in the order they are expected in each row
     * @param inputRecords records written to the table, in the order the rows are expected back
     * @throws Exception if the scan fails
     */
    private void validateIdentityPartitionProjections(Table table, List<String> projectedFields, List<Record> inputRecords) throws Exception {
        List<Row> rows = runWithProjection(projectedFields.toArray(new String[0]));
        // Without this check surplus rows would go unnoticed, because the loop below only walks
        // the input records.
        Assert.assertEquals("Number of projected rows should match the number of input records", inputRecords.size(), rows.size());
        for (int pos = 0; pos < inputRecords.size(); pos++) {
            Record inputRecord = inputRecords.get(pos);
            Row actualRow = rows.get(pos);
            for (int fieldPos = 0; fieldPos < projectedFields.size(); fieldPos++) {
                String name = projectedFields.get(fieldPos);
                Assert.assertEquals("Projected field " + name + " should match", inputRecord.getField(name), actualRow.getField(fieldPos));
            }
        }
    }

    /**
     * Appends two batches in two snapshots and expects that reading by the first snapshot's id,
     * or as of the first snapshot's timestamp, returns only the first batch.
     */
    @Test
    public void testSnapshotReads() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);

        List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        helper.appendToTable(expectedRecords);
        long snapshotId = table.currentSnapshot().snapshotId();
        long timestampMillis = table.currentSnapshot().timestampMillis();

        // Let the wall clock move past the first snapshot's timestamp before committing again.
        waitUntilAfter(timestampMillis);
        helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));

        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.of("snapshot-id", Long.toString(snapshotId))),
                expectedRecords,
                TestFixtures.SCHEMA);
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.of("as-of-timestamp", Long.toString(timestampMillis))),
                expectedRecords,
                TestFixtures.SCHEMA);
    }

    /**
     * Reads through tag {@code t1}: first while it points at the snapshot holding one batch,
     * then after it is replaced to point at the snapshot holding both batches.
     */
    @Test
    public void testTagReads() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);

        List<Record> firstBatch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        helper.appendToTable(firstBatch);
        table.manageSnapshots().createTag("t1", table.currentSnapshot().snapshotId()).commit();
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.of("tag", "t1")), firstBatch, TestFixtures.SCHEMA);

        List<Record> secondBatch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        helper.appendToTable(secondBatch);
        // Move the tag to the newest snapshot; a read through it should now see both batches.
        table.manageSnapshots().replaceTag("t1", table.currentSnapshot().snapshotId()).commit();

        List<Record> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(firstBatch);
        expectedRecords.addAll(secondBatch);
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords, TestFixtures.SCHEMA);
    }

    /**
     * Creates branch {@code b1} after a first append, then appends one batch to the branch and
     * another without naming a branch. Expects a read of {@code b1} to see the first batch plus
     * the branch batch, and a plain read to see the first batch plus the batch appended without
     * a branch name.
     */
    @Test
    public void testBranchReads() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);

        List<Record> baseRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        helper.appendToTable(baseRecords);
        table.manageSnapshots().createBranch("b1", table.currentSnapshot().snapshotId()).commit();

        List<Record> branchRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        helper.appendToTable("b1", branchRecords);

        List<Record> plainRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        helper.appendToTable(plainRecords);

        List<Record> expectedOnBranch = Lists.newArrayList();
        expectedOnBranch.addAll(baseRecords);
        expectedOnBranch.addAll(branchRecords);
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.of("branch", "b1")), expectedOnBranch, TestFixtures.SCHEMA);

        List<Record> expectedOnPlainRead = Lists.newArrayList();
        expectedOnPlainRead.addAll(baseRecords);
        expectedOnPlainRead.addAll(plainRecords);
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(), expectedOnPlainRead, TestFixtures.SCHEMA);
    }

    /**
     * Exercises incremental reads whose bounds are given as tags, snapshot ids, or a mix of both.
     *
     * <p>Four snapshots are written; {@code t1} tags the first and {@code t2} tags the third. Every
     * valid bound combination is expected to return the records of the second and third appends
     * only. Supplying both a tag and a snapshot id for the same bound is expected to fail.
     */
    @Test
    public void testIncrementalReadViaTag() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);

        helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
        long startSnapshotId = table.currentSnapshot().snapshotId();
        String startTag = "t1";
        table.manageSnapshots().createTag(startTag, startSnapshotId).commit();

        List<Record> secondBatch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
        helper.appendToTable(secondBatch);
        List<Record> thirdBatch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L);
        helper.appendToTable(thirdBatch);
        long endSnapshotId = table.currentSnapshot().snapshotId();
        String endTag = "t2";
        table.manageSnapshots().createTag(endTag, endSnapshotId).commit();

        // A fourth snapshot after the end bound; its records must not show up in any result.
        helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L));

        List<Record> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(secondBatch);
        expectedRecords.addAll(thirdBatch);

        // The explicit <String, String> witness is required: a bare builder() in a call chain is
        // inferred as <Object, Object>, which does not match runWithOptions(Map<String, String>).
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.<String, String>builder()
                        .put("start-tag", startTag)
                        .put("end-tag", endTag)
                        .buildOrThrow()),
                expectedRecords,
                TestFixtures.SCHEMA);
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.<String, String>builder()
                        .put("start-snapshot-id", Long.toString(startSnapshotId))
                        .put("end-tag", endTag)
                        .buildOrThrow()),
                expectedRecords,
                TestFixtures.SCHEMA);
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.<String, String>builder()
                        .put("start-tag", startTag)
                        .put("end-snapshot-id", Long.toString(endSnapshotId))
                        .buildOrThrow()),
                expectedRecords,
                TestFixtures.SCHEMA);

        AssertHelpers.assertThrows("START_SNAPSHOT_ID and START_TAG cannot both be set.", Exception.class, () -> {
            return runWithOptions(ImmutableMap.<String, String>builder()
                    .put("start-tag", startTag)
                    .put("end-tag", endTag)
                    .put("start-snapshot-id", Long.toString(startSnapshotId))
                    .buildOrThrow());
        });
        AssertHelpers.assertThrows("END_SNAPSHOT_ID and END_TAG cannot both be set.", Exception.class, () -> {
            return runWithOptions(ImmutableMap.<String, String>builder()
                    .put("start-tag", startTag)
                    .put("end-tag", endTag)
                    .put("end-snapshot-id", Long.toString(endSnapshotId))
                    .buildOrThrow());
        });
    }

    /**
     * Writes four snapshots and reads incrementally from the first snapshot id to the third,
     * expecting only the records of the second and third appends.
     */
    @Test
    public void testIncrementalRead() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);

        helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
        long startSnapshotId = table.currentSnapshot().snapshotId();

        List<Record> secondBatch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
        helper.appendToTable(secondBatch);
        List<Record> thirdBatch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L);
        helper.appendToTable(thirdBatch);
        long endSnapshotId = table.currentSnapshot().snapshotId();

        // A fourth snapshot after the end bound; its records must not show up in the result.
        helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L));

        List<Record> expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(secondBatch);
        expectedRecords.addAll(thirdBatch);

        // The explicit <String, String> witness is required: a bare builder() in a call chain is
        // inferred as <Object, Object>, which does not match runWithOptions(Map<String, String>).
        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithOptions(ImmutableMap.<String, String>builder()
                        .put("start-snapshot-id", Long.toString(startSnapshotId))
                        .put("end-snapshot-id", Long.toString(endSnapshotId))
                        .buildOrThrow()),
                expectedRecords,
                TestFixtures.SCHEMA);
    }

    /**
     * Writes two data files under different partition tuples ({@code 2020-03-20} and
     * {@code 2020-03-21}) and expects a filter on {@code dt = '2020-03-20'} to return only the
     * records of the first file.
     */
    @Test
    public void testFilterExpPartition() throws Exception {
        Table table = this.catalogResource.catalog()
                .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);

        List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
        expectedRecords.get(0).set(2, "2020-03-20");
        expectedRecords.get(1).set(2, "2020-03-20");

        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);
        // TestHelpers is qualified explicitly because two classes with that simple name are imported.
        DataFile matchingFile = helper.writeFile(
                org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
        DataFile otherFile = helper.writeFile(
                org.apache.iceberg.TestHelpers.Row.of("2020-03-21", 0),
                RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
        helper.appendToTable(matchingFile, otherFile);

        org.apache.iceberg.flink.TestHelpers.assertRecords(
                runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'", true),
                expectedRecords,
                TestFixtures.SCHEMA);
    }

    /**
     * Shared body of the filter tests: writes three records whose position-0 values are
     * {@code a}, {@code b} and {@code c}, runs the supplied filter and expects the last two
     * records back.
     *
     * <p>The arguments are forwarded to {@code runWithFilter} unchanged, so each caller exercises
     * its own expression and flag instead of a fixed one.
     *
     * @param filter the filter as an Iceberg expression; expected to select values {@code >= 'b'}
     * @param sqlFilter the same filter as a SQL fragment
     * @param caseSensitive flag forwarded to {@code runWithFilter}; the case-insensitive test
     *     passes {@code false}
     * @throws Exception if writing or scanning fails
     */
    private void testFilterExp(Expression filter, String sqlFilter, boolean caseSensitive) throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);

        List<Record> inputRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
        inputRecords.get(0).set(0, "a");
        inputRecords.get(1).set(0, "b");
        inputRecords.get(2).set(0, "c");

        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);
        DataFile dataFile = helper.writeFile(inputRecords);
        helper.appendToTable(dataFile);

        List<Row> actualRows = runWithFilter(filter, sqlFilter, caseSensitive);
        // Only the records holding "b" and "c" satisfy the filter.
        org.apache.iceberg.flink.TestHelpers.assertRecords(actualRows, inputRecords.subList(1, 3), TestFixtures.SCHEMA);
    }

    /**
     * Runs the shared filter scenario with the column referenced as {@code data} and the flag
     * set to {@code true}.
     */
    @Test
    public void testFilterExp() throws Exception {
        Expression filter = Expressions.greaterThanOrEqual("data", "b");
        String sqlFilter = "where data>='b'";
        testFilterExp(filter, sqlFilter, true);
    }

    /**
     * Runs the shared filter scenario with the column referenced in upper case ({@code DATA})
     * and the flag set to {@code false}.
     */
    @Test
    public void testFilterExpCaseInsensitive() throws Exception {
        Expression filter = Expressions.greaterThanOrEqual("DATA", "b");
        String sqlFilter = "where data>='b'";
        testFilterExp(filter, sqlFilter, false);
    }

    /**
     * Partitions a table by identity on decimal, string, binary, date, time and timestamp columns,
     * appends ten random records (one append per record) and expects a plain scan to return them.
     */
    @Test
    public void testPartitionTypes() throws Exception {
        Schema typesSchema = new Schema(
                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "decimal", Types.DecimalType.of(38, 18)),
                Types.NestedField.optional(3, "str", Types.StringType.get()),
                Types.NestedField.optional(4, "binary", Types.BinaryType.get()),
                Types.NestedField.optional(5, "date", Types.DateType.get()),
                Types.NestedField.optional(6, "time", Types.TimeType.get()),
                Types.NestedField.optional(7, "timestamp", Types.TimestampType.withoutZone()));
        PartitionSpec spec = PartitionSpec.builderFor(typesSchema)
                .identity("decimal")
                .identity("str")
                .identity("binary")
                .identity("date")
                .identity("time")
                .identity("timestamp")
                .build();
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, typesSchema, spec);

        List<Record> records = RandomGenericData.generate(typesSchema, 10, 0L);
        GenericAppenderHelper helper = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);
        for (Record record : records) {
            Object date = record.get(4);
            Object time = record.get(5);
            Object timestamp = record.get(6);
            // The partition tuple carries date/time/timestamp as day and microsecond counts
            // (converted with DateTimeUtil); nulls are passed through unchanged.
            Integer partitionDate = date == null ? null : Integer.valueOf(DateTimeUtil.daysFromDate((LocalDate) date));
            Long partitionTime = time == null ? null : Long.valueOf(DateTimeUtil.microsFromTime((LocalTime) time));
            Long partitionTimestamp =
                    timestamp == null ? null : Long.valueOf(DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp));
            // TestHelpers is qualified explicitly because two classes with that simple name are imported.
            org.apache.iceberg.TestHelpers.Row partition = org.apache.iceberg.TestHelpers.Row.of(
                    record.get(1), record.get(2), record.get(3), partitionDate, partitionTime, partitionTimestamp);
            helper.appendToTable(partition, Collections.singletonList(record));
        }

        org.apache.iceberg.flink.TestHelpers.assertRecords(run(), records, typesSchema);
    }

    /**
     * Round-trips ten random records through a table whose schema has a required
     * string-to-string map column and a required list-of-strings column.
     */
    @Test
    public void testCustomizedFlinkDataTypes() throws Exception {
        Schema nestedSchema = new Schema(
                Types.NestedField.required(
                        1, "map", Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.StringType.get())),
                Types.NestedField.required(4, "arr", Types.ListType.ofRequired(5, Types.StringType.get())));
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema);
        List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER).appendToTable(records);
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(), records, nestedSchema);
    }

    /**
     * Varargs convenience wrapper around the Flink {@code TestHelpers.assertRows} helper.
     *
     * @param actualRows rows returned by a scan
     * @param expected the rows to compare against, passed on as a list
     */
    private static void assertRows(List<Row> actualRows, Row... expected) {
        List<Row> expectedRows = Arrays.asList(expected);
        org.apache.iceberg.flink.TestHelpers.assertRows(actualRows, expectedRows);
    }

    /**
     * Busy-waits until {@link System#currentTimeMillis()} is strictly greater than the given
     * timestamp. Returns immediately if the clock is already past it.
     *
     * @param timestampMillis the epoch-millisecond timestamp to get past
     */
    private static void waitUntilAfter(long timestampMillis) {
        while (System.currentTimeMillis() <= timestampMillis) {
            // Spin: nothing to do until the clock advances.
        }
    }
}
