package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.class */
public class TestColumnStatsWatermarkExtractor {
    public static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required(1, "timestamp_column", Types.TimestampType.withoutZone()), Types.NestedField.required(2, "timestamptz_column", Types.TimestampType.withZone()), Types.NestedField.required(3, "long_column", Types.LongType.get()), Types.NestedField.required(4, "string_column", Types.StringType.get())});
    private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
    private static final List<List<Record>> TEST_RECORDS = ImmutableList.of(RandomGenericData.generate(SCHEMA, 3, 2), RandomGenericData.generate(SCHEMA, 3, 19));
    private static final List<Map<String, Long>> MIN_VALUES = ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3));

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public final HadoopTableResource sourceTableResource = new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA);
    private final String columnName;

    @BeforeClass
    public static void updateMinValue() {
        for (int i = 0; i < TEST_RECORDS.size(); i++) {
            for (Record record : TEST_RECORDS.get(i)) {
                Map<String, Long> map = MIN_VALUES.get(i);
                map.merge("timestamp_column", Long.valueOf(((LocalDateTime) record.get(0)).toInstant(ZoneOffset.UTC).toEpochMilli()), (v0, v1) -> {
                    return Math.min(v0, v1);
                });
                map.merge("timestamptz_column", Long.valueOf(((OffsetDateTime) record.get(1)).toInstant().toEpochMilli()), (v0, v1) -> {
                    return Math.min(v0, v1);
                });
                map.merge("long_column", (Long) record.get(2), (v0, v1) -> {
                    return Math.min(v0, v1);
                });
            }
        }
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return ImmutableList.of(new Object[]{"timestamp_column"}, new Object[]{"timestamptz_column"}, new Object[]{"long_column"});
    }

    public TestColumnStatsWatermarkExtractor(String str) {
        this.columnName = str;
    }

    @Test
    public void testSingle() throws IOException {
        Assert.assertEquals(MIN_VALUES.get(0).get(this.columnName).longValue(), new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, TimeUnit.MILLISECONDS).extractWatermark(split(0)));
    }

    @Test
    public void testTimeUnit() throws IOException {
        Assume.assumeTrue("Run only for long column", this.columnName.equals("long_column"));
        Assert.assertEquals(MIN_VALUES.get(0).get(this.columnName).longValue() / 1000, new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, TimeUnit.MICROSECONDS).extractWatermark(split(0)));
    }

    @Test
    public void testMultipleFiles() throws IOException {
        Assume.assumeTrue("Run only for the timestamp column", this.columnName.equals("timestamp_column"));
        IcebergSourceSplit fromCombinedScanTask = IcebergSourceSplit.fromCombinedScanTask(ReaderUtil.createCombinedScanTask(TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
        ColumnStatsWatermarkExtractor columnStatsWatermarkExtractor = new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, (TimeUnit) null);
        Assert.assertEquals(MIN_VALUES.get(0).get(this.columnName).longValue(), columnStatsWatermarkExtractor.extractWatermark(split(0)));
        Assert.assertEquals(MIN_VALUES.get(1).get(this.columnName).longValue(), columnStatsWatermarkExtractor.extractWatermark(split(1)));
        Assert.assertEquals(Math.min(MIN_VALUES.get(0).get(this.columnName).longValue(), MIN_VALUES.get(1).get(this.columnName).longValue()), columnStatsWatermarkExtractor.extractWatermark(fromCombinedScanTask));
    }

    @Test
    public void testWrongColumn() {
        Assume.assumeTrue("Run only for string column", this.columnName.equals("string_column"));
        Assertions.assertThatThrownBy(() -> {
            new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, (TimeUnit) null);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Found STRING, expected a LONG or TIMESTAMP column for watermark generation.");
    }

    @Test
    public void testEmptyStatistics() throws IOException {
        Assume.assumeTrue("Run only for timestamp column", this.columnName.equals("timestamp_column"));
        ColumnStatsWatermarkExtractor columnStatsWatermarkExtractor = new ColumnStatsWatermarkExtractor(10, "missing_field");
        Assertions.assertThatThrownBy(() -> {
            columnStatsWatermarkExtractor.extractWatermark(split(0));
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Missing statistics for column");
    }

    private IcebergSourceSplit split(int i) throws IOException {
        return IcebergSourceSplit.fromCombinedScanTask(ReaderUtil.createCombinedScanTask(ImmutableList.of(TEST_RECORDS.get(i)), TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
    }
}
