package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Test;

/**
 * SQL tests that read an Iceberg table with the FLIP-27 source flag switched on, exercising the
 * {@code watermark-column} read option.
 */
public class TestIcebergSourceSql extends TestSqlBase {
    /** Two required columns: {@code t1} (timestamp without zone) and {@code t2} (long). */
    private static final Schema SCHEMA_TS = new Schema(
            Types.NestedField.required(1, "t1", Types.TimestampType.withoutZone()),
            Types.NestedField.required(2, "t2", Types.LongType.get()));

    /**
     * Prepares the table environment: turns on the FLIP-27 source flag, sets the default
     * parallelism to 1, creates a Hadoop-type Iceberg catalog at the catalog resource's warehouse
     * and switches to it, then enables dynamic table options.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void before() throws IOException {
        TableEnvironment tableEnv = getTableEnv();
        Configuration tableConf = tableEnv.getConfig().getConfiguration();
        tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);

        tableEnv.getConfig().set("table.exec.resource.default-parallelism", "1");
        SqlHelpers.sql(
                tableEnv,
                "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
                this.catalogResource.warehouse());
        SqlHelpers.sql(tableEnv, "use catalog iceberg_catalog");

        // NOTE(review): presumably needed so run(...) can pass per-query read options — confirm
        // against TestSqlBase.
        tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
    }

    /**
     * Builds one {@link #SCHEMA_TS} record.
     *
     * @param t1 instant stored in column {@code t1} as a local date-time in the system default zone
     * @param t2 value stored in column {@code t2}
     * @return the populated record
     */
    private Record generateRecord(Instant t1, long t2) {
        Record row = GenericRecord.create(SCHEMA_TS);
        row.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
        row.setField("t2", t2);
        return row;
    }

    /**
     * Creates the test table, writes two Parquet data files of two records each, appends both
     * files to the table, and returns the four records grouped by data file.
     *
     * <p>The first file holds the smaller {@code t1} values and the larger {@code t2} values; the
     * second file holds the larger {@code t1} values and the smaller {@code t2} values.
     *
     * @param ascending {@code true} to list the first file's records before the second file's,
     *     {@code false} for the opposite order
     * @return the expected records, ordered by data file as requested
     * @throws Exception if creating the table or writing the data files fails
     */
    private List<Record> generateExpectedRecords(boolean ascending) throws Exception {
        // Reference point in epoch milliseconds; every generated value is an offset from it.
        final long baseTime = 1702382109000L;

        GenericAppenderHelper helper = new GenericAppenderHelper(
                this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS),
                FileFormat.PARQUET,
                TEMPORARY_FOLDER);

        // First data file: t1 at base and base - 10 s, t2 at base + 30 d and base + 35 d.
        List<Record> firstFileRecords = Lists.newArrayList();
        firstFileRecords.add(generateRecord(Instant.ofEpochMilli(baseTime), baseTime + 2592000000L));
        firstFileRecords.add(generateRecord(Instant.ofEpochMilli(baseTime - 10000L), baseTime + 3024000000L));
        DataFile firstFile = helper.writeFile(firstFileRecords);

        // Second data file: t1 at base + 14 s and base + 12 s, t2 at base - 30 d and
        // base - 3074400000 ms (35 d 14 h).
        List<Record> secondFileRecords = Lists.newArrayList();
        secondFileRecords.add(generateRecord(Instant.ofEpochMilli(baseTime + 14000L), baseTime - 2592000000L));
        secondFileRecords.add(generateRecord(Instant.ofEpochMilli(baseTime + 12000L), baseTime - 3074400000L));
        DataFile secondFile = helper.writeFile(secondFileRecords);

        // Both files are added through a single appendToTable call.
        helper.appendToTable(firstFile, secondFile);

        List<Record> expected = new ArrayList<>(firstFileRecords.size() + secondFileRecords.size());
        if (ascending) {
            expected.addAll(firstFileRecords);
            expected.addAll(secondFileRecords);
        } else {
            expected.addAll(secondFileRecords);
            expected.addAll(firstFileRecords);
        }
        return expected;
    }

    /**
     * Reads with {@code watermark-column=t1} and expects the first data file's records to come
     * before the second data file's records.
     */
    @Test
    public void testWatermarkOptionsAscending() throws Exception {
        // Must happen before run(...): this call is what creates and populates the table.
        List<Record> expected = generateExpectedRecords(true);

        // NOTE(review): the large split-file-open-cost presumably keeps each data file in its own
        // split — confirm against the Iceberg read options.
        TestHelpers.assertRecordsWithOrder(
                run(ImmutableMap.of("watermark-column", "t1", "split-file-open-cost", "128000000"), "", "*"),
                expected,
                SCHEMA_TS);
    }

    /**
     * Reads with {@code watermark-column=t2} (time unit {@code MILLISECONDS}) and expects the
     * second data file's records to come before the first data file's records.
     */
    @Test
    public void testWatermarkOptionsDescending() throws Exception {
        // Must happen before run(...): this call is what creates and populates the table.
        List<Record> expected = generateExpectedRecords(false);

        TestHelpers.assertRecordsWithOrder(
                run(
                        ImmutableMap.of(
                                "watermark-column", "t2",
                                "watermark-column-time-unit", "MILLISECONDS",
                                "split-file-open-cost", "128000000"),
                        "",
                        "*"),
                expected,
                SCHEMA_TS);
    }
}
