package org.apache.paimon.flink.sink;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/sink/CommitterOperatorTest.class */
public class CommitterOperatorTest extends CommitterOperatorTestBase {
    protected String initialCommitUser;

    @Override // org.apache.paimon.flink.sink.CommitterOperatorTestBase
    @BeforeEach
    public void before() {
        super.before();
        this.initialCommitUser = UUID.randomUUID().toString();
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [long, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    @Test
    public void testFailIntentionallyAfterRestore() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        ?? createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        StreamTableWrite newWrite = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        newWrite.write(GenericRow.of(new Object[]{2, 20L}));
        long j = 1;
        for (CommitMessage commitMessage : newWrite.prepareCommit(false, 8L)) {
            Committable.Kind kind = Committable.Kind.FILE;
            long j2 = j;
            j = j2 + 1;
            kind.processElement(new Committable(8L, kind, commitMessage), j2);
        }
        long j3 = j + 1;
        OperatorSubtaskState snapshot = createRecoverableTestHarness.snapshot(0L, (long) createRecoverableTestHarness);
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshotId()).isNull();
        createRecoverableTestHarness.close();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness2 = createRecoverableTestHarness(createFileStoreTable);
        try {
            createRecoverableTestHarness2.initializeState(snapshot);
            createRecoverableTestHarness2.open();
            Assertions.fail("Expecting intentional exception");
        } catch (Exception e) {
            Assertions.assertThat(e).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        assertResults(createFileStoreTable, "1, 10", "2, 20");
        createRecoverableTestHarness2.close();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness3 = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness3.initializeState(snapshot);
        createRecoverableTestHarness3.open();
        assertResults(createFileStoreTable, "1, 10", "2, 20");
        createRecoverableTestHarness3.close();
    }

    @Test
    public void testCheckpointAbort() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        long j = 0;
        for (int i = 0; i < 10; i++) {
            j++;
            StreamTableWrite newWrite = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
            newWrite.write(GenericRow.of(new Object[]{1, 10L}));
            newWrite.write(GenericRow.of(new Object[]{2, 20L}));
            Iterator it = newWrite.prepareCommit(false, j).iterator();
            while (it.hasNext()) {
                createRecoverableTestHarness.processElement(new Committable(j, Committable.Kind.FILE, (CommitMessage) it.next()), 1L);
            }
        }
        createRecoverableTestHarness.snapshot(j, 1L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(j);
        Assertions.assertThat(new SnapshotManager(LocalFileIO.create(), this.tablePath).latestSnapshotId()).isEqualTo(j);
        createRecoverableTestHarness.close();
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [long, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    /* JADX WARN: Type inference failed for: r0v33, types: [long, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    @Test
    public void testSnapshotLostWhenFailed() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        ?? createLossyTestHarness = createLossyTestHarness(createFileStoreTable);
        createLossyTestHarness.open();
        long j = 1;
        StreamWriteBuilder withCommitUser = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser);
        StreamTableWrite newWrite = withCommitUser.newWrite();
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        newWrite.write(GenericRow.of(new Object[]{2, 20L}));
        for (CommitMessage commitMessage : newWrite.prepareCommit(false, 1L)) {
            Committable.Kind kind = Committable.Kind.FILE;
            long j2 = j;
            j = j2 + 1;
            kind.processElement(new Committable(1L, kind, commitMessage), j2);
        }
        long j3 = j + 1;
        createLossyTestHarness.snapshot(1L, (long) createLossyTestHarness);
        createLossyTestHarness.notifyOfCompletedCheckpoint(1L);
        newWrite.write(GenericRow.of(new Object[]{3, 30L}));
        newWrite.write(GenericRow.of(new Object[]{4, 40L}));
        for (CommitMessage commitMessage2 : newWrite.prepareCommit(false, 2L)) {
            Committable.Kind kind2 = Committable.Kind.FILE;
            long j4 = j3;
            j3 = j4 + 1;
            kind2.processElement(new Committable(2L, kind2, commitMessage2), j4);
        }
        long j5 = j3 + 1;
        OperatorSubtaskState snapshot = createLossyTestHarness.snapshot(2L, (long) createLossyTestHarness);
        newWrite.close();
        createLossyTestHarness.close();
        ?? createLossyTestHarness2 = createLossyTestHarness(createFileStoreTable);
        createLossyTestHarness2.initializeState(snapshot);
        createLossyTestHarness2.open();
        StreamTableWrite newWrite2 = withCommitUser.newWrite();
        newWrite2.write(GenericRow.of(new Object[]{5, 50L}));
        newWrite2.write(GenericRow.of(new Object[]{6, 60L}));
        for (CommitMessage commitMessage3 : newWrite2.prepareCommit(false, 3L)) {
            Committable.Kind kind3 = Committable.Kind.FILE;
            long j6 = j5;
            j5 = j6 + 1;
            kind3.processElement(new Committable(3L, kind3, commitMessage3), j6);
        }
        long j7 = j5 + 1;
        createLossyTestHarness2.snapshot(3L, (long) createLossyTestHarness2);
        createLossyTestHarness2.notifyOfCompletedCheckpoint(3L);
        newWrite2.close();
        createLossyTestHarness2.close();
        assertResults(createFileStoreTable, "1, 10", "2, 20", "5, 50", "6, 60");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.paimon.table.FileStoreTable] */
    @Test
    public void testRestoreCommitUser() throws Exception {
        ?? createFileStoreTable = createFileStoreTable();
        String uuid = UUID.randomUUID().toString();
        ArrayList arrayList = new ArrayList();
        long j = 1;
        for (int i = 0; i < 5; i++) {
            OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness = createLossyTestHarness(createFileStoreTable, uuid);
            createLossyTestHarness.open();
            long j2 = j + 1;
            j = createFileStoreTable;
            arrayList.add(writeAndSnapshot(createFileStoreTable, uuid, 1L, j2, createLossyTestHarness));
            createLossyTestHarness.close();
        }
        OperatorSubtaskState repackageState = AbstractStreamOperatorTestHarness.repackageState((OperatorSubtaskState[]) arrayList.toArray(new OperatorSubtaskState[0]));
        OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness2 = createLossyTestHarness(createFileStoreTable);
        createLossyTestHarness2.initializeState(repackageState);
        OperatorSubtaskState writeAndSnapshot = writeAndSnapshot(createFileStoreTable, this.initialCommitUser, 1L, j + 1, createLossyTestHarness2);
        createLossyTestHarness2.close();
        ArrayList arrayList2 = new ArrayList();
        OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness = createTestHarness(createCommitterOperator(createFileStoreTable, this.initialCommitUser, new NoopCommittableStateManager(), stateInitializationContext -> {
            Iterable iterable = (Iterable) stateInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("commit_user_state", String.class)).get();
            arrayList2.getClass();
            iterable.forEach((v1) -> {
                r1.add(v1);
            });
        }));
        createTestHarness.initializeState(writeAndSnapshot);
        createTestHarness.close();
        Assertions.assertThat(arrayList2.size()).isEqualTo(1);
        Assertions.assertThat(arrayList2).hasSameElementsAs(Lists.newArrayList(new String[]{uuid}));
    }

    @Test
    public void testCommitInputEnd() throws Exception {
        CommitterOperator createCommitterOperator = createCommitterOperator(createFileStoreTable(), UUID.randomUUID().toString(), new NoopCommittableStateManager());
        OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness = createTestHarness(createCommitterOperator);
        createTestHarness.open();
        Assertions.assertThatCode(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            createTestHarness.processElement(new Committable(Long.MAX_VALUE, Committable.Kind.FILE, new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), 0L);
            long j = createTestHarness + 1;
            createTestHarness.snapshot(0L, currentTimeMillis);
            createTestHarness.processElement(new Committable(Long.MAX_VALUE, Committable.Kind.FILE, new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), j);
            createTestHarness.processElement(new Committable(Long.MAX_VALUE, Committable.Kind.FILE, new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), j);
            createTestHarness.snapshot(j, createTestHarness + 1);
            createTestHarness.snapshot(createTestHarness + 1, createTestHarness + 1);
        }).doesNotThrowAnyException();
        if (createCommitterOperator instanceof CommitterOperator) {
            Assertions.assertThat(((ManifestCommittable) createCommitterOperator.committablesPerCheckpoint.get(Long.MAX_VALUE)).fileCommittables().size()).isEqualTo(3);
        }
        Assertions.assertThatCode(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            createTestHarness.processElement(new Committable(0L, Committable.Kind.FILE, new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), 0L);
            long j = createTestHarness + 1;
            createTestHarness.snapshot(0L, currentTimeMillis);
            createTestHarness.processElement(new Committable(0L, Committable.Kind.FILE, new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), j);
            createTestHarness.processElement(new Committable(Long.MAX_VALUE, Committable.Kind.FILE, new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), j);
            createTestHarness.snapshot(j, createTestHarness + 1);
            createTestHarness.snapshot(createTestHarness + 1, createTestHarness + 1);
        }).hasRootCauseInstanceOf(RuntimeException.class).cause().hasMessageContaining("Repeatedly commit the same checkpoint files.");
    }

    /* JADX WARN: Type inference failed for: r4v4, types: [org.apache.paimon.flink.sink.Committable$Kind] */
    private static OperatorSubtaskState writeAndSnapshot(FileStoreTable fileStoreTable, String str, long j, long j2, OneInputStreamOperatorTestHarness<Committable, Committable> oneInputStreamOperatorTestHarness) throws Exception {
        StreamTableWrite newWrite = fileStoreTable.newStreamWriteBuilder().withCommitUser(str).newWrite();
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        for (CommitMessage commitMessage : newWrite.prepareCommit(false, 1L)) {
            ?? r4 = Committable.Kind.FILE;
            long j3 = j + 1;
            j = r4;
            oneInputStreamOperatorTestHarness.processElement(new Committable(j2, (Committable.Kind) r4, commitMessage), j3);
        }
        return oneInputStreamOperatorTestHarness.snapshot(j2, j + 1);
    }

    @Test
    public void testWatermarkCommit() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        StreamTableWrite newWrite = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        Committable.Kind kind = Committable.Kind.FILE;
        long j = 0 + 1;
        kind.processElement(new Committable(1L, kind, newWrite.prepareCommit(true, 1L).get(0)), 0L);
        createRecoverableTestHarness.processWatermark(new Watermark(1024L));
        long j2 = j + 1;
        createRecoverableTestHarness.snapshot(1L, j);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
        newWrite.write(GenericRow.of(new Object[]{1, 20L}));
        Committable.Kind kind2 = Committable.Kind.FILE;
        long j3 = j2 + 1;
        kind2.processElement(new Committable(2L, kind2, newWrite.prepareCommit(true, 2L).get(0)), j2);
        createRecoverableTestHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        long j4 = j3 + 1;
        createRecoverableTestHarness.snapshot(2L, j3);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(2L);
        createRecoverableTestHarness.close();
        newWrite.close();
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
    }

    @Test
    public void testEmptyCommit() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        createRecoverableTestHarness.snapshot(1L, 1L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshot()).isNull();
    }

    @Test
    public void testForceCreateSnapshotCommit() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(options -> {
            options.set(CoreOptions.COMMIT_FORCE_CREATE_SNAPSHOT.key(), "true");
        });
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        createRecoverableTestHarness.snapshot(1L, 1L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshot()).isNotNull();
    }

    @Test
    public void testEmptyCommitWithProcessTimeTag() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(options -> {
            options.set(CoreOptions.TAG_AUTOMATIC_CREATION, CoreOptions.TagCreationMode.PROCESS_TIME);
            options.set(CoreOptions.TAG_CREATION_PERIOD, CoreOptions.TagCreationPeriod.DAILY);
        });
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        createRecoverableTestHarness.snapshot(1L, 1L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        Snapshot latestSnapshot = createFileStoreTable.snapshotManager().latestSnapshot();
        Assertions.assertThat(latestSnapshot).isNotNull();
        Assertions.assertThat(latestSnapshot.id()).isEqualTo(1L);
        Assertions.assertThat(createFileStoreTable.tagManager().tagCount()).isEqualTo(1L);
        createRecoverableTestHarness.snapshot(2L, 2L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(2L);
        Snapshot latestSnapshot2 = createFileStoreTable.snapshotManager().latestSnapshot();
        Assertions.assertThat(latestSnapshot2).isNotNull();
        Assertions.assertThat(latestSnapshot2.id()).isEqualTo(1L);
        Assertions.assertThat(createFileStoreTable.tagManager().tagCount()).isEqualTo(1L);
    }

    @Test
    public void testCalcDataBytesSend() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWriteImpl newWrite = createFileStoreTable.newWrite(this.initialCommitUser);
        newWrite.write(GenericRow.of(new Object[]{1, 10L}));
        newWrite.write(GenericRow.of(new Object[]{1, 20L}));
        List prepareCommit = newWrite.prepareCommit(false, 0L);
        newWrite.close();
        ManifestCommittable manifestCommittable = new ManifestCommittable(0L);
        Iterator it = prepareCommit.iterator();
        while (it.hasNext()) {
            manifestCommittable.addFileCommittable((CommitMessage) it.next());
        }
        StoreCommitter storeCommitter = new StoreCommitter(createFileStoreTable.newCommit(this.initialCommitUser), UnregisteredMetricsGroup.createOperatorMetricGroup());
        storeCommitter.commit(Collections.singletonList(manifestCommittable));
        CommitterMetrics committerMetrics = storeCommitter.getCommitterMetrics();
        Assertions.assertThat(committerMetrics.getNumBytesOutCounter().getCount()).isEqualTo(285L);
        Assertions.assertThat(committerMetrics.getNumRecordsOutCounter().getCount()).isEqualTo(2L);
        storeCommitter.close();
    }

    /* JADX WARN: Type inference failed for: r0v5, types: [long, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    @Test
    public void testCommitMetrics() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperator<Committable, Committable> createCommitterOperator = createCommitterOperator(createFileStoreTable, null, new RestoreAndFailCommittableStateManager(() -> {
            return new VersionedSerializerWrapper(new ManifestCommittableSerializer());
        }));
        ?? createTestHarness = createTestHarness(createCommitterOperator);
        createTestHarness.open();
        StreamTableWrite newWrite = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        newWrite.write(GenericRow.of(new Object[]{1, 100L}));
        Committable.Kind kind = Committable.Kind.FILE;
        kind.processElement(new Committable(1L, kind, newWrite.prepareCommit(false, 1L).get(0)), 0L);
        long j = 0 + 1 + 1;
        createTestHarness.snapshot(1L, (long) createTestHarness);
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        MetricGroup addGroup = createCommitterOperator.getMetricGroup().addGroup("paimon").addGroup("table", createFileStoreTable.name()).addGroup("commit");
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesAdded").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesDeleted").getValue()).isEqualTo(0L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesAppended").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesCommitCompacted").getValue()).isEqualTo(0L);
        newWrite.write(GenericRow.of(new Object[]{1, 101L}));
        newWrite.compact(BinaryRow.EMPTY_ROW, 0, false);
        newWrite.write(GenericRow.of(new Object[]{2, 200L}));
        newWrite.compact(BinaryRow.EMPTY_ROW, 0, true);
        Committable.Kind kind2 = Committable.Kind.FILE;
        kind2.processElement(new Committable(2L, kind2, newWrite.prepareCommit(true, 2L).get(0)), j);
        long j2 = j + 1 + 1;
        createTestHarness.snapshot(2L, (long) createTestHarness);
        createTestHarness.notifyOfCompletedCheckpoint(2L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesAdded").getValue()).isEqualTo(3L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesDeleted").getValue()).isEqualTo(3L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesAppended").getValue()).isEqualTo(2L);
        Assertions.assertThat(TestingMetricUtils.getGauge(addGroup, "lastTableFilesCommitCompacted").getValue()).isEqualTo(4L);
        createTestHarness.close();
        newWrite.close();
    }

    @Test
    public void testParallelism() throws Exception {
        OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness = createTestHarness(createCommitterOperator(createFileStoreTable(), UUID.randomUUID().toString(), new NoopCommittableStateManager()), 10, 10, 3);
        Throwable th = null;
        try {
            try {
                createTestHarness.getClass();
                Assertions.assertThatCode(createTestHarness::open).hasMessage("Committer Operator parallelism in paimon MUST be one.");
                if (createTestHarness != null) {
                    if (0 == 0) {
                        createTestHarness.close();
                        return;
                    }
                    try {
                        createTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th4;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness(FileStoreTable fileStoreTable) throws Exception {
        return createTestHarness(createCommitterOperator(fileStoreTable, null, new RestoreAndFailCommittableStateManager(() -> {
            return new VersionedSerializerWrapper(new ManifestCommittableSerializer());
        })));
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(FileStoreTable fileStoreTable) throws Exception {
        return createLossyTestHarness(fileStoreTable, null);
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(FileStoreTable fileStoreTable, String str) throws Exception {
        return createTestHarness(createCommitterOperator(fileStoreTable, str, new NoopCommittableStateManager()));
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(OneInputStreamOperator<Committable, Committable> oneInputStreamOperator) throws Exception {
        return createTestHarness(oneInputStreamOperator, 1, 1, 0);
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(OneInputStreamOperator<Committable, Committable> oneInputStreamOperator, int i, int i2, int i3) throws Exception {
        TypeSerializer createSerializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Committable, Committable> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(oneInputStreamOperator, i, i2, i3, createSerializer, new OperatorID());
        oneInputStreamOperatorTestHarness.setup(createSerializer);
        return oneInputStreamOperatorTestHarness;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable fileStoreTable, String str, CommittableStateManager<ManifestCommittable> committableStateManager) {
        return new CommitterOperator(true, true, str == null ? this.initialCommitUser : str, (str2, operatorMetricGroup) -> {
            return new StoreCommitter(fileStoreTable.newStreamWriteBuilder().withCommitUser(str2).newCommit(), operatorMetricGroup);
        }, committableStateManager);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable fileStoreTable, String str, CommittableStateManager<ManifestCommittable> committableStateManager, final ThrowingConsumer<StateInitializationContext, Exception> throwingConsumer) {
        return new CommitterOperator<Committable, ManifestCommittable>(true, true, str == null ? this.initialCommitUser : str, (str2, operatorMetricGroup) -> {
            return new StoreCommitter(fileStoreTable.newStreamWriteBuilder().withCommitUser(str2).newCommit(), operatorMetricGroup);
        }, committableStateManager) { // from class: org.apache.paimon.flink.sink.CommitterOperatorTest.1
            public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
                throwingConsumer.accept(stateInitializationContext);
            }
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1887854292:
                if (implMethodName.equals("lambda$createCommitterOperator$d5941ce7$1")) {
                    z = true;
                    break;
                }
                break;
            case -1258595838:
                if (implMethodName.equals("lambda$createRecoverableTestHarness$baae714a$1")) {
                    z = 3;
                    break;
                }
                break;
            case -406076962:
                if (implMethodName.equals("lambda$createCommitterOperator$acec1c3c$1")) {
                    z = false;
                    break;
                }
                break;
            case 1434508584:
                if (implMethodName.equals("lambda$testCommitMetrics$35ae62d6$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/Committer$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/CommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;")) {
                    FileStoreTable fileStoreTable = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return (str2, operatorMetricGroup) -> {
                        return new StoreCommitter(fileStoreTable.newStreamWriteBuilder().withCommitUser(str2).newCommit(), operatorMetricGroup);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/Committer$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/CommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/flink/metrics/groups/OperatorMetricGroup;)Lorg/apache/paimon/flink/sink/Committer;")) {
                    FileStoreTable fileStoreTable2 = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return (str22, operatorMetricGroup2) -> {
                        return new StoreCommitter(fileStoreTable2.newStreamWriteBuilder().withCommitUser(str22).newCommit(), operatorMetricGroup2);
                    };
                }
                break;
            case ReadWriteTableTestUtil.DEFAULT_PARALLELISM /* 2 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/CommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    return () -> {
                        return new VersionedSerializerWrapper(new ManifestCommittableSerializer());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/CommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    return () -> {
                        return new VersionedSerializerWrapper(new ManifestCommittableSerializer());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
