package org.apache.iceberg.flink.source.assigner;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.class */
public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
    public static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required(1, "timestamp_column", Types.TimestampType.withoutZone())});
    private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);

    @Override // org.apache.iceberg.flink.source.assigner.SplitAssignerTestBase
    protected SplitAssigner splitAssigner() {
        return new OrderedSplitAssignerFactory(SplitComparators.watermark(new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", (TimeUnit) null))).createAssigner();
    }

    @Test
    public void testMultipleFilesInAnIcebergSplit() {
        SplitAssigner splitAssigner = splitAssigner();
        splitAssigner.onDiscoveredSplits(createSplits(4, 2, "2"));
        assertGetNext(splitAssigner, GetSplitResult.Status.AVAILABLE);
        assertGetNext(splitAssigner, GetSplitResult.Status.AVAILABLE);
        assertGetNext(splitAssigner, GetSplitResult.Status.UNAVAILABLE);
    }

    @Test
    public void testSplitSort() {
        SplitAssigner splitAssigner = splitAssigner();
        Instant now = Instant.now();
        List list = (List) IntStream.range(0, 5).mapToObj(i -> {
            return splitFromInstant(now.plus(i, (TemporalUnit) ChronoUnit.MINUTES));
        }).collect(Collectors.toList());
        splitAssigner.onDiscoveredSplits(list.subList(3, 5));
        splitAssigner.onDiscoveredSplits(list.subList(0, 1));
        splitAssigner.onDiscoveredSplits(list.subList(1, 3));
        assertGetNext(splitAssigner, (IcebergSourceSplit) list.get(0));
        assertGetNext(splitAssigner, (IcebergSourceSplit) list.get(1));
        assertGetNext(splitAssigner, (IcebergSourceSplit) list.get(2));
        assertGetNext(splitAssigner, (IcebergSourceSplit) list.get(3));
        assertGetNext(splitAssigner, (IcebergSourceSplit) list.get(4));
        assertGetNext(splitAssigner, GetSplitResult.Status.UNAVAILABLE);
    }

    @Test
    public void testSerializable() {
        Assert.assertNotNull((SerializableComparator) SerializationUtil.deserializeFromBytes(SerializationUtil.serializeToBytes(SplitComparators.watermark(new ColumnStatsWatermarkExtractor(TestFixtures.SCHEMA, "id", TimeUnit.MILLISECONDS)))));
    }

    private void assertGetNext(SplitAssigner splitAssigner, IcebergSourceSplit icebergSourceSplit) {
        Assert.assertEquals(splitAssigner.getNext((String) null).split(), icebergSourceSplit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.iceberg.flink.source.assigner.SplitAssignerTestBase
    public List<IcebergSourceSplit> createSplits(int i, int i2, String str) {
        return (List) IntStream.range(0, i / i2).mapToObj(i3 -> {
            return splitFromRecords((List) IntStream.range(0, i2).mapToObj(i3 -> {
                return RandomGenericData.generate(SCHEMA, 2, (i3 * i2) + i3);
            }).collect(Collectors.toList()));
        }).collect(Collectors.toList());
    }

    private IcebergSourceSplit splitFromInstant(Instant instant) {
        GenericRecord create = GenericRecord.create(SCHEMA);
        create.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
        return splitFromRecords(ImmutableList.of(ImmutableList.of(create)));
    }

    private IcebergSourceSplit splitFromRecords(List<List<Record>> list) {
        try {
            return IcebergSourceSplit.fromCombinedScanTask(ReaderUtil.createCombinedScanTask(list, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
        } catch (IOException e) {
            throw new RuntimeException("Split creation exception", e);
        }
    }
}
