package org.apache.paimon.flink.sink;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/sink/StoreMultiCommitterTest.class */
class StoreMultiCommitterTest {
    private String initialCommitUser;
    private Path warehouse;
    private Catalog.Loader catalogLoader;
    private Catalog catalog;
    private Identifier firstTable;
    private Identifier secondTable;
    private Path firstTablePath;
    private Path secondTablePath;

    @TempDir
    public java.nio.file.Path tempDir;

    StoreMultiCommitterTest() {
    }

    @SafeVarargs
    private final void createTestTables(Catalog catalog, Tuple2<Identifier, Schema>... tuple2Arr) throws Exception {
        for (Tuple2<Identifier, Schema> tuple2 : tuple2Arr) {
            catalog.createTable((Identifier) tuple2.f0, (Schema) tuple2.f1, false);
        }
    }

    @BeforeEach
    public void beforeEach() throws Exception {
        this.initialCommitUser = UUID.randomUUID().toString();
        this.warehouse = new Path("traceable://" + this.tempDir.toString());
        this.firstTable = Identifier.create("test_db", "test_table1");
        this.secondTable = Identifier.create("test_db", "test_table2");
        this.catalogLoader = createCatalogLoader();
        this.catalog = this.catalogLoader.load();
        this.catalog.createDatabase("test_db", true);
        RowType of = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"a", "b"});
        RowType of2 = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(5)}, new String[]{"a", "b", "c"});
        Options options = new Options();
        options.set(CoreOptions.TAG_AUTOMATIC_CREATION, CoreOptions.TagCreationMode.PROCESS_TIME);
        options.setString("bucket", "-1");
        createTestTables(this.catalog, Tuple2.of(this.firstTable, new Schema(of.getFields(), Collections.emptyList(), Collections.emptyList(), options.toMap(), "")), Tuple2.of(this.secondTable, new Schema(of2.getFields(), Collections.emptyList(), Collections.emptyList(), Collections.singletonMap("bucket", "1"), "")));
        this.firstTablePath = this.catalog.getTable(this.firstTable).location();
        this.secondTablePath = this.catalog.getTable(this.secondTable).location();
    }

    /* JADX WARN: Type inference failed for: r0v5, types: [long, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    /* JADX WARN: Type inference failed for: r6v0, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v1, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    @Test
    public void testFailIntentionallyAfterRestore() throws Exception {
        FileStoreTable fileStoreTable = (FileStoreTable) this.catalog.getTable(this.firstTable);
        ?? createRecoverableTestHarness = createRecoverableTestHarness();
        createRecoverableTestHarness.open();
        StreamTableWrite newWrite = fileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        newWrite.write(GenericRow.of(new Object[]{2, 20L}));
        long j = 1;
        for (CommitMessage commitMessage : newWrite.prepareCommit(false, 8L)) {
            Identifier identifier = this.firstTable;
            ?? r6 = Committable.Kind.FILE;
            MultiTableCommittable multiTableCommittable = getMultiTableCommittable(identifier, new Committable(8L, (Committable.Kind) r6, commitMessage));
            long j2 = j;
            j = r6 + 1;
            createRecoverableTestHarness.processElement(multiTableCommittable, j2);
        }
        long j3 = j + 1;
        OperatorSubtaskState snapshot = createRecoverableTestHarness.snapshot(0L, (long) createRecoverableTestHarness);
        Assertions.assertThat(fileStoreTable.snapshotManager().latestSnapshotId()).isNull();
        createRecoverableTestHarness.close();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness2 = createRecoverableTestHarness();
        try {
            createRecoverableTestHarness2.initializeState(snapshot);
            createRecoverableTestHarness2.open();
            Assertions.fail("Expecting intentional exception");
        } catch (Exception e) {
            Assertions.assertThat(e).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        assertResultsForFirstTable(fileStoreTable, "1, 10", "2, 20");
        createRecoverableTestHarness2.close();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness3 = createRecoverableTestHarness();
        createRecoverableTestHarness3.initializeState(snapshot);
        createRecoverableTestHarness3.open();
        assertResultsForFirstTable(fileStoreTable, "1, 10", "2, 20");
        FileStoreTable fileStoreTable2 = (FileStoreTable) this.catalog.getTable(this.secondTable);
        StreamTableWrite newWrite2 = fileStoreTable2.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        newWrite2.write(GenericRow.of(new Object[]{3, Double.valueOf(30.0d), BinaryString.fromString("s3")}));
        newWrite2.write(GenericRow.of(new Object[]{4, Double.valueOf(40.0d), BinaryString.fromString("s4")}));
        for (CommitMessage commitMessage2 : newWrite2.prepareCommit(false, 9L)) {
            Identifier identifier2 = this.secondTable;
            ?? r62 = Committable.Kind.FILE;
            MultiTableCommittable multiTableCommittable2 = getMultiTableCommittable(identifier2, new Committable(9L, (Committable.Kind) r62, commitMessage2));
            long j4 = j3;
            j3 = r62 + 1;
            createRecoverableTestHarness3.processElement(multiTableCommittable2, j4);
        }
        OperatorSubtaskState snapshot2 = createRecoverableTestHarness3.snapshot(1L, j3);
        Assertions.assertThat(fileStoreTable2.snapshotManager().latestSnapshotId()).isNull();
        createRecoverableTestHarness3.close();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness4 = createRecoverableTestHarness();
        try {
            createRecoverableTestHarness4.initializeState(snapshot2);
            createRecoverableTestHarness4.open();
            Assertions.fail("Expecting intentional exception");
        } catch (Exception e2) {
            Assertions.assertThat(e2).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        assertResultsForSecondTable(fileStoreTable2, "3, 30.0, s3", "4, 40.0, s4");
        createRecoverableTestHarness4.close();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness5 = createRecoverableTestHarness();
        createRecoverableTestHarness5.initializeState(snapshot2);
        createRecoverableTestHarness5.open();
        assertResultsForSecondTable(fileStoreTable2, "3, 30.0, s3", "4, 40.0, s4");
        createRecoverableTestHarness5.close();
    }

    @Test
    public void testCheckpointAbort() throws Exception {
        FileStoreTable table = this.catalog.getTable(this.firstTable);
        FileStoreTable table2 = this.catalog.getTable(this.secondTable);
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness = createRecoverableTestHarness();
        createRecoverableTestHarness.open();
        StreamTableWrite newWrite = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        StreamTableWrite newWrite2 = table2.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        long j = 0;
        for (int i = 0; i < 10; i++) {
            j++;
            newWrite.write(GenericRow.of(new Object[]{1, 10L}));
            newWrite.write(GenericRow.of(new Object[]{2, 20L}));
            Iterator it = newWrite.prepareCommit(false, j).iterator();
            while (it.hasNext()) {
                createRecoverableTestHarness.processElement(getMultiTableCommittable(this.firstTable, new Committable(j, Committable.Kind.FILE, (CommitMessage) it.next())), 1L);
            }
        }
        createRecoverableTestHarness.snapshot(j, 1L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(j);
        SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), this.firstTablePath);
        SnapshotManager snapshotManager2 = new SnapshotManager(LocalFileIO.create(), this.secondTablePath);
        Assertions.assertThat(snapshotManager.latestSnapshotId()).isEqualTo(j);
        Assertions.assertThat(snapshotManager2.latestSnapshotId()).isNull();
        for (int i2 = 0; i2 < 10; i2++) {
            j++;
            newWrite.write(GenericRow.of(new Object[]{3, 30L}));
            newWrite.write(GenericRow.of(new Object[]{3, 40L}));
            newWrite2.write(GenericRow.of(new Object[]{3, Double.valueOf(30.0d), BinaryString.fromString("s3")}));
            newWrite2.write(GenericRow.of(new Object[]{3, Double.valueOf(40.0d), BinaryString.fromString("s4")}));
            Iterator it2 = newWrite.prepareCommit(false, j).iterator();
            while (it2.hasNext()) {
                createRecoverableTestHarness.processElement(getMultiTableCommittable(this.firstTable, new Committable(j, Committable.Kind.FILE, (CommitMessage) it2.next())), 1L);
            }
            Iterator it3 = newWrite2.prepareCommit(false, j).iterator();
            while (it3.hasNext()) {
                createRecoverableTestHarness.processElement(getMultiTableCommittable(this.secondTable, new Committable(j, Committable.Kind.FILE, (CommitMessage) it3.next())), 1L);
            }
        }
        createRecoverableTestHarness.snapshot(j, 2L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(j);
        Assertions.assertThat(snapshotManager.latestSnapshotId()).isEqualTo(20L);
        Assertions.assertThat(snapshotManager2.latestSnapshotId()).isEqualTo(10L);
        createRecoverableTestHarness.close();
    }

    /* JADX WARN: Type inference failed for: r0v17, types: [long, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    /* JADX WARN: Type inference failed for: r6v0, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v1, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v2, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v3, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v4, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    @Test
    public void testSnapshotLostWhenFailed() throws Exception {
        FileStoreTable fileStoreTable = (FileStoreTable) this.catalog.getTable(this.firstTable);
        FileStoreTable fileStoreTable2 = (FileStoreTable) this.catalog.getTable(this.secondTable);
        StreamTableWrite newWrite = fileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        StreamTableWrite newWrite2 = fileStoreTable2.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        ?? createLossyTestHarness = createLossyTestHarness();
        createLossyTestHarness.open();
        long j = 1;
        StreamWriteBuilder withCommitUser = fileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser);
        StreamWriteBuilder withCommitUser2 = fileStoreTable2.newStreamWriteBuilder().withCommitUser(this.initialCommitUser);
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        newWrite.write(GenericRow.of(new Object[]{2, 20L}));
        for (CommitMessage commitMessage : newWrite.prepareCommit(false, 1L)) {
            Identifier identifier = this.firstTable;
            ?? r6 = Committable.Kind.FILE;
            MultiTableCommittable multiTableCommittable = getMultiTableCommittable(identifier, new Committable(1L, (Committable.Kind) r6, commitMessage));
            long j2 = j;
            j = r6 + 1;
            createLossyTestHarness.processElement(multiTableCommittable, j2);
        }
        long j3 = j + 1;
        createLossyTestHarness.snapshot(1L, (long) createLossyTestHarness);
        createLossyTestHarness.notifyOfCompletedCheckpoint(1L);
        newWrite.write(GenericRow.of(new Object[]{3, 30L}));
        newWrite.write(GenericRow.of(new Object[]{4, 40L}));
        newWrite2.write(GenericRow.of(new Object[]{3, Double.valueOf(30.0d), BinaryString.fromString("s3")}));
        newWrite2.write(GenericRow.of(new Object[]{3, Double.valueOf(40.0d), BinaryString.fromString("s4")}));
        for (CommitMessage commitMessage2 : newWrite.prepareCommit(false, 2L)) {
            Identifier identifier2 = this.firstTable;
            ?? r62 = Committable.Kind.FILE;
            MultiTableCommittable multiTableCommittable2 = getMultiTableCommittable(identifier2, new Committable(2L, (Committable.Kind) r62, commitMessage2));
            long j4 = j3;
            j3 = r62 + 1;
            createLossyTestHarness.processElement(multiTableCommittable2, j4);
        }
        for (CommitMessage commitMessage3 : newWrite2.prepareCommit(false, 2L)) {
            Identifier identifier3 = this.secondTable;
            ?? r63 = Committable.Kind.FILE;
            MultiTableCommittable multiTableCommittable3 = getMultiTableCommittable(identifier3, new Committable(2L, (Committable.Kind) r63, commitMessage3));
            long j5 = j3;
            j3 = r63 + 1;
            createLossyTestHarness.processElement(multiTableCommittable3, j5);
        }
        long j6 = j3 + 1;
        OperatorSubtaskState snapshot = createLossyTestHarness.snapshot(2L, (long) createLossyTestHarness);
        newWrite.close();
        newWrite2.close();
        createLossyTestHarness.close();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createLossyTestHarness2 = createLossyTestHarness();
        createLossyTestHarness2.initializeState(snapshot);
        createLossyTestHarness2.open();
        StreamTableWrite newWrite3 = withCommitUser.newWrite();
        newWrite3.write(GenericRow.of(new Object[]{5, 50L}));
        newWrite3.write(GenericRow.of(new Object[]{6, 60L}));
        StreamTableWrite newWrite4 = withCommitUser2.newWrite();
        newWrite4.write(GenericRow.of(new Object[]{5, Double.valueOf(50.0d), BinaryString.fromString("s5")}));
        newWrite4.write(GenericRow.of(new Object[]{6, Double.valueOf(60.0d), BinaryString.fromString("s6")}));
        for (CommitMessage commitMessage4 : newWrite3.prepareCommit(false, 3L)) {
            Identifier identifier4 = this.firstTable;
            ?? r64 = Committable.Kind.FILE;
            MultiTableCommittable multiTableCommittable4 = getMultiTableCommittable(identifier4, new Committable(3L, (Committable.Kind) r64, commitMessage4));
            long j7 = j6;
            j6 = r64 + 1;
            createLossyTestHarness2.processElement(multiTableCommittable4, j7);
        }
        for (CommitMessage commitMessage5 : newWrite4.prepareCommit(false, 2L)) {
            Identifier identifier5 = this.secondTable;
            ?? r65 = Committable.Kind.FILE;
            MultiTableCommittable multiTableCommittable5 = getMultiTableCommittable(identifier5, new Committable(2L, (Committable.Kind) r65, commitMessage5));
            long j8 = j6;
            j6 = r65 + 1;
            createLossyTestHarness2.processElement(multiTableCommittable5, j8);
        }
        createLossyTestHarness2.snapshot(3L, j6);
        createLossyTestHarness2.notifyOfCompletedCheckpoint(3L);
        newWrite3.close();
        newWrite4.close();
        createLossyTestHarness2.close();
        assertResultsForFirstTable(fileStoreTable, "1, 10", "2, 20", "5, 50", "6, 60");
        assertResultsForSecondTable(fileStoreTable2, "5, 50.0, s5", "6, 60.0, s6");
    }

    /* JADX WARN: Type inference failed for: r6v0, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v1, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    @Test
    public void testWatermarkCommit() throws Exception {
        FileStoreTable table = this.catalog.getTable(this.firstTable);
        FileStoreTable table2 = this.catalog.getTable(this.secondTable);
        StreamTableWrite newWrite = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        StreamTableWrite newWrite2 = table2.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness = createRecoverableTestHarness();
        createRecoverableTestHarness.open();
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        Identifier identifier = this.firstTable;
        ?? r6 = Committable.Kind.FILE;
        long j = r6 + 1;
        createRecoverableTestHarness.processElement(getMultiTableCommittable(identifier, new Committable(1L, (Committable.Kind) r6, newWrite.prepareCommit(true, 1L).get(0))), 0L);
        createRecoverableTestHarness.processWatermark(new Watermark(1024L));
        createRecoverableTestHarness.snapshot(1L, j);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(((Snapshot) Objects.requireNonNull(table.snapshotManager().latestSnapshot())).watermark()).isEqualTo(1024L);
        Assertions.assertThat(table2.snapshotManager().latestSnapshot()).isNull();
        newWrite.write(GenericRow.of(new Object[]{1, 20L}));
        newWrite2.write(GenericRow.of(new Object[]{1, Double.valueOf(20.0d), BinaryString.fromString("s2")}));
        Identifier identifier2 = this.firstTable;
        ?? r62 = Committable.Kind.FILE;
        createRecoverableTestHarness.processElement(getMultiTableCommittable(identifier2, new Committable(2L, (Committable.Kind) r62, newWrite.prepareCommit(true, 2L).get(0))), j + 1);
        createRecoverableTestHarness.processWatermark(new Watermark(2048L));
        createRecoverableTestHarness.snapshot(2L, r62 + 1);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(2L);
        createRecoverableTestHarness.close();
        Assertions.assertThat(((Snapshot) Objects.requireNonNull(table.snapshotManager().latestSnapshot())).watermark()).isEqualTo(2048L);
        Assertions.assertThat(((Snapshot) Objects.requireNonNull(table.snapshotManager().latestSnapshot())).watermark()).isEqualTo(2048L);
    }

    /* JADX WARN: Type inference failed for: r6v0, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v1, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    @Test
    public void testEmptyCommit() throws Exception {
        FileStoreTable table = this.catalog.getTable(this.firstTable);
        FileStoreTable table2 = this.catalog.getTable(this.secondTable);
        StreamTableWrite newWrite = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        StreamTableWrite newWrite2 = table2.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness = createRecoverableTestHarness();
        createRecoverableTestHarness.open();
        newWrite.write(GenericRow.of(new Object[]{1, 20L}));
        newWrite2.write(GenericRow.of(new Object[]{1, Double.valueOf(20.0d), BinaryString.fromString("s2")}));
        Identifier identifier = this.firstTable;
        ?? r6 = Committable.Kind.FILE;
        long j = r6 + 1;
        createRecoverableTestHarness.processElement(getMultiTableCommittable(identifier, new Committable(1L, (Committable.Kind) r6, newWrite.prepareCommit(true, 1L).get(0))), 0L);
        Identifier identifier2 = this.secondTable;
        ?? r62 = Committable.Kind.FILE;
        long j2 = r62 + 1;
        createRecoverableTestHarness.processElement(getMultiTableCommittable(identifier2, new Committable(1L, (Committable.Kind) r62, newWrite2.prepareCommit(true, 1L).get(0))), j);
        createRecoverableTestHarness.processWatermark(new Watermark(2048L));
        createRecoverableTestHarness.snapshot(1L, j2);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(((Snapshot) Objects.requireNonNull(table.snapshotManager().latestSnapshot())).watermark()).isEqualTo(2048L);
        Assertions.assertThat(((Snapshot) Objects.requireNonNull(table2.snapshotManager().latestSnapshot())).watermark()).isEqualTo(2048L);
        long j3 = 1 + 1;
        createRecoverableTestHarness.snapshot(j3, j2 + 1);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(j3);
        Assertions.assertThat(((Snapshot) Objects.requireNonNull(table.snapshotManager().latestSnapshot())).id()).isEqualTo(1L);
        Assertions.assertThat(((Snapshot) Objects.requireNonNull(table2.snapshotManager().latestSnapshot())).id()).isEqualTo(1L);
        createRecoverableTestHarness.close();
    }

    /* JADX WARN: Type inference failed for: r6v0, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    /* JADX WARN: Type inference failed for: r6v1, types: [long, org.apache.paimon.flink.sink.Committable$Kind] */
    @Test
    public void testCommitMetrics() throws Exception {
        FileStoreTable table = this.catalog.getTable(this.firstTable);
        FileStoreTable table2 = this.catalog.getTable(this.secondTable);
        StreamTableWrite newWrite = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        StreamTableWrite newWrite2 = table2.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness = createRecoverableTestHarness();
        createRecoverableTestHarness.open();
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        newWrite2.write(GenericRow.of(new Object[]{1, Double.valueOf(1.1d), BinaryString.fromString("AAA")}));
        newWrite2.compact(BinaryRow.EMPTY_ROW, 0, false);
        newWrite2.write(GenericRow.of(new Object[]{1, Double.valueOf(1.2d), BinaryString.fromString("aaa")}));
        newWrite2.compact(BinaryRow.EMPTY_ROW, 0, false);
        newWrite2.write(GenericRow.of(new Object[]{2, Double.valueOf(2.1d), BinaryString.fromString("BBB")}));
        newWrite2.compact(BinaryRow.EMPTY_ROW, 0, true);
        Identifier identifier = this.firstTable;
        ?? r6 = Committable.Kind.FILE;
        long j = r6 + 1;
        createRecoverableTestHarness.processElement(getMultiTableCommittable(identifier, new Committable(1L, (Committable.Kind) r6, newWrite.prepareCommit(true, 1L).get(0))), 0L);
        Identifier identifier2 = this.secondTable;
        ?? r62 = Committable.Kind.FILE;
        createRecoverableTestHarness.processElement(getMultiTableCommittable(identifier2, new Committable(1L, (Committable.Kind) r62, newWrite2.prepareCommit(true, 1L).get(0))), j);
        createRecoverableTestHarness.snapshot(1L, r62 + 1);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        OperatorMetricGroup metricGroup = createRecoverableTestHarness.getOperator().getRuntimeContext().getMetricGroup();
        MetricGroup addGroup = metricGroup.addGroup("paimon").addGroup("table", table.name()).addGroup("commit");
        MetricGroup addGroup2 = metricGroup.addGroup("paimon").addGroup("table", table2.name()).addGroup("commit");
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesAdded").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesDeleted").getValue()).isEqualTo(0L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesAppended").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesCommitCompacted").getValue()).isEqualTo(0L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup2, "lastTableFilesAdded").getValue()).isEqualTo(4L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup2, "lastTableFilesDeleted").getValue()).isEqualTo(3L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup2, "lastTableFilesAppended").getValue()).isEqualTo(3L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup2, "lastTableFilesCommitCompacted").getValue()).isEqualTo(4L);
        createRecoverableTestHarness.close();
        newWrite.close();
        newWrite2.close();
    }

    private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness() throws Exception {
        return createTestHarness(new CommitterOperator<>(true, false, this.initialCommitUser, (str, operatorMetricGroup) -> {
            return new StoreMultiCommitter(this.catalogLoader, this.initialCommitUser, operatorMetricGroup);
        }, new RestoreAndFailCommittableStateManager(() -> {
            return new VersionedSerializerWrapper(new WrappedManifestCommittableSerializer());
        })));
    }

    private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createLossyTestHarness() throws Exception {
        return createTestHarness(new CommitterOperator<>(true, false, this.initialCommitUser, (str, operatorMetricGroup) -> {
            return new StoreMultiCommitter(this.catalogLoader, this.initialCommitUser, operatorMetricGroup);
        }, new CommittableStateManager<WrappedManifestCommittable>() { // from class: org.apache.paimon.flink.sink.StoreMultiCommitterTest.1
            public void initializeState(StateInitializationContext stateInitializationContext, Committer<?, WrappedManifestCommittable> committer) {
            }

            public void snapshotState(StateSnapshotContext stateSnapshotContext, List<WrappedManifestCommittable> list) {
            }
        }));
    }

    private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createTestHarness(CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> committerOperator) throws Exception {
        TypeSerializer createSerializer = new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(committerOperator, createSerializer);
        oneInputStreamOperatorTestHarness.setup(createSerializer);
        return oneInputStreamOperatorTestHarness;
    }

    private Catalog.Loader createCatalogLoader() {
        Options createCatalogOptions = createCatalogOptions(this.warehouse);
        return () -> {
            return CatalogFactory.createCatalog(CatalogContext.create(createCatalogOptions));
        };
    }

    private Options createCatalogOptions(Path path) {
        Options options = new Options();
        options.set(CatalogOptions.WAREHOUSE, path.toString());
        options.set(CatalogOptions.URI, "");
        return options;
    }

    protected void assertResultsForFirstTable(FileStoreTable fileStoreTable, String... strArr) {
        TableRead newRead = fileStoreTable.newReadBuilder().newRead();
        ArrayList arrayList = new ArrayList();
        fileStoreTable.newReadBuilder().newScan().plan().splits().forEach(split -> {
            try {
                RecordReaderIterator recordReaderIterator = new RecordReaderIterator(newRead.createReader(split));
                while (recordReaderIterator.hasNext()) {
                    InternalRow internalRow = (InternalRow) recordReaderIterator.next();
                    arrayList.add(internalRow.getInt(0) + ", " + internalRow.getLong(1));
                }
                recordReaderIterator.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        Collections.sort(arrayList);
        Assertions.assertThat(arrayList).isEqualTo(Arrays.asList(strArr));
    }

    private void assertResultsForSecondTable(FileStoreTable fileStoreTable, String... strArr) {
        TableRead newRead = fileStoreTable.newReadBuilder().newRead();
        ArrayList arrayList = new ArrayList();
        fileStoreTable.newReadBuilder().newScan().plan().splits().forEach(split -> {
            try {
                RecordReaderIterator recordReaderIterator = new RecordReaderIterator(newRead.createReader(split));
                while (recordReaderIterator.hasNext()) {
                    InternalRow internalRow = (InternalRow) recordReaderIterator.next();
                    arrayList.add(internalRow.getInt(0) + ", " + internalRow.getDouble(1) + ", " + internalRow.getString(2));
                }
                recordReaderIterator.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        Collections.sort(arrayList);
        Assertions.assertThat(arrayList).isEqualTo(Arrays.asList(strArr));
    }

    private MultiTableCommittable getMultiTableCommittable(Identifier identifier, Committable committable) {
        return MultiTableCommittable.fromCommittable(identifier, committable);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1260363888:
                if (implMethodName.equals("lambda$createRecoverableTestHarness$7c1b1e07$1")) {
                    z = false;
                    break;
                }
                break;
            case -280860613:
                if (implMethodName.equals("lambda$createCatalogLoader$71cd4600$1")) {
                    z = true;
                    break;
                }
                break;
            case 377861064:
                if (implMethodName.equals("lambda$createLossyTestHarness$7c1b1e07$1")) {
                    z = 2;
                    break;
                }
                break;
            case 903659858:
                if (implMethodName.equals("lambda$createRecoverableTestHarness$dd4dfbf4$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/Committer$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/StoreMultiCommitterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;")) {
                    StoreMultiCommitterTest storeMultiCommitterTest = (StoreMultiCommitterTest) serializedLambda.getCapturedArg(0);
                    return (str, operatorMetricGroup) -> {
                        return new StoreMultiCommitter(this.catalogLoader, this.initialCommitUser, operatorMetricGroup);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/catalog/Catalog$Loader") && serializedLambda.getFunctionalInterfaceMethodName().equals("load") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/catalog/Catalog;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/StoreMultiCommitterTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/options/Options;)Lorg/apache/paimon/catalog/Catalog;")) {
                    Options options = (Options) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return CatalogFactory.createCatalog(CatalogContext.create(options));
                    };
                }
                break;
            case ReadWriteTableTestUtil.DEFAULT_PARALLELISM /* 2 */:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/Committer$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/StoreMultiCommitterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;")) {
                    StoreMultiCommitterTest storeMultiCommitterTest2 = (StoreMultiCommitterTest) serializedLambda.getCapturedArg(0);
                    return (str2, operatorMetricGroup2) -> {
                        return new StoreMultiCommitter(this.catalogLoader, this.initialCommitUser, operatorMetricGroup2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/StoreMultiCommitterTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    return () -> {
                        return new VersionedSerializerWrapper(new WrappedManifestCommittableSerializer());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
