package org.apache.iceberg.flink.source;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.awaitility.Awaitility;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.class */
public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
    private static final int PARALLELISM = 4;
    private static final String SOURCE_NAME = "IcebergSource";
    private static final int RECORD_NUM_FOR_2_SPLITS = 200;
    private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).setConfiguration(this.reporter.addToConfiguration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)).withHaLeadershipControl().build());

    @Rule
    public final HadoopTableResource sourceTableResource = new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);

    /* loaded from: input_file:org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor$RowDataTimestampAssigner.class */
    private static class RowDataTimestampAssigner implements SerializableTimestampAssigner<RowData> {
        private RowDataTimestampAssigner() {
        }

        public long extractTimestamp(RowData rowData, long j) {
            return rowData.getTimestamp(0, 0).getMillisecond();
        }
    }

    @Test
    public void testWindowing() throws Exception {
        GenericAppenderHelper appender = appender();
        ArrayList newArrayList = Lists.newArrayList();
        ImmutableList of = ImmutableList.of(generateRecord(100, "file_1-recordTs_100"), generateRecord(101, "file_1-recordTs_101"), generateRecord(103, "file_1-recordTs_103"));
        newArrayList.addAll(of);
        appender.appendToTable(of);
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(100);
        for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; i++) {
            newArrayListWithCapacity.add(generateRecord(PARALLELISM - (i % 5), "file_2-recordTs_" + i));
        }
        newArrayList.addAll(newArrayListWithCapacity);
        appender.appendToTable(newArrayListWithCapacity);
        ImmutableList of2 = ImmutableList.of(generateRecord(6, "file_3-recordTs_6"), generateRecord(7, "file_3-recordTs_7"));
        newArrayList.addAll(of2);
        appender.appendToTable(of2);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromSource(source(), WatermarkStrategy.noWatermarks().withTimestampAssigner(new RowDataTimestampAssigner()), SOURCE_NAME, TypeInformation.of(RowData.class)).windowAll(TumblingEventTimeWindows.of(Time.minutes(5L))).apply(new AllWindowFunction<RowData, RowData, TimeWindow>() { // from class: org.apache.iceberg.flink.source.TestIcebergSourceWithWatermarkExtractor.1
            public void apply(TimeWindow timeWindow, Iterable<RowData> iterable, Collector<RowData> collector) {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                iterable.forEach(rowData -> {
                    atomicInteger.incrementAndGet();
                });
                collector.collect(TestIcebergSourceWithWatermarkExtractor.row(timeWindow.getStart(), atomicInteger.get()));
                TestIcebergSourceWithWatermarkExtractor.windows.put(Long.valueOf(timeWindow.getStart()), Integer.valueOf(atomicInteger.get()));
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<RowData>) iterable, (Collector<RowData>) collector);
            }
        });
        windows.clear();
        executionEnvironment.executeAsync("Iceberg Source Windowing Test");
        Awaitility.await().pollInterval(Duration.ofMillis(10L)).atMost(30L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(windows.equals(ImmutableMap.of(0L, Integer.valueOf(RECORD_NUM_FOR_2_SPLITS), Long.valueOf(TimeUnit.MINUTES.toMillis(5L)), 2)));
        });
        appender.appendToTable(new DataFile[]{appender.writeFile(ImmutableList.of(generateRecord(1500, "last-record")))});
        Awaitility.await().pollInterval(Duration.ofMillis(10L)).atMost(30L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(windows.equals(ImmutableMap.of(0L, Integer.valueOf(RECORD_NUM_FOR_2_SPLITS), Long.valueOf(TimeUnit.MINUTES.toMillis(5L)), 2, Long.valueOf(TimeUnit.MINUTES.toMillis(100L)), 3)));
        });
    }

    @Test
    public void testThrottling() throws Exception {
        GenericAppenderHelper appender = appender();
        ImmutableList of = ImmutableList.of(generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(100);
        for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; i++) {
            newArrayListWithCapacity.add(generateRecord(PARALLELISM - (i % 5), "file_2-recordTs_" + i));
        }
        ImmutableList of2 = ImmutableList.of(generateRecord(15, "file_3-recordTs_15"), generateRecord(16, "file_3-recordTs_16"), generateRecord(17, "file_3-recordTs_17"));
        ImmutableList of3 = ImmutableList.of(generateRecord(15, "file_4-recordTs_15"), generateRecord(16, "file_4-recordTs_16"), generateRecord(17, "file_4-recordTs_17"));
        ImmutableList of4 = ImmutableList.of(generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        CloseableIterator collectAsync = executionEnvironment.fromSource(source(), WatermarkStrategy.noWatermarks().withWatermarkAlignment("iceberg", Duration.ofMinutes(20L), Duration.ofMillis(10L)), SOURCE_NAME, TypeInformation.of(RowData.class)).collectAsync();
        Throwable th = null;
        try {
            JobClient executeAsync = executionEnvironment.executeAsync("Iceberg Source Throttling Test");
            CommonTestUtils.waitForAllTaskRunning(this.miniClusterResource.getMiniCluster(), executeAsync.getJobID(), false);
            appender.appendToTable(new DataFile[]{appender.writeFile(of), appender.writeFile(newArrayListWithCapacity)});
            Awaitility.await().pollInterval(Duration.ofMillis(10L)).atMost(30L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(findAlignmentDriftMetric(executeAsync.getJobID(), TimeUnit.MINUTES.toMillis(80L)).isPresent());
            });
            Gauge<Long> gauge = findAlignmentDriftMetric(executeAsync.getJobID(), TimeUnit.MINUTES.toMillis(80L)).get();
            appender.appendToTable(new DataFile[]{appender.writeFile(of2), appender.writeFile(of3)});
            Awaitility.await().pollInterval(Duration.ofMillis(10L)).atMost(30L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(((Long) gauge.getValue()).longValue() == TimeUnit.MINUTES.toMillis(65L));
            });
            appender.appendToTable(of4);
            Awaitility.await().pollInterval(Duration.ofMillis(10L)).atMost(30L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(((Long) gauge.getValue()).longValue() < TimeUnit.MINUTES.toMillis(20L));
            });
            if (collectAsync != null) {
                if (0 == 0) {
                    collectAsync.close();
                    return;
                }
                try {
                    collectAsync.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (collectAsync != null) {
                if (0 != 0) {
                    try {
                        collectAsync.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    collectAsync.close();
                }
            }
            throw th3;
        }
    }

    protected IcebergSource<RowData> source() {
        return IcebergSource.builder().tableLoader(this.sourceTableResource.tableLoader()).watermarkColumn("ts").project(TestFixtures.TS_SCHEMA).splitSize(100L).streaming(true).monitorInterval(Duration.ofMillis(10L)).streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL).build();
    }

    protected Record generateRecord(int i, String str) {
        GenericRecord create = GenericRecord.create(TestFixtures.TS_SCHEMA);
        create.setField("ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(Time.of(i, TimeUnit.MINUTES).toMilliseconds()), ZoneId.of("Z")));
        create.setField("str", str);
        return create;
    }

    private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long j) {
        return this.reporter.findMetrics(jobID, "IcebergSource.*watermarkAlignmentDrift").values().stream().map(metric -> {
            return (Gauge) metric;
        }).filter(gauge -> {
            return ((Long) gauge.getValue()).longValue() == j;
        }).findFirst();
    }

    private GenericAppenderHelper appender() {
        Configuration configuration = new Configuration();
        configuration.set("write.parquet.page-size-bytes", "64");
        configuration.set("write.parquet.row-group-size-bytes", "64");
        return new GenericAppenderHelper(this.sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER, configuration);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static RowData row(long j, long j2) {
        GenericRowData genericRowData = new GenericRowData(2);
        genericRowData.setField(0, Long.valueOf(j));
        genericRowData.setField(1, String.valueOf(j2));
        return genericRowData;
    }
}
