package org.apache.flink.table.store.connector.sink;

import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/CommitterOperatorTest.class */
public class CommitterOperatorTest extends CommitterOperatorTestBase {
    private String initialCommitUser;

    @Override // org.apache.flink.table.store.connector.sink.CommitterOperatorTestBase
    @BeforeEach
    public void before() {
        super.before();
        this.initialCommitUser = UUID.randomUUID().toString();
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness, long] */
    @Test
    public void testFailIntentionallyAfterRestore() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        ?? createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        TableWrite newWrite = createFileStoreTable.newWrite(this.initialCommitUser);
        newWrite.write(GenericRowData.of(new Object[]{1, 10L}));
        newWrite.write(GenericRowData.of(new Object[]{2, 20L}));
        long j = 1;
        for (FileCommittable fileCommittable : newWrite.prepareCommit(false, 8L)) {
            Committable.Kind kind = Committable.Kind.FILE;
            long j2 = j;
            j = j2 + 1;
            kind.processElement(new Committable(8L, kind, fileCommittable), j2);
        }
        long j3 = j + 1;
        OperatorSubtaskState snapshot = createRecoverableTestHarness.snapshot(0L, (long) createRecoverableTestHarness);
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshotId()).isNull();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness2 = createRecoverableTestHarness(createFileStoreTable);
        try {
            createRecoverableTestHarness2.initializeState(snapshot);
            createRecoverableTestHarness2.open();
            org.junit.jupiter.api.Assertions.fail("Expecting intentional exception");
        } catch (Exception e) {
            Assertions.assertThat(e).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        assertResults(createFileStoreTable, "1, 10", "2, 20");
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness3 = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness3.initializeState(snapshot);
        createRecoverableTestHarness3.open();
        assertResults(createFileStoreTable, "1, 10", "2, 20");
    }

    @Test
    public void testCheckpointAbort() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness = createRecoverableTestHarness(createFileStoreTable);
        createRecoverableTestHarness.open();
        long j = 0;
        for (int i = 0; i < 10; i++) {
            j++;
            TableWrite newWrite = createFileStoreTable.newWrite(this.initialCommitUser);
            newWrite.write(GenericRowData.of(new Object[]{1, 10L}));
            newWrite.write(GenericRowData.of(new Object[]{2, 20L}));
            Iterator it = newWrite.prepareCommit(false, j).iterator();
            while (it.hasNext()) {
                createRecoverableTestHarness.processElement(new Committable(j, Committable.Kind.FILE, (FileCommittable) it.next()), 1L);
            }
        }
        createRecoverableTestHarness.snapshot(j, 1L);
        createRecoverableTestHarness.notifyOfCompletedCheckpoint(j);
        Assertions.assertThat(new SnapshotManager(this.tablePath).latestSnapshotId()).isEqualTo(j);
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness, long] */
    /* JADX WARN: Type inference failed for: r0v34, types: [org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness, long] */
    @Test
    public void testSnapshotLostWhenFailed() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        ?? createLossyTestHarness = createLossyTestHarness(createFileStoreTable);
        createLossyTestHarness.open();
        long j = 1;
        TableWrite newWrite = createFileStoreTable.newWrite(this.initialCommitUser);
        newWrite.write(GenericRowData.of(new Object[]{1, 10L}));
        newWrite.write(GenericRowData.of(new Object[]{2, 20L}));
        for (FileCommittable fileCommittable : newWrite.prepareCommit(false, 1L)) {
            Committable.Kind kind = Committable.Kind.FILE;
            long j2 = j;
            j = j2 + 1;
            kind.processElement(new Committable(1L, kind, fileCommittable), j2);
        }
        long j3 = j + 1;
        createLossyTestHarness.snapshot(1L, (long) createLossyTestHarness);
        createLossyTestHarness.notifyOfCompletedCheckpoint(1L);
        newWrite.write(GenericRowData.of(new Object[]{3, 30L}));
        newWrite.write(GenericRowData.of(new Object[]{4, 40L}));
        for (FileCommittable fileCommittable2 : newWrite.prepareCommit(false, 2L)) {
            Committable.Kind kind2 = Committable.Kind.FILE;
            long j4 = j3;
            j3 = j4 + 1;
            kind2.processElement(new Committable(2L, kind2, fileCommittable2), j4);
        }
        long j5 = j3 + 1;
        OperatorSubtaskState snapshot = createLossyTestHarness.snapshot(2L, (long) createLossyTestHarness);
        newWrite.close();
        createLossyTestHarness.close();
        ?? createLossyTestHarness2 = createLossyTestHarness(createFileStoreTable);
        createLossyTestHarness2.initializeState(snapshot);
        createLossyTestHarness2.open();
        TableWrite newWrite2 = createFileStoreTable.newWrite(this.initialCommitUser);
        newWrite2.write(GenericRowData.of(new Object[]{5, 50L}));
        newWrite2.write(GenericRowData.of(new Object[]{6, 60L}));
        for (FileCommittable fileCommittable3 : newWrite2.prepareCommit(false, 3L)) {
            Committable.Kind kind3 = Committable.Kind.FILE;
            long j6 = j5;
            j5 = j6 + 1;
            kind3.processElement(new Committable(3L, kind3, fileCommittable3), j6);
        }
        long j7 = j5 + 1;
        createLossyTestHarness2.snapshot(3L, (long) createLossyTestHarness2);
        createLossyTestHarness2.notifyOfCompletedCheckpoint(3L);
        newWrite2.close();
        createLossyTestHarness2.close();
        assertResults(createFileStoreTable, "1, 10", "2, 20", "5, 50", "6, 60");
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness(FileStoreTable fileStoreTable) throws Exception {
        return createTestHarness(new CommitterOperator(true, this.initialCommitUser, str -> {
            return new StoreCommitter(fileStoreTable.newCommit(str));
        }, new RestoreAndFailCommittableStateManager(ManifestCommittableSerializer::new)));
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(FileStoreTable fileStoreTable) throws Exception {
        return createTestHarness(new CommitterOperator(true, this.initialCommitUser, str -> {
            return new StoreCommitter(fileStoreTable.newCommit(str));
        }, new NoopCommittableStateManager()));
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(CommitterOperator committerOperator) throws Exception {
        TypeSerializer createSerializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Committable, Committable> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(committerOperator, createSerializer);
        oneInputStreamOperatorTestHarness.setup(createSerializer);
        return oneInputStreamOperatorTestHarness;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -977840469:
                if (implMethodName.equals("lambda$createLossyTestHarness$43475064$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1678901875:
                if (implMethodName.equals("lambda$createRecoverableTestHarness$43475064$1")) {
                    z = false;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/CommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/store/table/FileStoreTable;Ljava/lang/String;)Lorg/apache/flink/table/store/connector/sink/Committer;")) {
                    FileStoreTable fileStoreTable = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return str -> {
                        return new StoreCommitter(fileStoreTable.newCommit(str));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return ManifestCommittableSerializer::new;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/CommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/store/table/FileStoreTable;Ljava/lang/String;)Lorg/apache/flink/table/store/connector/sink/Committer;")) {
                    FileStoreTable fileStoreTable2 = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return str2 -> {
                        return new StoreCommitter(fileStoreTable2.newCommit(str2));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
