package org.apache.paimon.flink.action;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.WriteMode;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/paimon/flink/action/DeleteActionITCase.class */
public class DeleteActionITCase extends ActionITCaseBase {
    private static final DataType[] FIELD_TYPES = {DataTypes.BIGINT(), DataTypes.STRING()};
    private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[]{"k", "v"});

    @BeforeEach
    public void setUp() {
        ReadWriteTableTestUtil.init(this.warehouse);
    }

    @MethodSource({"data"})
    @ParameterizedTest(name = "hasPk-{0}")
    public void testDeleteAction(boolean z, List<Row> list, List<Row> list2) throws Exception {
        prepareTable(z);
        DeleteAction deleteAction = new DeleteAction(this.warehouse, this.database, this.tableName, "k = 1");
        BlockingIterator<Row, Row> testStreamingRead = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery(this.tableName), list);
        deleteAction.run();
        Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
        Assertions.assertThat(snapshot.id()).isEqualTo(2L);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
        ReadWriteTableTestUtil.validateStreamingReadResult(testStreamingRead, list2);
        testStreamingRead.close();
    }

    @Test
    public void testWorkWithPartialUpdateTable() throws Exception {
        createFileStoreTable(RowType.of(new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"k", "a", "b"}), Collections.emptyList(), Collections.singletonList("k"), new HashMap<String, String>() { // from class: org.apache.paimon.flink.action.DeleteActionITCase.1
            {
                put(CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
                put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true");
                put(CoreOptions.CHANGELOG_PRODUCER.key(), ThreadLocalRandom.current().nextBoolean() ? CoreOptions.ChangelogProducer.LOOKUP.toString() : CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
            }
        });
        DeleteAction deleteAction = new DeleteAction(this.warehouse, this.database, this.tableName, "k < 3");
        ReadWriteTableTestUtil.insertInto(this.tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'), (4, 'Paimon', 'D')");
        BlockingIterator<Row, Row> testStreamingRead = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery(this.tableName), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "Say", "A"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "Hi", "B"}), TestValuesTableFactory.changelogRow("+I", new Object[]{3, "To", "C"}), TestValuesTableFactory.changelogRow("+I", new Object[]{4, "Paimon", "D"})));
        deleteAction.run();
        ReadWriteTableTestUtil.validateStreamingReadResult(testStreamingRead, Arrays.asList(TestValuesTableFactory.changelogRow("-D", new Object[]{1, "Say", "A"}), TestValuesTableFactory.changelogRow("-D", new Object[]{2, "Hi", "B"})));
        ReadWriteTableTestUtil.insertInto(this.tableName, "(4, CAST (NULL AS STRING), '$')", "(4, 'Test', CAST (NULL AS STRING))");
        ReadWriteTableTestUtil.validateStreamingReadResult(testStreamingRead, Arrays.asList(TestValuesTableFactory.changelogRow("-U", new Object[]{4, "Paimon", "D"}), TestValuesTableFactory.changelogRow("+U", new Object[]{4, "Test", "$"})));
        testStreamingRead.close();
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(this.tableName), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{3, "To", "C"}), TestValuesTableFactory.changelogRow("+I", new Object[]{4, "Test", "$"})));
    }

    private void prepareTable(boolean z) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
        FileStoreTable createFileStoreTable = createFileStoreTable(ROW_TYPE, Collections.emptyList(), z ? Collections.singletonList("k") : Collections.emptyList(), hashMap);
        this.snapshotManager = createFileStoreTable.snapshotManager();
        StreamWriteBuilder withCommitUser = createFileStoreTable.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = withCommitUser.newWrite();
        this.commit = withCommitUser.newCommit();
        writeData(rowData(1L, BinaryString.fromString("Hi")), rowData(1L, BinaryString.fromString("Hello")), rowData(1L, BinaryString.fromString("World")), rowData(2L, BinaryString.fromString("Flink")), rowData(2L, BinaryString.fromString("Table")), rowData(2L, BinaryString.fromString("Store")), rowData(3L, BinaryString.fromString("Developer")));
        Snapshot snapshot = this.snapshotManager.snapshot(this.snapshotManager.latestSnapshotId().longValue());
        Assertions.assertThat(snapshot.id()).isEqualTo(1L);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
    }

    private static List<Arguments> data() {
        return Arrays.asList(Arguments.arguments(new Object[]{true, Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1L, "World"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2L, "Store"}), TestValuesTableFactory.changelogRow("+I", new Object[]{3L, "Developer"})), Collections.singletonList(TestValuesTableFactory.changelogRow("-D", new Object[]{1L, "World"}))}), Arguments.arguments(new Object[]{false, Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1L, "Hi"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1L, "Hello"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1L, "World"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2L, "Flink"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2L, "Table"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2L, "Store"}), TestValuesTableFactory.changelogRow("+I", new Object[]{3L, "Developer"})), Arrays.asList(TestValuesTableFactory.changelogRow("-D", new Object[]{1L, "Hi"}), TestValuesTableFactory.changelogRow("-D", new Object[]{1L, "Hello"}), TestValuesTableFactory.changelogRow("-D", new Object[]{1L, "World"}))}));
    }
}
