package org.apache.paimon.flink.sink;

import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.SortedMap;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.class */
public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorTest {
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v8, types: [long, org.apache.paimon.table.sink.StreamTableWrite] */
    @Test
    public void testAutoTagForSavepoint() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        ?? newWrite = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        processCommittable(createRecoverableTestHarness, newWrite, 1L, 1L, GenericRow.of(new Object[]{1, 10L}));
        createRecoverableTestHarness.snapshotWithLocalState(1L, 1L, CheckpointType.CHECKPOINT);
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshotId()).isNull();
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(createFileStoreTable.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat(createFileStoreTable.tagManager().tagCount()).isEqualTo(0L);
        processCommittable(createRecoverableTestHarness, newWrite, 1 + 1, 1 + 1, GenericRow.of(new Object[]{2, 20L}));
        createRecoverableTestHarness.snapshotWithLocalState((long) newWrite, (long) newWrite, SavepointType.savepoint(SavepointFormatType.CANONICAL));
        Assertions.assertThat(createFileStoreTable.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat(createFileStoreTable.tagManager().tagCount()).isEqualTo(0L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint((long) newWrite);
        Snapshot latestSnapshot = createFileStoreTable.snapshotManager().latestSnapshot();
        Assertions.assertThat(latestSnapshot).isNotNull();
        Assertions.assertThat(latestSnapshot.id()).isEqualTo((long) newWrite);
        SortedMap tags = createFileStoreTable.tagManager().tags();
        Assertions.assertThat(tags).containsOnlyKeys(new Snapshot[]{latestSnapshot});
        Assertions.assertThat((String) tags.get(latestSnapshot)).isEqualTo("savepoint-" + ((long) newWrite));
    }

    @Test
    public void testRestore() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        processCommittable(createRecoverableTestHarness, createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite(), 1L, 1L, GenericRow.of(new Object[]{1, 10L}));
        OperatorSubtaskState jobManagerOwnedState = createRecoverableTestHarness.snapshotWithLocalState(1L, 1L, SavepointType.savepoint(SavepointFormatType.CANONICAL)).getJobManagerOwnedState();
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshot()).isNull();
        Assertions.assertThat(createFileStoreTable.tagManager().tagCount()).isEqualTo(0L);
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness2 = createRecoverableTestHarness(createFileStoreTable);
        try {
            createRecoverableTestHarness2.initializeState(jobManagerOwnedState);
            createRecoverableTestHarness2.open();
            Assertions.fail("Expecting intentional exception");
        } catch (Exception e) {
            Assertions.assertThat(e).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        Snapshot latestSnapshot = createFileStoreTable.snapshotManager().latestSnapshot();
        Assertions.assertThat(latestSnapshot).isNotNull();
        Assertions.assertThat(latestSnapshot.id()).isEqualTo(1L);
        SortedMap tags = createFileStoreTable.tagManager().tags();
        Assertions.assertThat(tags).containsOnlyKeys(new Snapshot[]{latestSnapshot});
        Assertions.assertThat((String) tags.get(latestSnapshot)).isEqualTo("savepoint-1");
    }

    private void processCommittable(OneInputStreamOperatorTestHarness<Committable, Committable> oneInputStreamOperatorTestHarness, StreamTableWrite streamTableWrite, long j, long j2, InternalRow... internalRowArr) throws Exception {
        for (InternalRow internalRow : internalRowArr) {
            streamTableWrite.write(internalRow);
        }
        Iterator it = streamTableWrite.prepareCommit(false, j).iterator();
        while (it.hasNext()) {
            oneInputStreamOperatorTestHarness.processElement(new Committable(j, Committable.Kind.FILE, (CommitMessage) it.next()), j2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.sink.CommitterOperatorTest
    public OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable fileStoreTable, String str, CommittableStateManager<ManifestCommittable> committableStateManager) {
        CommitterOperator createCommitterOperator = super.createCommitterOperator(fileStoreTable, str, committableStateManager);
        fileStoreTable.getClass();
        SerializableSupplier serializableSupplier = fileStoreTable::snapshotManager;
        fileStoreTable.getClass();
        return new AutoTagForSavepointCommitterOperator(createCommitterOperator, serializableSupplier, fileStoreTable::tagManager);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.sink.CommitterOperatorTest
    public OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable fileStoreTable, String str, CommittableStateManager<ManifestCommittable> committableStateManager, ThrowingConsumer<StateInitializationContext, Exception> throwingConsumer) {
        CommitterOperator createCommitterOperator = super.createCommitterOperator(fileStoreTable, str, committableStateManager, throwingConsumer);
        fileStoreTable.getClass();
        SerializableSupplier serializableSupplier = fileStoreTable::snapshotManager;
        fileStoreTable.getClass();
        return new AutoTagForSavepointCommitterOperator(createCommitterOperator, serializableSupplier, fileStoreTable::tagManager);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 964922697:
                if (implMethodName.equals("snapshotManager")) {
                    z = true;
                    break;
                }
                break;
            case 1070597811:
                if (implMethodName.equals("tagManager")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/table/DataTable") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/paimon/utils/TagManager;")) {
                    FileStoreTable fileStoreTable = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return fileStoreTable::tagManager;
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/table/DataTable") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/paimon/utils/TagManager;")) {
                    FileStoreTable fileStoreTable2 = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return fileStoreTable2::tagManager;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/table/DataTable") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/paimon/utils/SnapshotManager;")) {
                    FileStoreTable fileStoreTable3 = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return fileStoreTable3::snapshotManager;
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/table/DataTable") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/paimon/utils/SnapshotManager;")) {
                    FileStoreTable fileStoreTable4 = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return fileStoreTable4::snapshotManager;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
