package org.apache.iceberg.flink.source;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.TestTableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.UnmodifiableIterator;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/source/TestStreamingMonitorFunction.class */
public class TestStreamingMonitorFunction extends TableTestBase {
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.required(2, "data", Types.StringType.get())});
    private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;
    private static final long WAIT_TIME_MILLIS = 10000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/flink/source/TestStreamingMonitorFunction$TestSourceContext.class */
    public class TestSourceContext implements SourceFunction.SourceContext<FlinkInputSplit> {
        private final List<FlinkInputSplit> splits = Lists.newArrayList();
        private final Object checkpointLock = new Object();
        private final CountDownLatch latch;

        TestSourceContext(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public void collect(FlinkInputSplit flinkInputSplit) {
            this.splits.add(flinkInputSplit);
            this.latch.countDown();
        }

        public void collectWithTimestamp(FlinkInputSplit flinkInputSplit, long j) {
            collect(flinkInputSplit);
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markAsTemporarilyIdle() {
        }

        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        public void close() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<Row> toRows() throws IOException {
            FlinkInputFormat buildFormat = FlinkSource.forRowData().tableLoader(TestTableLoader.of(TestStreamingMonitorFunction.this.tableDir.getAbsolutePath())).buildFormat();
            ArrayList newArrayList = Lists.newArrayList();
            Iterator<FlinkInputSplit> it = this.splits.iterator();
            while (it.hasNext()) {
                buildFormat.open(it.next());
                RowData rowData = null;
                while (!buildFormat.reachedEnd()) {
                    try {
                        rowData = buildFormat.nextRecord(rowData);
                        newArrayList.add(Row.of(new Object[]{Integer.valueOf(rowData.getInt(0)), rowData.getString(1).toString()}));
                    } finally {
                        buildFormat.close();
                    }
                }
            }
            return newArrayList;
        }
    }

    @Parameterized.Parameters(name = "FormatVersion={0}")
    public static Iterable<Object[]> parameters() {
        return ImmutableList.of(new Object[]{1}, new Object[]{2});
    }

    public TestStreamingMonitorFunction(int i) {
        super(i);
    }

    @Before
    public void setupTable() throws IOException {
        this.tableDir = this.temp.newFolder();
        this.metadataDir = new File(this.tableDir, "metadata");
        Assert.assertTrue(this.tableDir.delete());
        this.table = create(SCHEMA, PartitionSpec.unpartitioned());
    }

    private void runSourceFunctionInTask(TestSourceContext testSourceContext, StreamingMonitorFunction streamingMonitorFunction) {
        new Thread(() -> {
            try {
                streamingMonitorFunction.run(testSourceContext);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).start();
    }

    @Test
    public void testConsumeWithoutStartSnapshotId() throws Exception {
        List<List<Record>> generateRecordsAndCommitTxn = generateRecordsAndCommitTxn(10);
        StreamingMonitorFunction createFunction = createFunction(ScanContext.builder().monitorInterval(Duration.ofMillis(100L)).build());
        AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness = createHarness(createFunction);
        Throwable th = null;
        try {
            try {
                createHarness.setup();
                createHarness.open();
                TestSourceContext testSourceContext = new TestSourceContext(new CountDownLatch(1));
                runSourceFunctionInTask(testSourceContext, createFunction);
                awaitExpectedSplits(testSourceContext);
                createFunction.close();
                TestHelpers.assertRecords(testSourceContext.toRows(), Lists.newArrayList(Iterables.concat(generateRecordsAndCommitTxn)), SCHEMA);
                if (createHarness != null) {
                    $closeResource(null, createHarness);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createHarness != null) {
                $closeResource(th, createHarness);
            }
            throw th3;
        }
    }

    @Test
    public void testConsumeFromStartSnapshotId() throws Exception {
        generateRecordsAndCommitTxn(5);
        long snapshotId = this.table.currentSnapshot().snapshotId();
        List<List<Record>> generateRecordsAndCommitTxn = generateRecordsAndCommitTxn(5);
        StreamingMonitorFunction createFunction = createFunction(ScanContext.builder().monitorInterval(Duration.ofMillis(100L)).startSnapshotId(Long.valueOf(snapshotId)).build());
        AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness = createHarness(createFunction);
        Throwable th = null;
        try {
            try {
                createHarness.setup();
                createHarness.open();
                TestSourceContext testSourceContext = new TestSourceContext(new CountDownLatch(1));
                runSourceFunctionInTask(testSourceContext, createFunction);
                awaitExpectedSplits(testSourceContext);
                createFunction.close();
                TestHelpers.assertRecords(testSourceContext.toRows(), Lists.newArrayList(Iterables.concat(generateRecordsAndCommitTxn)), SCHEMA);
                if (createHarness != null) {
                    $closeResource(null, createHarness);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createHarness != null) {
                $closeResource(th, createHarness);
            }
            throw th3;
        }
    }

    @Test
    public void testConsumeFromStartTag() throws Exception {
        generateRecordsAndCommitTxn(5);
        this.table.manageSnapshots().createTag("t1", this.table.currentSnapshot().snapshotId()).commit();
        List<List<Record>> generateRecordsAndCommitTxn = generateRecordsAndCommitTxn(5);
        StreamingMonitorFunction createFunction = createFunction(ScanContext.builder().monitorInterval(Duration.ofMillis(100L)).startTag("t1").build());
        AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness = createHarness(createFunction);
        try {
            createHarness.setup();
            createHarness.open();
            TestSourceContext testSourceContext = new TestSourceContext(new CountDownLatch(1));
            runSourceFunctionInTask(testSourceContext, createFunction);
            awaitExpectedSplits(testSourceContext);
            createFunction.close();
            TestHelpers.assertRecords(testSourceContext.toRows(), Lists.newArrayList(Iterables.concat(generateRecordsAndCommitTxn)), SCHEMA);
            if (createHarness != null) {
                $closeResource(null, createHarness);
            }
        } catch (Throwable th) {
            if (createHarness != null) {
                $closeResource(null, createHarness);
            }
            throw th;
        }
    }

    @Test
    public void testCheckpointRestore() throws Exception {
        List<List<Record>> generateRecordsAndCommitTxn = generateRecordsAndCommitTxn(10);
        ScanContext build = ScanContext.builder().monitorInterval(Duration.ofMillis(100L)).build();
        StreamingMonitorFunction createFunction = createFunction(build);
        AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness = createHarness(createFunction);
        Throwable th = null;
        try {
            try {
                createHarness.setup();
                createHarness.open();
                TestSourceContext testSourceContext = new TestSourceContext(new CountDownLatch(1));
                runSourceFunctionInTask(testSourceContext, createFunction);
                awaitExpectedSplits(testSourceContext);
                OperatorSubtaskState snapshot = createHarness.snapshot(1L, 1L);
                createFunction.close();
                TestHelpers.assertRecords(testSourceContext.toRows(), Lists.newArrayList(Iterables.concat(generateRecordsAndCommitTxn)), SCHEMA);
                if (createHarness != null) {
                    $closeResource(null, createHarness);
                }
                List<List<Record>> generateRecordsAndCommitTxn2 = generateRecordsAndCommitTxn(10);
                StreamingMonitorFunction createFunction2 = createFunction(build);
                createHarness = createHarness(createFunction2);
                Throwable th2 = null;
                try {
                    try {
                        createHarness.setup();
                        createHarness.initializeState(snapshot);
                        createHarness.open();
                        TestSourceContext testSourceContext2 = new TestSourceContext(new CountDownLatch(1));
                        runSourceFunctionInTask(testSourceContext2, createFunction2);
                        awaitExpectedSplits(testSourceContext2);
                        createFunction2.close();
                        TestHelpers.assertRecords(testSourceContext2.toRows(), Lists.newArrayList(Iterables.concat(generateRecordsAndCommitTxn2)), SCHEMA);
                        if (createHarness != null) {
                            $closeResource(null, createHarness);
                        }
                    } catch (Throwable th3) {
                        th2 = th3;
                        throw th3;
                    }
                } finally {
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } finally {
        }
    }

    private void awaitExpectedSplits(TestSourceContext testSourceContext) {
        Awaitility.await("expected splits should be produced").atMost(Duration.ofMillis(WAIT_TIME_MILLIS)).untilAsserted(() -> {
            Assertions.assertThat(testSourceContext.latch.getCount()).isEqualTo(0L);
            Assertions.assertThat(testSourceContext.splits).as("Should produce the expected splits", new Object[0]).hasSize(1);
        });
    }

    @Test
    public void testInvalidMaxPlanningSnapshotCount() {
        ScanContext build = ScanContext.builder().monitorInterval(Duration.ofMillis(100L)).maxPlanningSnapshotCount(0).build();
        Assertions.assertThatThrownBy(() -> {
            createFunction(build);
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("The max-planning-snapshot-count must be greater than zero");
        ScanContext build2 = ScanContext.builder().monitorInterval(Duration.ofMillis(100L)).maxPlanningSnapshotCount(-10).build();
        Assertions.assertThatThrownBy(() -> {
            createFunction(build2);
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("The max-planning-snapshot-count must be greater than zero");
    }

    @Test
    public void testConsumeWithMaxPlanningSnapshotCount() throws Exception {
        generateRecordsAndCommitTxn(10);
        long snapshotId = SnapshotUtil.oldestAncestor(this.table).snapshotId();
        Assert.assertEquals("should produce 9 splits", 9L, FlinkSplitPlanner.planInputSplits(this.table, ScanContext.builder().monitorInterval(Duration.ofMillis(100L)).splitSize(1000L).startSnapshotId(Long.valueOf(snapshotId)).maxPlanningSnapshotCount(Integer.MAX_VALUE).build(), ThreadPools.getWorkerPool()).length);
        UnmodifiableIterator it = ImmutableList.of(1, 9, 15).iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            StreamingMonitorFunction createFunction = createFunction(ScanContext.builder().monitorInterval(Duration.ofMillis(500L)).startSnapshotId(Long.valueOf(snapshotId)).splitSize(1000L).maxPlanningSnapshotCount(intValue).build());
            AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness = createHarness(createFunction);
            Throwable th = null;
            try {
                try {
                    createHarness.setup();
                    createHarness.open();
                    createFunction.sourceContext(new TestSourceContext(new CountDownLatch(1)));
                    createFunction.monitorAndForwardSplits();
                    if (intValue < 10) {
                        Assert.assertEquals("Should produce same splits as max-planning-snapshot-count", intValue, r0.splits.size());
                    }
                    if (createHarness != null) {
                        $closeResource(null, createHarness);
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (createHarness != null) {
                    $closeResource(th, createHarness);
                }
                throw th2;
            }
        }
    }

    private List<List<Record>> generateRecordsAndCommitTxn(int i) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            List<Record> generate = RandomGenericData.generate(SCHEMA, 100, 0L);
            newArrayList.add(generate);
            writeRecords(generate);
        }
        return newArrayList;
    }

    private void writeRecords(List<Record> list) throws IOException {
        new GenericAppenderHelper(this.table, DEFAULT_FORMAT, this.temp).appendToTable(list);
    }

    private StreamingMonitorFunction createFunction(ScanContext scanContext) {
        return new StreamingMonitorFunction(TestTableLoader.of(this.tableDir.getAbsolutePath()), scanContext);
    }

    private AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness(StreamingMonitorFunction streamingMonitorFunction) throws Exception {
        return new AbstractStreamOperatorTestHarness<>(new StreamSource(streamingMonitorFunction), 1, 1, 0);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
