package org.apache.flink.table.store.connector.sink;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.sink.TestFileStore;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/table/store/connector/sink/StoreSinkTest.class */
public class StoreSinkTest {
    private final boolean hasPk;
    private final boolean partitioned;
    private final ObjectIdentifier identifier = ObjectIdentifier.of("my_catalog", "my_database", "my_table");
    private final TestLock lock = new TestLock();
    private final RowType rowType = RowType.of(new LogicalType[]{new IntType(), new IntType(), new IntType()});
    private TestFileStore fileStore;
    private int[] primaryKeys;
    private int[] partitions;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/store/connector/sink/StoreSinkTest$TestLock.class */
    public class TestLock implements CatalogLock {
        private boolean locked;
        private boolean closed;

        private TestLock() {
            this.locked = false;
            this.closed = false;
        }

        public <T> T runWithLock(String str, String str2, Callable<T> callable) throws Exception {
            Assertions.assertThat(str).isEqualTo(StoreSinkTest.this.identifier.getDatabaseName());
            Assertions.assertThat(str2).isEqualTo(StoreSinkTest.this.identifier.getObjectName());
            this.locked = true;
            return callable.call();
        }

        public void close() {
            this.closed = true;
        }
    }

    public StoreSinkTest(boolean z, boolean z2) {
        this.hasPk = z;
        this.partitioned = z2;
    }

    @Before
    public void before() {
        this.primaryKeys = this.hasPk ? new int[]{1} : new int[0];
        this.partitions = this.partitioned ? new int[]{0} : new int[0];
        this.fileStore = new TestFileStore(this.hasPk, this.hasPk ? RowType.of(new LogicalType[]{new IntType()}) : this.rowType, this.hasPk ? this.rowType : new RowType(Collections.singletonList(new RowType.RowField("COUNT", new BigIntType(false)))), this.partitioned ? RowType.of(new LogicalType[]{new IntType()}) : RowType.of(new LogicalType[0]));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "hasPk-{0}, partitioned-{1}")
    public static List<Boolean[]> data() {
        return Arrays.asList(new Boolean[]{true, true}, new Boolean[]{true, false}, new Boolean[]{false, false}, new Boolean[]{false, true});
    }

    @Test
    public void testChangelogs() throws Exception {
        Assume.assumeTrue(this.hasPk && this.partitioned);
        writeAndCommit(newSink(null), GenericRowData.ofKind(RowKind.INSERT, new Object[]{0, 0, 1}), GenericRowData.ofKind(RowKind.UPDATE_BEFORE, new Object[]{0, 2, 3}), GenericRowData.ofKind(RowKind.UPDATE_AFTER, new Object[]{0, 7, 5}), GenericRowData.ofKind(RowKind.DELETE, new Object[]{1, 0, 1}));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(1)).isEqualTo(Collections.singletonList("DELETE-key-0-value-1/0/1"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(0)).isEqualTo(Collections.singletonList("DELETE-key-2-value-0/2/3"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(1)).isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
    }

    @Test
    public void testNoKeyChangelogs() throws Exception {
        Assume.assumeTrue(!this.hasPk && this.partitioned);
        writeAndCommit(new StoreSink<>(this.identifier, this.fileStore, this.partitions, this.primaryKeys, this.primaryKeys, 2, () -> {
            return this.lock;
        }, new HashMap(), (LogSinkProvider) null), GenericRowData.ofKind(RowKind.INSERT, new Object[]{0, 0, 1}), GenericRowData.ofKind(RowKind.UPDATE_BEFORE, new Object[]{0, 2, 3}), GenericRowData.ofKind(RowKind.UPDATE_AFTER, new Object[]{0, 4, 5}), GenericRowData.ofKind(RowKind.DELETE, new Object[]{1, 0, 1}));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(0)).isEqualTo(Collections.singletonList("ADD-key-1/0/1-value--1"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(0)).isEqualTo(Collections.singletonList("ADD-key-0/4/5-value-1"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(1)).isEqualTo(Arrays.asList("ADD-key-0/0/1-value-1", "ADD-key-0/2/3-value--1"));
    }

    @Test
    public void testAppend() throws Exception {
        Assume.assumeTrue(this.hasPk && this.partitioned);
        StoreSink<?, ?> newSink = newSink(null);
        writeAndAssert(newSink);
        writeAndCommit(newSink, GenericRowData.of(new Object[]{0, 8, 9}), GenericRowData.of(new Object[]{1, 10, 11}));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(0)).isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(0)).isEqualTo(Arrays.asList("ADD-key-2-value-0/2/3", "ADD-key-8-value-0/8/9"));
    }

    @Test
    public void testOverwrite() throws Exception {
        Assume.assumeTrue(this.hasPk && this.partitioned);
        StoreSink<?, ?> newSink = newSink(new HashMap());
        writeAndAssert(newSink);
        writeAndCommit(newSink, GenericRowData.of(new Object[]{0, 8, 9}), GenericRowData.of(new Object[]{1, 10, 11}));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(1)).isNull();
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(0)).isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(0)).isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
    }

    @Test
    public void testOverwritePartition() throws Exception {
        Assume.assumeTrue(this.hasPk && this.partitioned);
        HashMap hashMap = new HashMap();
        hashMap.put("part", "0");
        StoreSink<?, ?> newSink = newSink(hashMap);
        writeAndAssert(newSink);
        writeAndCommit(newSink, GenericRowData.of(new Object[]{0, 8, 9}), GenericRowData.of(new Object[]{1, 10, 11}));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(1)).isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(0)).isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(0)).isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
    }

    private void writeAndAssert(StoreSink<?, ?> storeSink) throws Exception {
        writeAndCommit(storeSink, GenericRowData.of(new Object[]{0, 0, 1}), GenericRowData.of(new Object[]{0, 2, 3}), GenericRowData.of(new Object[]{0, 7, 5}), GenericRowData.of(new Object[]{1, 0, 1}));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(1)).get(1)).isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(0)).isEqualTo(Collections.singletonList("ADD-key-2-value-0/2/3"));
        Assertions.assertThat(this.fileStore.committedFiles.get(CompactManagerTest.row(0)).get(1)).isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
    }

    private void writeAndCommit(StoreSink<?, ?> storeSink, RowData... rowDataArr) throws Exception {
        commit(storeSink, write(storeSink, rowDataArr));
    }

    private List<Committable> write(StoreSink<?, ?> storeSink, RowData... rowDataArr) throws Exception {
        StoreSinkWriter createWriter = storeSink.createWriter((Sink.InitContext) null);
        for (RowData rowData : rowDataArr) {
            createWriter.write(rowData, (SinkWriter.Context) null);
        }
        List<Committable> prepareCommit = createWriter.prepareCommit();
        HashMap hashMap = new HashMap(createWriter.writers());
        Assertions.assertThat(hashMap.size()).isGreaterThan(0);
        createWriter.close();
        hashMap.forEach((binaryRowData, map) -> {
            map.forEach((num, recordWriter) -> {
                TestFileStore.TestRecordWriter testRecordWriter = (TestFileStore.TestRecordWriter) recordWriter;
                Assertions.assertThat(testRecordWriter.synced).isTrue();
                Assertions.assertThat(testRecordWriter.closed).isTrue();
            });
        });
        return prepareCommit;
    }

    private void commit(StoreSink<?, ?> storeSink, List<Committable> list) throws Exception {
        StoreGlobalCommitter createGlobalCommitter = storeSink.createGlobalCommitter();
        ManifestCommittable combine = createGlobalCommitter.combine(0L, list);
        this.fileStore.expired = false;
        this.lock.locked = false;
        createGlobalCommitter.commit(Collections.singletonList(combine));
        Assertions.assertThat(this.fileStore.expired).isTrue();
        Assertions.assertThat(this.lock.locked).isTrue();
        Assertions.assertThat(createGlobalCommitter.filterRecoveredCommittables(Collections.singletonList(combine)).size()).isEqualTo(0);
        this.lock.closed = false;
        createGlobalCommitter.close();
        Assertions.assertThat(this.lock.closed).isTrue();
    }

    private StoreSink<?, ?> newSink(Map<String, String> map) {
        return new StoreSink<>(this.identifier, this.fileStore, this.partitions, this.primaryKeys, this.primaryKeys, 2, () -> {
            return this.lock;
        }, map, (LogSinkProvider) null);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 171004097:
                if (implMethodName.equals("lambda$testNoKeyChangelogs$cba113c0$1")) {
                    z = false;
                    break;
                }
                break;
            case 1653134302:
                if (implMethodName.equals("lambda$newSink$84a893b8$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/table/catalog/CatalogLock$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/flink/table/catalog/CatalogLock;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/StoreSinkTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/table/catalog/CatalogLock;")) {
                    StoreSinkTest storeSinkTest = (StoreSinkTest) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return this.lock;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/table/catalog/CatalogLock$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/flink/table/catalog/CatalogLock;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/StoreSinkTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/table/catalog/CatalogLock;")) {
                    StoreSinkTest storeSinkTest2 = (StoreSinkTest) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return this.lock;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
