package org.apache.iceberg.flink.source;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.IcebergSource;
import org.awaitility.Awaitility;

/* loaded from: input_file:org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.class */
public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover {
    private static final long RECORD_BATCH_TS_INCREMENT_MILLI = TimeUnit.MINUTES.toMillis(15);
    private static final long RECORD_TS_INCREMENT_MILLI = TimeUnit.SECONDS.toMillis(1);
    private final AtomicLong tsMilli = new AtomicLong(System.currentTimeMillis());

    @Override // org.apache.iceberg.flink.source.TestIcebergSourceFailover
    protected IcebergSource.Builder<RowData> sourceBuilder() {
        return IcebergSource.builder().tableLoader(this.sourceTableResource.tableLoader()).watermarkColumn("ts").project(TestFixtures.TS_SCHEMA);
    }

    @Override // org.apache.iceberg.flink.source.TestIcebergSourceFailover
    protected Schema schema() {
        return TestFixtures.TS_SCHEMA;
    }

    @Override // org.apache.iceberg.flink.source.TestIcebergSourceFailover
    protected List<Record> generateRecords(int i, long j) {
        this.tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI);
        return (List) RandomGenericData.generate(schema(), i, j).stream().peek(record -> {
            record.setField("ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(this.tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI)), ZoneId.of("Z")));
        }).collect(Collectors.toList());
    }

    @Override // org.apache.iceberg.flink.source.TestIcebergSourceFailover
    protected void assertRecords(Table table, List<Record> list, Duration duration) throws Exception {
        List<Record> convertLocalDateTimeToMilli = convertLocalDateTimeToMilli(list);
        Awaitility.await("expected list of records should be produced").atMost(duration).untilAsserted(() -> {
            SimpleDataUtil.equalsRecords(convertLocalDateTimeToMilli, convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)), table.schema());
            SimpleDataUtil.assertRecordsEqual(convertLocalDateTimeToMilli, convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)), table.schema());
        });
    }

    private List<Record> convertLocalDateTimeToMilli(List<Record> list) {
        return (List) list.stream().peek(record -> {
            record.setField("ts", Long.valueOf(((LocalDateTime) record.getField("ts")).atZone((ZoneId) ZoneOffset.UTC).toInstant().toEpochMilli()));
        }).collect(Collectors.toList());
    }
}
