package org.apache.iceberg.flink.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * {@link TestFlinkScan} implementation that reads the test table through an {@link IcebergSource}
 * built with {@link IcebergSource#forRowData()} and collects the produced rows into a list.
 *
 * <p>All {@code runWith*} hooks delegate to
 * {@link #run(Schema, List, Map, String, String...)}.
 */
@RunWith(Parameterized.class)
public class TestIcebergSourceBounded extends TestFlinkScan {

    /**
     * Creates the test instance for one value supplied by the {@link Parameterized} runner.
     *
     * @param parameter value forwarded unchanged to the {@link TestFlinkScan} constructor
     */
    public TestIcebergSourceBounded(String parameter) {
        super(parameter);
    }

    /**
     * Reads the table while projecting only the requested columns.
     *
     * @param selectedFields names of the columns to project; each one must exist in the table
     *     schema, otherwise the {@code Optional.get()} lookup below fails
     * @return the rows produced by the source
     * @throws Exception if loading the table or executing the job fails
     */
    @Override // org.apache.iceberg.flink.source.TestFlinkScan
    protected List<Row> runWithProjection(String... selectedFields) throws Exception {
        Schema icebergSchema =
                this.catalogResource.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
        TableSchema flinkSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema));

        // Build a Flink schema holding only the requested columns, in the requested order.
        TableSchema.Builder projection = TableSchema.builder();
        for (String field : selectedFields) {
            TableColumn column = flinkSchema.getTableColumn(field).get();
            projection.field(column.getName(), column.getType());
        }

        Schema projectedSchema = FlinkSchemaUtil.convert(icebergSchema, projection.build());
        return run(projectedSchema, Lists.newArrayList(), Maps.newHashMap(), "", selectedFields);
    }

    /**
     * Reads the table with a single filter expression and no projection.
     *
     * @param expression filter handed to the source builder
     * @param filterText passed through to {@code run}, which does not use it in this class
     * @return the rows produced by the source
     * @throws Exception if loading the table or executing the job fails
     */
    @Override // org.apache.iceberg.flink.source.TestFlinkScan
    protected List<Row> runWithFilter(Expression expression, String filterText) throws Exception {
        return run(null, Arrays.asList(expression), Maps.newHashMap(), filterText, "*");
    }

    /**
     * Reads the table with the given source properties, no filters and no projection.
     *
     * @param options properties handed to the source builder
     * @return the rows produced by the source
     * @throws Exception if loading the table or executing the job fails
     */
    @Override // org.apache.iceberg.flink.source.TestFlinkScan
    protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
        return run(null, Lists.newArrayList(), options, "", "*");
    }

    /**
     * Reads the table with no projection, no filters and no extra properties.
     *
     * @return the rows produced by the source
     * @throws Exception if loading the table or executing the job fails
     */
    @Override // org.apache.iceberg.flink.source.TestFlinkScan
    protected List<Row> run() throws Exception {
        return run(null, Lists.newArrayList(), Maps.newHashMap(), "", "*");
    }

    /**
     * Builds an {@link IcebergSource}, executes it with parallelism 1 and collects every row.
     *
     * @param projectedSchema schema to project, or {@code null} to read with the table's schema
     * @param filters filter expressions handed to the source builder
     * @param options properties handed to the source builder
     * @param filterText not used by this implementation
     * @param selectedFields not used by this implementation
     * @return the rows produced by the source, converted from {@link RowData} to {@link Row}
     * @throws Exception if loading the table, executing the job or closing a resource fails
     */
    protected List<Row> run(
            Schema projectedSchema,
            List<Expression> filters,
            Map<String, String> options,
            String filterText,
            String... selectedFields)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Configuration config = new Configuration();
        config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);

        // This loader is only needed to resolve the table; scope it tightly so it is closed
        // exactly once, whether or not loading succeeds.
        Table table;
        try (TableLoader loader = tableLoader()) {
            loader.open();
            table = loader.loadTable();
        }

        // The source receives its own loader instance, separate from the one closed above.
        IcebergSource.Builder<RowData> sourceBuilder =
                IcebergSource.forRowData()
                        .tableLoader(tableLoader())
                        .assignerFactory(new SimpleSplitAssignerFactory())
                        .flinkConfig(config);
        if (projectedSchema != null) {
            sourceBuilder.project(projectedSchema);
        }
        sourceBuilder.filters(filters);
        sourceBuilder.properties(options);

        // Without a projection, rows are converted using the table's own schema.
        Schema readSchema = projectedSchema == null ? table.schema() : projectedSchema;

        try (CloseableIterator<Row> rows =
                env.fromSource(
                                sourceBuilder.build(),
                                WatermarkStrategy.noWatermarks(),
                                "testBasicRead",
                                TypeInformation.of(RowData.class))
                        .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(readSchema)))
                        .executeAndCollect()) {
            return Lists.newArrayList(rows);
        }
    }
}
