package org.apache.flink.table.store.connector.sink;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/CommitterOperatorTest.class */
public class CommitterOperatorTest {
    private static final RowType ROW_TYPE = RowType.of(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.BIGINT().getLogicalType()}, new String[]{"a", "b"});

    @TempDir
    public Path tempDir;
    private org.apache.flink.core.fs.Path tablePath;

    @BeforeEach
    public void before() {
        this.tablePath = new org.apache.flink.core.fs.Path(this.tempDir.toString());
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [long, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    @Test
    public void testFailIntentionallyAfterRestore() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        ?? createTestHarness = createTestHarness(createFileStoreTable);
        createTestHarness.open();
        TableWrite newWrite = createFileStoreTable.newWrite();
        newWrite.write(GenericRowData.of(new Object[]{1, 10L}));
        newWrite.write(GenericRowData.of(new Object[]{2, 20L}));
        long j = 1;
        for (FileCommittable fileCommittable : newWrite.prepareCommit(false)) {
            Committable.Kind kind = Committable.Kind.FILE;
            long j2 = j;
            j = j2 + 1;
            kind.processElement(new Committable(8L, kind, fileCommittable), j2);
        }
        long j3 = j + 1;
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, (long) createTestHarness);
        Assertions.assertThat(createFileStoreTable.snapshotManager().latestSnapshotId()).isNull();
        OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness2 = createTestHarness(createFileStoreTable);
        try {
            createTestHarness2.initializeState(snapshot);
            createTestHarness2.open();
            org.junit.jupiter.api.Assertions.fail("Expecting intentional exception");
        } catch (Exception e) {
            Assertions.assertThat(e).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        assertResults(createFileStoreTable, "1, 10", "2, 20");
        OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness3 = createTestHarness(createFileStoreTable);
        createTestHarness3.initializeState(snapshot);
        createTestHarness3.open();
        assertResults(createFileStoreTable, "1, 10", "2, 20");
    }

    @Test
    public void testCheckpointAbort() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness = createTestHarness(createFileStoreTable);
        createTestHarness.open();
        long j = 0;
        for (int i = 0; i < 10; i++) {
            j++;
            TableWrite newWrite = createFileStoreTable.newWrite();
            newWrite.write(GenericRowData.of(new Object[]{1, 10L}));
            newWrite.write(GenericRowData.of(new Object[]{2, 20L}));
            Iterator it = newWrite.prepareCommit(false).iterator();
            while (it.hasNext()) {
                createTestHarness.processElement(new Committable(j, Committable.Kind.FILE, (FileCommittable) it.next()), 1L);
            }
        }
        createTestHarness.snapshot(j, 1L);
        createTestHarness.notifyOfCompletedCheckpoint(j);
        Assertions.assertThat(new SnapshotManager(this.tablePath).latestSnapshotId()).isEqualTo(j);
    }

    private void assertResults(FileStoreTable fileStoreTable, String... strArr) {
        TableRead newRead = fileStoreTable.newRead();
        ArrayList arrayList = new ArrayList();
        fileStoreTable.newScan().plan().splits.forEach(split -> {
            try {
                RecordReaderIterator recordReaderIterator = new RecordReaderIterator(newRead.createReader(split));
                while (recordReaderIterator.hasNext()) {
                    RowData rowData = (RowData) recordReaderIterator.next();
                    arrayList.add(rowData.getInt(0) + ", " + rowData.getLong(1));
                }
                recordReaderIterator.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        Collections.sort(arrayList);
        Assertions.assertThat(arrayList).isEqualTo(Arrays.asList(strArr));
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.PATH, this.tablePath.toString());
        new SchemaManager(this.tablePath).commitNewVersion(new UpdateSchema(ROW_TYPE, Collections.emptyList(), Collections.emptyList(), configuration.toMap(), ""));
        return FileStoreTableFactory.create(configuration);
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(FileStoreTable fileStoreTable) throws Exception {
        CommitterOperator committerOperator = new CommitterOperator(true, str -> {
            return new StoreCommitter(fileStoreTable.newCommit(str));
        }, ManifestCommittableSerializer::new);
        TypeSerializer createSerializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Committable, Committable> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(committerOperator, createSerializer);
        oneInputStreamOperatorTestHarness.setup(createSerializer);
        return oneInputStreamOperatorTestHarness;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1420482919:
                if (implMethodName.equals("lambda$createTestHarness$43475064$1")) {
                    z = true;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return ManifestCommittableSerializer::new;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/CommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/store/table/FileStoreTable;Ljava/lang/String;)Lorg/apache/flink/table/store/connector/sink/Committer;")) {
                    FileStoreTable fileStoreTable = (FileStoreTable) serializedLambda.getCapturedArg(0);
                    return str -> {
                        return new StoreCommitter(fileStoreTable.newCommit(str));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
