package org.apache.iceberg.flink.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Bounded-read tests for {@link IcebergSource} driven by {@link AvroGenericRecordReaderFunction}.
 *
 * <p>Every test runs once per entry of {@link #parameters()}, i.e. once per file format, each
 * with a source parallelism of 2. Records are written with {@link GenericAppenderHelper}, read
 * back through a Flink job, converted to {@link Row} and compared with what was written.
 *
 * <p>Two different {@code TestHelpers} classes are used here ({@code org.apache.iceberg} and
 * {@code org.apache.iceberg.flink}); both are referenced by fully-qualified name so that the
 * simple name is never ambiguous.
 */
@RunWith(Parameterized.class)
public class TestIcebergSourceBoundedGenericRecord {

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public final HadoopCatalogResource catalogResource = new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
    private final FileFormat fileFormat;
    private final int parallelism;

    /**
     * Supplies the (file format name, parallelism) pairs the suite is run with.
     *
     * @return one row per parameter set; column 0 is the format name, column 1 the parallelism
     */
    @Parameterized.Parameters(name = "format={0}, parallelism = {1}")
    public static Object[][] parameters() {
        return new Object[][] {
            {"avro", 2},
            {"parquet", 2},
            {"orc", 2}
        };
    }

    /**
     * @param format file format name, matched case-insensitively against {@link FileFormat}
     * @param parallelism parallelism applied to the Flink execution environment
     */
    public TestIcebergSourceBoundedGenericRecord(String format, int parallelism) {
        this.fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
        this.parallelism = parallelism;
    }

    /** Writes two random records to a table created without a partition spec and reads them back. */
    @Test
    public void testUnpartitionedTable() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords);
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
    }

    /** Writes two records into a single partition of a table created with a spec and reads them back. */
    @Test
    public void testPartitionedTable() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
        List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
        // Give every record the same "dt" value that is used in the partition tuple below.
        for (Record record : expectedRecords) {
            record.setField("dt", "2020-03-20");
        }
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER)
            .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
    }

    /** Reads with a projection onto field id 1 and expects only position 0 of each written record. */
    @Test
    public void testProjection() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
        List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER)
            .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);

        Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, Sets.newHashSet(1));
        List<Row> actualRows = run(projectedSchema, Collections.emptyList(), Collections.emptyMap());
        List<Row> expectedRows = Arrays.asList(
            Row.of(expectedRecords.get(0).get(0)),
            Row.of(expectedRecords.get(1).get(0)));
        org.apache.iceberg.flink.TestHelpers.assertRows(actualRows, expectedRows);
    }

    /** Runs the read job with no projection, no filters and no extra options. */
    private List<Row> run() throws Exception {
        return run(null, Collections.emptyList(), Collections.emptyMap());
    }

    /**
     * Builds an {@link IcebergSource} over the test table, executes it and collects the output.
     *
     * @param projectedSchema schema to project onto, or {@code null} to read the table schema
     * @param filters filter expressions handed to the source builder
     * @param options extra options handed to the source builder
     * @return all rows produced by the job
     * @throws Exception if loading the table or executing the job fails
     */
    // NOTE(review): the source builder is left raw because its element type (the Avro generic
    // record) is not imported in this file; the suppression is limited to this method.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private List<Row> run(Schema projectedSchema, List<Expression> filters, Map<String, String> options) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(this.parallelism);
        env.getConfig().enableObjectReuse();

        Configuration flinkConfig = new Configuration();
        flinkConfig.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);

        // The loader is only needed to resolve the table, so it is closed exactly once, right
        // after loading, instead of being kept open (and re-closed on failure) for the whole job.
        Table table;
        try (TableLoader tableLoader = this.catalogResource.tableLoader()) {
            tableLoader.open();
            table = tableLoader.loadTable();
        }

        // The two typed nulls are presumably "no projected schema" and "no name mapping" —
        // confirm against the AvroGenericRecordReaderFunction constructor.
        AvroGenericRecordReaderFunction readerFunction = new AvroGenericRecordReaderFunction(
            TestFixtures.TABLE_IDENTIFIER.name(),
            new Configuration(),
            table.schema(),
            (Schema) null,
            (String) null,
            false,
            table.io(),
            table.encryption());

        IcebergSource.Builder sourceBuilder = IcebergSource.builder()
            .tableLoader(this.catalogResource.tableLoader())
            .readerFunction(readerFunction)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .flinkConfig(flinkConfig);
        if (projectedSchema != null) {
            sourceBuilder.project(projectedSchema);
        }
        sourceBuilder.filters(filters);
        sourceBuilder.setAll(options);

        // The output is converted using the projection when given, otherwise the table schema.
        Schema readSchema = projectedSchema != null ? projectedSchema : table.schema();
        RowType rowType = FlinkSchemaUtil.convert(readSchema);
        org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(readSchema, TestFixtures.TABLE_IDENTIFIER.name());

        // try-with-resources closes the result iterator on success and on failure, attaching a
        // failing close() as a suppressed exception rather than masking the original error.
        try (CloseableIterator<Row> rows = env
                .fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "testBasicRead", new GenericRecordAvroTypeInfo(avroSchema))
                .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema))
                .map(new RowDataToRowMapper(rowType))
                .executeAndCollect()) {
            return Lists.newArrayList(rows);
        }
    }
}
