package org.apache.paimon.flink;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.CommonTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/paimon/flink/PartialUpdateITCase.class */
/**
 * Integration tests for tables created with {@code 'merge-engine'='partial-update'}.
 *
 * <p>Each test issues Flink SQL through the helpers inherited from {@link CatalogITCaseBase}
 * and asserts on the rows read back. Streaming jobs and iterators opened by a test are always
 * released in a {@code try}/{@code finally} (or try-with-resources) so that a failing assertion
 * does not leave a job running behind the test.
 */
public class PartialUpdateITCase extends CatalogITCaseBase {

    /**
     * Statement that fills {@code dwd_orders} from {@code ods_orders} and from the join of
     * {@code dim_persons} with {@code ods_orders}. Shared by {@link #testForeignKeyJoin()} and
     * {@link #testNoSinkMaterializer()}.
     */
    private static final String INSERT_INTO_DWD_ORDERS =
            "INSERT INTO dwd_orders "
                    + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
                    + "UNION ALL "
                    + "SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age "
                    + "FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;";

    /**
     * Returns the DDL statements for the tables shared by the tests: the partial-update table
     * {@code T}, the partial-update sink {@code dwd_orders} and its two input tables.
     */
    @Override
    protected List<String> ddl() {
        return Arrays.asList(
                "CREATE TABLE IF NOT EXISTS T (j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update');",
                "CREATE TABLE IF NOT EXISTS dwd_orders (OrderID INT, OrderNumber INT, PersonID INT, LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (OrderID) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'ignore-delete'='true');",
                "CREATE TABLE IF NOT EXISTS ods_orders (OrderID INT, OrderNumber INT, PersonID INT, PRIMARY KEY (OrderID) NOT ENFORCED) "
                        + "WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');",
                "CREATE TABLE IF NOT EXISTS dim_persons (PersonID INT, LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (PersonID) NOT ENFORCED) "
                        + "WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');");
    }

    /** Two partial rows for the same key written by a single INSERT are read back as one row. */
    @Test
    public void testMergeInMemory() {
        batchSql(
                "INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), '5'), (1, 2, CAST(NULL AS INT), 6, CAST(NULL AS STRING))");

        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 3, 6, "5"));
    }

    /** Partial rows written by separate INSERTs are merged on read, with and without projection. */
    @Test
    public void testMergeRead() {
        batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')");

        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
        Assertions.assertThat(batchSql("SELECT a FROM T")).containsExactlyInAnyOrder(Row.of(4));
    }

    /** Same as the read test, but with {@code 'commit.force-compact'='true'} set on the table. */
    @Test
    public void testMergeCompaction() {
        batchSql("ALTER TABLE T SET ('commit.force-compact'='true')");

        // Key (1, 2).
        batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')");

        // Key (1, 3).
        batchSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 1, '1')");
        batchSql("INSERT INTO T VALUES (1, 3, 2, 3, CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 4, CAST(NULL AS STRING))");

        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"), Row.of(1, 3, 2, 4, "1"));
    }

    /**
     * Runs the streaming insert into {@code dwd_orders} and waits until the joined row shows up,
     * first for the initial order/person and then again after both have been updated.
     */
    @Test
    public void testForeignKeyJoin() throws Exception {
        sEnv.getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.NONE);

        // try-with-resources: the streaming job must be released even if a wait below times out.
        try (CloseableIterator<Row> insertJob = streamSqlIter(INSERT_INTO_DWD_ORDERS)) {
            batchSql("INSERT INTO ods_orders VALUES (1, 2, 3)");
            batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'jon', 23)");
            awaitDwdOrdersRow(1, 2, 3, "snow", "jon", 23);

            batchSql("INSERT INTO ods_orders VALUES (1, 4, 3)");
            batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'targaryen', 23)");
            awaitDwdOrdersRow(1, 4, 3, "snow", "targaryen", 23);
        }
    }

    /**
     * Polls {@code dwd_orders} every 200 ms, for at most 5 seconds, until it contains a row with
     * exactly the given field values.
     */
    private void awaitDwdOrdersRow(Object... expectedFields) throws Exception {
        List<Object> expected = Arrays.asList(expectedFields);
        CommonTestUtils.waitUtil(
                () -> rowsToList(batchSql("SELECT * FROM dwd_orders")).contains(expected),
                Duration.ofSeconds(5),
                Duration.ofMillis(200));
    }

    /** Converts every row to a list of its field values, see {@link #toList(Row)}. */
    private List<List<Object>> rowsToList(List<Row> rows) {
        return rows.stream().map(this::toList).collect(Collectors.toList());
    }

    /**
     * Copies the fields of {@code row} into a list, asserting that the row kind is either
     * {@code INSERT} or {@code UPDATE_AFTER}.
     */
    private List<Object> toList(Row row) {
        Assertions.assertThat(row.getKind()).isIn(RowKind.INSERT, RowKind.UPDATE_AFTER);

        int arity = row.getArity();
        List<Object> fields = new ArrayList<>(arity);
        for (int pos = 0; pos < arity; pos++) {
            fields.add(row.getField(pos));
        }
        return fields;
    }

    /** Reading {@code T} in streaming mode is expected to throw. */
    @Test
    public void testStreamingRead() {
        // NOTE: the string is only the AssertJ assertion description; it is not matched against
        // the message of the thrown exception.
        Assertions.assertThatThrownBy(
                () -> sEnv.from("T").execute().print(),
                "Partial update continuous reading is not supported");
    }

    /**
     * With {@code 'changelog-producer'='input'} a streaming read is possible; the expected rows
     * are exactly the records that were inserted.
     */
    @Test
    public void testStreamingReadChangelogInput() throws Exception {
        sql(
                "CREATE TABLE INPUT_T (a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'changelog-producer'='input');");

        BlockingIterator<Row, Row> changelog =
                BlockingIterator.of(streamSqlIter("SELECT * FROM INPUT_T"));
        try {
            sql("INSERT INTO INPUT_T VALUES (1, CAST(NULL AS INT), 1)");
            Assertions.assertThat(changelog.collect(1))
                    .containsExactlyInAnyOrder(Row.of(1, null, 1));

            sql("INSERT INTO INPUT_T VALUES (1, 1, CAST(NULL AS INT)), (2, 2, 2)");
            Assertions.assertThat(changelog.collect(2))
                    .containsExactlyInAnyOrder(Row.of(1, 1, null), Row.of(2, 2, 2));
        } finally {
            // Release the streaming read whether or not the assertions passed.
            changelog.close();
        }
    }

    /**
     * Two sequence groups are configured: {@code g_1} for columns {@code a,b} and {@code g_2}
     * for columns {@code c,d}. The expected rows show each group being updated independently.
     */
    @Test
    public void testSequenceGroup() {
        sql(
                "CREATE TABLE SG (k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d');");

        sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");

        // g_2 is NULL in this record: c and d are expected to keep their previous values.
        sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");
        Assertions.assertThat(sql("SELECT * FROM SG"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));
        Assertions.assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(1, 1));

        // g_1 = 1 is lower than the stored 2: a and b are expected to stay, c and d to change.
        sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");
        Assertions.assertThat(sql("SELECT * FROM SG"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));

        sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
        sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
        sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");
        Assertions.assertThat(sql("SELECT a, b FROM SG")).containsExactlyInAnyOrder(Row.of(4, 4));
        Assertions.assertThat(sql("SELECT c, d FROM SG"))
                .containsExactlyInAnyOrder(Row.of(5, null));
    }

    /** Invalid sequence-group definitions are rejected at table creation time. */
    @Test
    public void testInvalidSequenceGroup() {
        // Sequence field g_0 is not a column of the table.
        Assertions.assertThatThrownBy(
                        () ->
                                sql(
                                        "CREATE TABLE SG (k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED) "
                                                + "WITH ('merge-engine'='partial-update', 'fields.g_0.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d');"))
                .hasRootCauseMessage("Field g_0 can not be found in table schema.");

        // Grouped field a1 is not a column of the table.
        Assertions.assertThatThrownBy(
                        () ->
                                sql(
                                        "CREATE TABLE SG (k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED) "
                                                + "WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a1,b', 'fields.g_2.sequence-group'='c,d');"))
                .hasRootCauseMessage("Field a1 can not be found in table schema.");

        // Column a is listed in both groups.
        Assertions.assertThatThrownBy(
                        () ->
                                sql(
                                        "CREATE TABLE SG (k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED) "
                                                + "WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='a,d');"))
                .hasRootCauseMessage(
                        "Field a is defined repeatedly by multiple groups: [g_1, g_2].");
    }

    /** A projected read works on a table combining sequence groups with the lookup producer. */
    @Test
    public void testProjectPushDownWithLookupChangelogProducer() {
        sql(
                "CREATE TABLE IF NOT EXISTS T_P (j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'changelog-producer' = 'lookup', 'fields.a.sequence-group'='j', 'fields.b.sequence-group'='c');");
        batchSql("INSERT INTO T_P VALUES (1, 1, 1, 1, '1')");

        Assertions.assertThat(sql("SELECT k, c FROM T_P"))
                .containsExactlyInAnyOrder(Row.of(1, "1"));
    }

    /** Partial rows are merged correctly when {@code 'local-merge-buffer-size'} is configured. */
    @Test
    public void testLocalMerge() {
        sql(
                "CREATE TABLE T1 (k INT,v INT,d INT,PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d)  "
                        + "WITH ('merge-engine'='partial-update', 'local-merge-buffer-size'='1m');");
        sql("INSERT INTO T1 VALUES (1, CAST(NULL AS INT), 1), (2, 1, 1), (1, 2, 1)");

        Assertions.assertThat(batchSql("SELECT * FROM T1"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 1), Row.of(2, 1, 1));
    }

    /**
     * Column {@code a} uses the {@code sum} aggregate function inside sequence group {@code g_1},
     * while column {@code c} belongs to sequence group {@code g_2} and column {@code b} to none.
     */
    @Test
    public void testPartialUpdateWithAggregation() {
        sql(
                "CREATE TABLE AGG (k INT, a INT, b INT, g_1 INT, c VARCHAR, g_2 INT, PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'fields.a.aggregate-function'='sum', 'fields.g_1.sequence-group'='a', 'fields.g_2.sequence-group'='c');");

        sql("INSERT INTO AGG VALUES (1, 1, 1, 1, '1', 1)");
        sql("INSERT INTO AGG VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT))");
        Assertions.assertThat(sql("SELECT * FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(1, 3, 2, 2, "1", 1));
        Assertions.assertThat(sql("SELECT a, c FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(3, "1"));

        // g_1 = 1 is lower than the stored 2; the expected row still has 3 added to a.
        sql("INSERT INTO AGG VALUES (1, 3, 3, 1, '3', 3)");
        Assertions.assertThat(sql("SELECT * FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(1, 6, 3, 2, "3", 3));

        sql(
                "INSERT INTO AGG VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, CAST(NULL AS VARCHAR), 4)");
        Assertions.assertThat(sql("SELECT a, b, c FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(6, 3, null));
    }

    /** Column {@code a} uses the {@code first_value} aggregate inside sequence group {@code g_1}. */
    @Test
    public void testFirstValuePartialUpdate() {
        sql(
                "CREATE TABLE AGG (k INT, a INT, g_1 INT, PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a', 'fields.a.aggregate-function'='first_value');");

        sql("INSERT INTO AGG VALUES (1, 1, 1), (1, 2, 2)");
        Assertions.assertThat(sql("SELECT * FROM AGG")).containsExactlyInAnyOrder(Row.of(1, 1, 2));

        // A record with sequence 0 arrives afterwards: a is expected to become 0, g_1 stays 2.
        sql("INSERT INTO AGG VALUES (1, 0, 0)");
        Assertions.assertThat(sql("SELECT * FROM AGG")).containsExactlyInAnyOrder(Row.of(1, 0, 2));
    }

    /** Forcing Flink's upsert materializer makes the insert into {@code dwd_orders} fail. */
    @Test
    public void testNoSinkMaterializer() {
        sEnv.getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.FORCE);
        sEnv.getConfig().set(RestartStrategyOptions.RESTART_STRATEGY, "none");

        Assertions.assertThatThrownBy(() -> sEnv.executeSql(INSERT_INTO_DWD_ORDERS).await())
                .hasRootCauseMessage(
                        "Sink materializer must not be used with Paimon sink. Please set 'table.exec.sink.upsert-materialize' to 'NONE' in Flink's config.");
    }

    /**
     * Writes an INSERT and then a DELETE record into a table with two sequence groups, using a
     * projection that leaves the second group NULL, and checks the row after each step.
     */
    @Test
    public void testPartialUpdateProjectionPushDownWithDeleteMessage() throws Exception {
        String insertDataId =
                TestValuesTableFactory.registerData(
                        Collections.singletonList(Row.ofKind(RowKind.INSERT, 1, 1, 1)));
        sEnv.executeSql(
                String.format(
                        "CREATE TEMPORARY TABLE source (k INT, a INT, g_1 INT, PRIMARY KEY (k) NOT ENFORCED) "
                                + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', 'changelog-mode' = 'I,D,UA,UB')",
                        insertDataId));
        sql(
                "CREATE TABLE TEST (k INT, a INT, b INT, g_1 INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a', 'fields.g_2.sequence-group'='b');");

        // Both jobs are closed by try-with-resources even when an assertion below fails.
        try (CloseableIterator<Row> insertJob =
                streamSqlIter(
                        "INSERT INTO TEST SELECT k, a, CAST(NULL AS INT) AS b, g_1, CAST(NULL AS INT) as g_2 FROM source")) {
            sqlAssertWithRetry(
                    "SELECT * FROM TEST",
                    rows -> rows.containsExactlyInAnyOrder(Row.of(1, 1, null, 1, null)));

            String deleteDataId =
                    TestValuesTableFactory.registerData(
                            Collections.singletonList(Row.ofKind(RowKind.DELETE, 1, 1, 2)));
            sEnv.executeSql(
                    String.format(
                            "CREATE TEMPORARY TABLE source2 (k INT, a INT, g_1 INT) "
                                    + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                            deleteDataId));

            try (CloseableIterator<Row> deleteJob =
                    streamSqlIter(
                            "INSERT INTO TEST SELECT k, a, CAST(NULL AS INT) AS b, g_1, CAST(NULL AS INT) as g_2 FROM source2")) {
                // The DELETE carries g_1 = 2: a is expected to be cleared, the row itself stays.
                sqlAssertWithRetry(
                        "SELECT * FROM TEST",
                        rows -> rows.containsExactlyInAnyOrder(Row.of(1, null, null, 2, null)));
                Assertions.assertThat(sql("SELECT COUNT(*) FROM TEST"))
                        .containsExactlyInAnyOrder(Row.of(1L));
            }
        }
    }

    /**
     * With {@code 'ignore-delete' = 'true'} a DELETE record does not remove previously written
     * values, with or without a local merge buffer.
     *
     * @param localMergeEnabled whether {@code 'local-merge-buffer-size'} is set on the table
     */
    @ParameterizedTest(name = "localMergeEnabled = {0}")
    @ValueSource(booleans = {true, false})
    public void testIgnoreDelete(boolean localMergeEnabled) throws Exception {
        sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) "
                        + "WITH ( 'merge-engine' = 'partial-update', 'ignore-delete' = 'true', 'changelog-producer' = 'lookup')");
        if (localMergeEnabled) {
            sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = '256 kb')");
        }

        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");

        // Feed a DELETE record for the same key through a temporary 'values' table.
        String deleteDataId =
                TestValuesTableFactory.registerData(
                        Collections.singletonList(Row.ofKind(RowKind.DELETE, 1, null, "apple")));
        streamSqlIter(
                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) "
                                + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', 'changelog-mode' = 'I,D')",
                        deleteDataId)
                .close();
        sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM input").await();

        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");
        Assertions.assertThat(sql("SELECT * FROM ignore_delete"))
                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));

        BlockingIterator<Row, Row> changelog =
                streamSqlBlockIter(
                        "SELECT * FROM ignore_delete /*+ OPTIONS('scan.timestamp-millis' = '0') */");
        try {
            // The expected changelog contains no DELETE record.
            Assertions.assertThat(changelog.collect(3))
                    .containsExactly(
                            Row.ofKind(RowKind.INSERT, 1, null, "apple"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, "apple"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple"));
        } finally {
            // Release the streaming read whether or not the assertion passed.
            changelog.close();
        }
    }

    /**
     * Writes an insert, a delete and another insert into a write-only {@code deduplicate} table,
     * then force-commits a schema that switches the table to {@code partial-update} with
     * {@code ignore-delete} enabled and checks that the same files now merge to a different row.
     */
    @Test
    public void testIgnoreDeleteCompatible() throws Exception {
        sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) "
                        + "WITH ( 'merge-engine' = 'deduplicate', 'write-only' = 'true')");
        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 'apple')");
        sql("DELETE FROM ignore_delete WHERE pk = 1");
        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS STRING))");

        // Read with the original (deduplicate) options.
        Assertions.assertThat(sql("SELECT * FROM ignore_delete"))
                .containsExactlyInAnyOrder(Row.of(1, "A", null));

        HashMap<String, String> newOptions = new HashMap<>();
        newOptions.put(
                CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");

        SchemaManager schemaManager =
                new SchemaManager(
                        LocalFileIO.create(), new Path(path, "default.db/ignore_delete"));
        Schema partialUpdateSchema =
                new Schema(
                        Arrays.asList(
                                new DataField(0, "pk", DataTypes.INT().notNull()),
                                new DataField(1, "a", DataTypes.STRING()),
                                new DataField(2, "b", DataTypes.STRING())),
                        Collections.emptyList(),
                        Collections.singletonList("pk"),
                        newOptions,
                        (String) null);
        SchemaUtils.forceCommit(schemaManager, partialUpdateSchema);

        // Read again with the partial-update + ignore-delete options.
        Assertions.assertThat(sql("SELECT * FROM ignore_delete"))
                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
    }
}
