package org.apache.iceberg.flink.source;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.TestTableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/iceberg/flink/source/TestStreamingReaderOperator.class */
public class TestStreamingReaderOperator extends TableTestBase {
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.required(2, "data", Types.StringType.get())});
    private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;

    @Parameterized.Parameters(name = "FormatVersion={0}")
    public static Iterable<Object[]> parameters() {
        return ImmutableList.of(new Object[]{1}, new Object[]{2});
    }

    public TestStreamingReaderOperator(int i) {
        super(i);
    }

    @Before
    public void setupTable() throws IOException {
        this.tableDir = this.temp.newFolder();
        this.metadataDir = new File(this.tableDir, "metadata");
        Assert.assertTrue(this.tableDir.delete());
        this.table = create(SCHEMA, PartitionSpec.unpartitioned());
    }

    @Test
    public void testProcessAllRecords() throws Exception {
        List<List<Record>> generateRecordsAndCommitTxn = generateRecordsAndCommitTxn(10);
        List<FlinkInputSplit> generateSplits = generateSplits();
        Assert.assertEquals("Should have 10 splits", 10L, generateSplits.size());
        OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader = createReader();
        Throwable th = null;
        try {
            try {
                createReader.setup();
                createReader.open();
                SteppingMailboxProcessor createLocalMailbox = createLocalMailbox(createReader);
                ArrayList newArrayList = Lists.newArrayList();
                for (int i = 0; i < generateSplits.size(); i++) {
                    createReader.processElement(generateSplits.get(i), -1L);
                    Assert.assertTrue("Should processed 1 split", createLocalMailbox.runMailboxStep());
                    newArrayList.addAll(generateRecordsAndCommitTxn.get(i));
                    TestHelpers.assertRecords(readOutputValues(createReader), newArrayList, SCHEMA);
                }
                if (createReader != null) {
                    $closeResource(null, createReader);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createReader != null) {
                $closeResource(th, createReader);
            }
            throw th3;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.AutoCloseable, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness, long] */
    @Test
    public void testTriggerCheckpoint() throws Exception {
        List<List<Record>> generateRecordsAndCommitTxn = generateRecordsAndCommitTxn(3);
        List<FlinkInputSplit> generateSplits = generateSplits();
        Assert.assertEquals("Should have 3 splits", 3L, generateSplits.size());
        ?? createReader = createReader();
        Throwable th = null;
        try {
            try {
                createReader.setup();
                createReader.open();
                SteppingMailboxProcessor createLocalMailbox = createLocalMailbox(createReader);
                long j = 0 + 1;
                createReader.processElement(generateSplits.get(0), j);
                createReader.processElement(generateSplits.get(1), j + 1);
                createReader.processElement(generateSplits.get(2), createReader + 1);
                createLocalMailbox.getMainMailboxExecutor().execute(() -> {
                    createReader.snapshot(1L, 3L);
                }, "Trigger snapshot");
                Assert.assertTrue("Should have processed the split0", createLocalMailbox.runMailboxStep());
                Assert.assertTrue("Should have processed the snapshot state action", createLocalMailbox.runMailboxStep());
                TestHelpers.assertRecords(readOutputValues(createReader), generateRecordsAndCommitTxn.get(0), SCHEMA);
                Assert.assertTrue("Should have processed the split1", createLocalMailbox.runMailboxStep());
                Assert.assertTrue("Should have processed the split2", createLocalMailbox.runMailboxStep());
                TestHelpers.assertRecords(readOutputValues(createReader), Lists.newArrayList(Iterables.concat(generateRecordsAndCommitTxn)), SCHEMA);
                if (createReader != 0) {
                    $closeResource(null, createReader);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createReader != 0) {
                $closeResource(th, createReader);
            }
            throw th3;
        }
    }

    @Test
    public void testCheckpointRestore() throws Exception {
        List<List<Record>> generateRecordsAndCommitTxn = generateRecordsAndCommitTxn(15);
        List<FlinkInputSplit> generateSplits = generateSplits();
        Assert.assertEquals("Should have 10 splits", 15L, generateSplits.size());
        ArrayList newArrayList = Lists.newArrayList();
        OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader = createReader();
        try {
            createReader.setup();
            createReader.open();
            Iterator<FlinkInputSplit> it = generateSplits.iterator();
            while (it.hasNext()) {
                createReader.processElement(it.next(), -1L);
            }
            SteppingMailboxProcessor createLocalMailbox = createLocalMailbox(createReader);
            for (int i = 0; i < 5; i++) {
                newArrayList.addAll(generateRecordsAndCommitTxn.get(i));
                Assert.assertTrue("Should have processed the split#" + i, createLocalMailbox.runMailboxStep());
                TestHelpers.assertRecords(readOutputValues(createReader), newArrayList, SCHEMA);
            }
            OperatorSubtaskState snapshot = createReader.snapshot(1L, 1L);
            if (createReader != null) {
                $closeResource(null, createReader);
            }
            newArrayList.clear();
            OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader2 = createReader();
            Throwable th = null;
            try {
                try {
                    createReader2.setup();
                    createReader2.initializeState(snapshot);
                    createReader2.open();
                    SteppingMailboxProcessor createLocalMailbox2 = createLocalMailbox(createReader2);
                    for (int i2 = 5; i2 < 10; i2++) {
                        newArrayList.addAll(generateRecordsAndCommitTxn.get(i2));
                        Assert.assertTrue("Should have processed one split#" + i2, createLocalMailbox2.runMailboxStep());
                        TestHelpers.assertRecords(readOutputValues(createReader2), newArrayList, SCHEMA);
                    }
                    for (int i3 = 10; i3 < 15; i3++) {
                        newArrayList.addAll(generateRecordsAndCommitTxn.get(i3));
                        createReader2.processElement(generateSplits.get(i3), 1L);
                        Assert.assertTrue("Should have processed the split#" + i3, createLocalMailbox2.runMailboxStep());
                        TestHelpers.assertRecords(readOutputValues(createReader2), newArrayList, SCHEMA);
                    }
                    if (createReader2 != null) {
                        $closeResource(null, createReader2);
                    }
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (createReader2 != null) {
                    $closeResource(th, createReader2);
                }
                throw th3;
            }
        } catch (Throwable th4) {
            if (createReader != null) {
                $closeResource(null, createReader);
            }
            throw th4;
        }
    }

    private List<Row> readOutputValues(OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> oneInputStreamOperatorTestHarness) {
        ArrayList newArrayList = Lists.newArrayList();
        for (RowData rowData : oneInputStreamOperatorTestHarness.extractOutputValues()) {
            newArrayList.add(Row.of(new Object[]{Integer.valueOf(rowData.getInt(0)), rowData.getString(1).toString()}));
        }
        return newArrayList;
    }

    private List<List<Record>> generateRecordsAndCommitTxn(int i) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            List<Record> generate = RandomGenericData.generate(SCHEMA, 100, 0L);
            newArrayList.add(generate);
            writeRecords(generate);
        }
        return newArrayList;
    }

    private void writeRecords(List<Record> list) throws IOException {
        new GenericAppenderHelper(this.table, DEFAULT_FORMAT, this.temp).appendToTable(list);
    }

    private List<FlinkInputSplit> generateSplits() {
        ArrayList newArrayList = Lists.newArrayList();
        List currentAncestorIds = SnapshotUtil.currentAncestorIds(this.table);
        int size = currentAncestorIds.size() - 1;
        while (size >= 0) {
            Collections.addAll(newArrayList, FlinkSplitPlanner.planInputSplits(this.table, size == currentAncestorIds.size() - 1 ? ScanContext.builder().useSnapshotId((Long) currentAncestorIds.get(size)).build() : ScanContext.builder().startSnapshotId((Long) currentAncestorIds.get(size + 1)).endSnapshotId((Long) currentAncestorIds.get(size)).build(), ThreadPools.getWorkerPool()));
            size--;
        }
        return newArrayList;
    }

    private OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader() throws Exception {
        OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(StreamingReaderOperator.factory(FlinkSource.forRowData().tableLoader(TestTableLoader.of(this.tableDir.getAbsolutePath())).buildFormat()), 1, 1, 0);
        oneInputStreamOperatorTestHarness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return oneInputStreamOperatorTestHarness;
    }

    private SteppingMailboxProcessor createLocalMailbox(OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> oneInputStreamOperatorTestHarness) {
        return new SteppingMailboxProcessor((v0) -> {
            v0.suspendDefaultAction();
        }, oneInputStreamOperatorTestHarness.getTaskMailbox(), StreamTaskActionExecutor.IMMEDIATE);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
