package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/source/TestFlinkScanSql.class */
public class TestFlinkScanSql extends TestFlinkSource {
    private volatile TableEnvironment tEnv;

    public TestFlinkScanSql(String str) {
        super(str);
    }

    @Override // org.apache.iceberg.flink.source.TestFlinkScan
    public void before() throws IOException {
        super.before();
        sql("create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", this.warehouse);
        sql("use catalog iceberg_catalog", new Object[0]);
        getTableEnv().getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
    }

    private TableEnvironment getTableEnv() {
        if (this.tEnv == null) {
            synchronized (this) {
                if (this.tEnv == null) {
                    this.tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
                }
            }
        }
        return this.tEnv;
    }

    @Override // org.apache.iceberg.flink.source.TestFlinkSource
    protected List<Row> run(FlinkSource.Builder builder, Map<String, String> map, String str, String... strArr) {
        String join = String.join(",", strArr);
        StringBuilder sb = new StringBuilder();
        map.forEach((str2, str3) -> {
            sb.append(optionToKv(str2, str3)).append(",");
        });
        String sb2 = sb.toString();
        if (sb2.endsWith(",")) {
            sb2 = sb2.substring(0, sb2.length() - 1);
        }
        if (!sb2.isEmpty()) {
            sb2 = String.format("/*+ OPTIONS(%s)*/", sb2);
        }
        return sql("select %s from t %s %s", join, sb2, str);
    }

    @Test
    public void testResiduals() throws Exception {
        Table createTable = this.catalog.createTable(TableIdentifier.of(new String[]{TestFixtures.DATABASE, TestFixtures.TABLE}), TestFixtures.SCHEMA, TestFixtures.SPEC);
        List generate = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
        ((Record) generate.get(0)).set(1, 123L);
        ((Record) generate.get(0)).set(2, "2020-03-20");
        ((Record) generate.get(1)).set(1, 456L);
        ((Record) generate.get(1)).set(2, "2020-03-20");
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(createTable, this.fileFormat, TEMPORARY_FOLDER);
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(generate.get(0));
        genericAppenderHelper.appendToTable(new DataFile[]{genericAppenderHelper.writeFile(TestHelpers.Row.of(new Object[]{"2020-03-20", 0}), generate), genericAppenderHelper.writeFile(TestHelpers.Row.of(new Object[]{"2020-03-21", 0}), RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L))});
        org.apache.iceberg.flink.TestHelpers.assertRecords(runWithFilter(Expressions.and(Expressions.equal("dt", "2020-03-20"), Expressions.equal("id", 123)), "where dt='2020-03-20' and id=123"), newArrayList, TestFixtures.SCHEMA);
    }

    @Test
    public void testInferedParallelism() throws IOException {
        Table createTable = this.catalog.createTable(TableIdentifier.of(new String[]{TestFixtures.DATABASE, TestFixtures.TABLE}), TestFixtures.SCHEMA, TestFixtures.SPEC);
        FlinkInputFormat buildFormat = FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(createTable.location())).table(createTable).buildFormat();
        ScanContext build = ScanContext.builder().build();
        Assert.assertEquals("Should produce the expected parallelism.", 1L, FlinkSource.forRowData().inferParallelism(buildFormat, build));
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(createTable, this.fileFormat, TEMPORARY_FOLDER);
        DataFile writeFile = genericAppenderHelper.writeFile(TestHelpers.Row.of(new Object[]{"2020-03-20", 0}), RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
        DataFile writeFile2 = genericAppenderHelper.writeFile(TestHelpers.Row.of(new Object[]{"2020-03-21", 0}), RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
        genericAppenderHelper.appendToTable(new DataFile[]{writeFile, writeFile2});
        sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", Long.valueOf(Math.max(writeFile.fileSizeInBytes(), writeFile2.fileSizeInBytes())));
        Assert.assertEquals("Should produce the expected parallelism.", 2L, FlinkSource.forRowData().inferParallelism(buildFormat, build));
        Assert.assertEquals("Should produce the expected parallelism.", 1L, FlinkSource.forRowData().inferParallelism(buildFormat, ScanContext.builder().limit(1L).build()));
        Configuration configuration = new Configuration();
        configuration.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 1);
        Assert.assertEquals("Should produce the expected parallelism.", 1L, FlinkSource.forRowData().flinkConf(configuration).inferParallelism(buildFormat, ScanContext.builder().build()));
        Assert.assertEquals("Should produce the expected parallelism.", 1L, FlinkSource.forRowData().flinkConf(configuration).inferParallelism(buildFormat, ScanContext.builder().limit(3L).build()));
        configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
        Assert.assertEquals("Should produce the expected parallelism.", 1L, FlinkSource.forRowData().flinkConf(configuration).inferParallelism(buildFormat, ScanContext.builder().limit(3L).build()));
    }

    @Test
    public void testInferParallelismWithGlobalSetting() throws IOException {
        this.tEnv.getConfig().getConfiguration().set(PipelineOptions.MAX_PARALLELISM, 1);
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.catalog.createTable(TableIdentifier.of(new String[]{TestFixtures.DATABASE, TestFixtures.TABLE}), TestFixtures.SCHEMA, (PartitionSpec) null), this.fileFormat, TEMPORARY_FOLDER);
        ArrayList newArrayList = Lists.newArrayList();
        long j = 0;
        for (int i = 0; i < 5; i++) {
            List generate = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i);
            DataFile writeFile = genericAppenderHelper.writeFile((StructLike) null, generate);
            genericAppenderHelper.appendToTable(new DataFile[]{writeFile});
            newArrayList.addAll(generate);
            j = Math.max(writeFile.fileSizeInBytes(), j);
        }
        sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", Long.valueOf(j));
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(null, Maps.newHashMap(), "", "*"), newArrayList, TestFixtures.SCHEMA);
    }

    @Test
    public void testExposeLocality() throws Exception {
        Table createTable = this.catalog.createTable(TableIdentifier.of(new String[]{TestFixtures.DATABASE, TestFixtures.TABLE}), TestFixtures.SCHEMA, TestFixtures.SPEC);
        TableLoader fromHadoopTable = TableLoader.fromHadoopTable(createTable.location());
        List generate = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L);
        generate.forEach(record -> {
            record.set(2, "2020-03-20");
        });
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(createTable, this.fileFormat, TEMPORARY_FOLDER);
        genericAppenderHelper.appendToTable(new DataFile[]{genericAppenderHelper.writeFile(TestHelpers.Row.of(new Object[]{"2020-03-20", 0}), generate)});
        Configuration configuration = getTableEnv().getConfig().getConfiguration();
        configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false);
        org.apache.iceberg.flink.TestHelpers.assertRecords(sql("select * from t", new Object[0]), generate, TestFixtures.SCHEMA);
        configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), true);
        FlinkSource.Builder table = FlinkSource.forRowData().tableLoader(fromHadoopTable).table(createTable);
        Assert.assertFalse("Expose split locality info should be false.", ((Boolean) DynMethods.builder("localityEnabled").hiddenImpl(table.getClass(), new Class[0]).build().invoke(table, new Object[0])).booleanValue());
        org.apache.iceberg.flink.TestHelpers.assertRecords(run(table, Maps.newHashMap(), "where dt='2020-03-20'", "*"), generate, TestFixtures.SCHEMA);
    }

    private List<Row> sql(String str, Object... objArr) {
        try {
            CloseableIterator collect = getTableEnv().executeSql(String.format(str, objArr)).collect();
            Throwable th = null;
            try {
                try {
                    ArrayList newArrayList = Lists.newArrayList(collect);
                    if (collect != null) {
                        if (0 != 0) {
                            try {
                                collect.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            collect.close();
                        }
                    }
                    return newArrayList;
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to collect table result", e);
        }
    }

    private String optionToKv(String str, Object obj) {
        return "'" + str + "'='" + obj + "'";
    }
}
