package org.apache.iceberg.flink.source;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/flink/source/TestIcebergSourceContinuous.class */
public class TestIcebergSourceContinuous {

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
    private final AtomicLong randomSeed = new AtomicLong(0);

    @Test
    public void testTableScanThenIncremental() throws Exception {
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        List generate = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
        genericAppenderHelper.appendToTable(generate);
        CloseableIterator executeAndCollect = createStream(ScanContext.builder().streaming(true).monitorInterval(Duration.ofMillis(10L)).startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL).build()).executeAndCollect(getClass().getSimpleName());
        Throwable th = null;
        try {
            try {
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate, this.tableResource.table().schema());
                List generate2 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
                genericAppenderHelper.appendToTable(generate2);
                this.tableResource.table().currentSnapshot().snapshotId();
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate2, this.tableResource.table().schema());
                List generate3 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
                genericAppenderHelper.appendToTable(generate3);
                this.tableResource.table().currentSnapshot().snapshotId();
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate3, this.tableResource.table().schema());
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testEarliestSnapshot() throws Exception {
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        List generate = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
        genericAppenderHelper.appendToTable(generate);
        List generate2 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
        genericAppenderHelper.appendToTable(generate2);
        CloseableIterator executeAndCollect = createStream(ScanContext.builder().streaming(true).monitorInterval(Duration.ofMillis(10L)).startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT).build()).executeAndCollect(getClass().getSimpleName());
        Throwable th = null;
        try {
            List<Row> waitForResult = waitForResult(executeAndCollect, 4);
            ArrayList newArrayList = Lists.newArrayList(generate);
            newArrayList.addAll(generate2);
            TestHelpers.assertRecords(waitForResult, newArrayList, this.tableResource.table().schema());
            List generate3 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
            genericAppenderHelper.appendToTable(generate3);
            TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate3, this.tableResource.table().schema());
            List generate4 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
            genericAppenderHelper.appendToTable(generate4);
            TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate4, this.tableResource.table().schema());
            if (executeAndCollect != null) {
                if (0 == 0) {
                    executeAndCollect.close();
                    return;
                }
                try {
                    executeAndCollect.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (executeAndCollect != null) {
                if (0 != 0) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testLatestSnapshot() throws Exception {
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        genericAppenderHelper.appendToTable(RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet()));
        List generate = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
        genericAppenderHelper.appendToTable(generate);
        CloseableIterator executeAndCollect = createStream(ScanContext.builder().streaming(true).monitorInterval(Duration.ofMillis(10L)).startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT).build()).executeAndCollect(getClass().getSimpleName());
        Throwable th = null;
        try {
            try {
                waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient());
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate, this.tableResource.table().schema());
                List generate2 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
                genericAppenderHelper.appendToTable(generate2);
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate2, this.tableResource.table().schema());
                List generate3 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
                genericAppenderHelper.appendToTable(generate3);
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate3, this.tableResource.table().schema());
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSpecificSnapshotId() throws Exception {
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        genericAppenderHelper.appendToTable(RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet()));
        this.tableResource.table().currentSnapshot().snapshotId();
        List generate = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
        genericAppenderHelper.appendToTable(generate);
        CloseableIterator executeAndCollect = createStream(ScanContext.builder().streaming(true).monitorInterval(Duration.ofMillis(10L)).startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID).startSnapshotId(Long.valueOf(this.tableResource.table().currentSnapshot().snapshotId())).build()).executeAndCollect(getClass().getSimpleName());
        Throwable th = null;
        try {
            TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate, this.tableResource.table().schema());
            List generate2 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
            genericAppenderHelper.appendToTable(generate2);
            TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate2, this.tableResource.table().schema());
            List generate3 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
            genericAppenderHelper.appendToTable(generate3);
            TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate3, this.tableResource.table().schema());
            if (executeAndCollect != null) {
                if (0 == 0) {
                    executeAndCollect.close();
                    return;
                }
                try {
                    executeAndCollect.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (executeAndCollect != null) {
                if (0 != 0) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSpecificSnapshotTimestamp() throws Exception {
        GenericAppenderHelper genericAppenderHelper = new GenericAppenderHelper(this.tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        genericAppenderHelper.appendToTable(RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet()));
        this.tableResource.table().currentSnapshot().timestampMillis();
        Thread.sleep(2L);
        List generate = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
        genericAppenderHelper.appendToTable(generate);
        CloseableIterator executeAndCollect = createStream(ScanContext.builder().streaming(true).monitorInterval(Duration.ofMillis(10L)).startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP).startSnapshotTimestamp(Long.valueOf(this.tableResource.table().currentSnapshot().timestampMillis())).build()).executeAndCollect(getClass().getSimpleName());
        Throwable th = null;
        try {
            try {
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate, this.tableResource.table().schema());
                List generate2 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
                genericAppenderHelper.appendToTable(generate2);
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate2, this.tableResource.table().schema());
                List generate3 = RandomGenericData.generate(this.tableResource.table().schema(), 2, this.randomSeed.incrementAndGet());
                genericAppenderHelper.appendToTable(generate3);
                TestHelpers.assertRecords(waitForResult(executeAndCollect, 2), generate3, this.tableResource.table().schema());
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        return executionEnvironment.fromSource(IcebergSource.forRowData().tableLoader(this.tableResource.tableLoader()).assignerFactory(new SimpleSplitAssignerFactory()).streaming(scanContext.isStreaming()).streamingStartingStrategy(scanContext.streamingStartingStrategy()).startSnapshotTimestamp(scanContext.startSnapshotTimestamp()).startSnapshotId(scanContext.startSnapshotId()).monitorInterval(Duration.ofMillis(10L)).build(), WatermarkStrategy.noWatermarks(), "icebergSource", TypeInformation.of(RowData.class)).map(new RowDataToRowMapper(FlinkSchemaUtil.convert(this.tableResource.table().schema())));
    }

    public static List<Row> waitForResult(CloseableIterator<Row> closeableIterator, int i) {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i);
        while (newArrayListWithCapacity.size() < i && closeableIterator.hasNext()) {
            newArrayListWithCapacity.add(closeableIterator.next());
        }
        return newArrayListWithCapacity;
    }

    public static void waitUntilJobIsRunning(ClusterClient<?> clusterClient) throws Exception {
        while (getRunningJobs(clusterClient).isEmpty()) {
            Thread.sleep(10L);
        }
    }

    public static List<JobID> getRunningJobs(ClusterClient<?> clusterClient) throws Exception {
        return (List) ((Collection) clusterClient.listJobs().get()).stream().filter(jobStatusMessage -> {
            return jobStatusMessage.getJobState() == JobStatus.RUNNING;
        }).map((v0) -> {
            return v0.getJobId();
        }).collect(Collectors.toList());
    }
}
