package org.apache.paimon.flink;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/paimon/flink/BatchFileStoreITCase.class */
public class BatchFileStoreITCase extends CatalogITCaseBase {
    /**
     * Supplies the DDL statements for this test class: a single statement that creates table
     * {@code T} with three INT columns {@code a}, {@code b} and {@code c}.
     *
     * @return a one-element list holding the CREATE TABLE statement
     */
    @Override
    protected List<String> ddl() {
        final String createTableT = "CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)";
        return Collections.singletonList(createTableT);
    }

    /**
     * Expects an INSERT ... SELECT with GROUP BY into {@code T} to fail with the adaptive
     * parallelism message, and then runs the same statement with a {@code sink.parallelism} hint.
     */
    @Test
    public void testAdaptiveParallelism() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");

        // No sink parallelism given: the statement is expected to be rejected.
        Assertions.assertThatThrownBy(
                        () -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
                .hasMessageContaining(
                        "Paimon Sink does not support Flink's Adaptive Parallelism mode.");

        // Same statement with the sink parallelism supplied through a hint.
        batchSql(
                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
    }

    /**
     * Verifies that INSERT OVERWRITE with a source query that yields no rows leaves {@code T}
     * empty.
     */
    @Test
    public void testOverwriteEmpty() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        List<Row> beforeOverwrite = batchSql("SELECT * FROM T");
        Assertions.assertThat(beforeOverwrite)
                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));

        // The predicate is always false, so the overwrite writes nothing.
        batchSql("INSERT OVERWRITE T SELECT * FROM T WHERE 1 <> 1");
        List<Row> afterOverwrite = batchSql("SELECT * FROM T");
        Assertions.assertThat(afterOverwrite).isEmpty();
    }

    /**
     * Exercises the time-travel scan options on table {@code T} after four inserts: {@code
     * scan.snapshot-id}, {@code scan.mode=from-snapshot-full}, {@code scan.timestamp-millis},
     * {@code scan.file-creation-time-millis} and {@code scan.tag-name}, including the expected
     * failures for an out-of-range snapshot id, conflicting options and an unknown tag.
     */
    @Test
    public void testTimeTravelRead() throws Exception {
        // Four inserts; a wall-clock timestamp is captured right after each of the first three.
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        long time1 = System.currentTimeMillis();

        Thread.sleep(10L);
        batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)");
        long time2 = System.currentTimeMillis();

        Thread.sleep(10L);
        batchSql("INSERT INTO T VALUES (5, 55, 555), (6, 66, 666)");
        long time3 = System.currentTimeMillis();

        Thread.sleep(10L);
        batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)");

        paimonTable("T").createTag("tag2", 2L);

        // Rows written by the four inserts, grouped into the expected result sets.
        Row r1 = Row.of(1, 11, 111);
        Row r2 = Row.of(2, 22, 222);
        Row r3 = Row.of(3, 33, 333);
        Row r4 = Row.of(4, 44, 444);
        Row r5 = Row.of(5, 55, 555);
        Row r6 = Row.of(6, 66, 666);
        Row r7 = Row.of(7, 77, 777);
        Row r8 = Row.of(8, 88, 888);
        Row[] firstInsert = {r1, r2};
        Row[] firstTwoInserts = {r1, r2, r3, r4};
        Row[] firstThreeInserts = {r1, r2, r3, r4, r5, r6};

        String outOfRange =
                "The specified scan snapshotId 0 is out of available snapshotId range [1, 4].";

        // ---- snapshot 1 / time1 ----
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */"))
                .containsExactlyInAnyOrder(firstInsert);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
                .containsExactlyInAnyOrder(firstInsert);

        // Snapshot id 0 is rejected, with and without an explicit scan mode.
        Assertions.assertThatThrownBy(
                        () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                IllegalArgumentException.class, outOfRange));
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                IllegalArgumentException.class, outOfRange));

        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        time1)))
                .containsExactlyInAnyOrder(firstInsert);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.file-creation-time-millis'='%s') */",
                                time1))
                .containsExactlyInAnyOrder(r3, r4, r5, r6, r7, r8);

        // ---- snapshot 2 / time2 ----
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='2') */"))
                .containsExactlyInAnyOrder(firstTwoInserts);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
                .containsExactlyInAnyOrder(firstTwoInserts);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        time2)))
                .containsExactlyInAnyOrder(firstTwoInserts);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.file-creation-time-millis'='%s') */",
                                        time2)))
                .containsExactlyInAnyOrder(r5, r6, r7, r8);

        // ---- snapshot 3 / time3 ----
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */"))
                .containsExactlyInAnyOrder(firstThreeInserts);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
                .containsExactlyInAnyOrder(firstThreeInserts);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        time3)))
                .containsExactlyInAnyOrder(firstThreeInserts);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.file-creation-time-millis'='%s') */",
                                        time3)))
                .containsExactlyInAnyOrder(r7, r8);

        // ---- conflicting options ----
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        String.format(
                                                "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s', 'scan.snapshot-id'='1') */",
                                                time3)))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "[scan.snapshot-id] must be null when you set [scan.timestamp-millis]");
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='full', 'scan.snapshot-id'='1') */"))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "%s must be null when you use latest-full for scan.mode",
                        CoreOptions.SCAN_SNAPSHOT_ID.key());

        // ---- tags ----
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */"))
                .containsExactlyInAnyOrder(firstTwoInserts);
        Assertions.assertThatThrownBy(
                        () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */"))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage("Tag 'unknown' doesn't exist.");
    }

    /**
     * Streams generated rows into table {@code WT} until the {@code WT$snapshots} system table
     * lists more than one snapshot, then reads {@code WT} in batch mode with {@code
     * scan.watermark} set to the watermark of the last listed snapshot and expects a non-empty
     * result.
     *
     * <p>The polling loop has no exit condition of its own; the {@code @Timeout} annotation bounds
     * the test.
     */
    @Timeout(120)
    @Test
    public void testTimeTravelReadWithWatermark() throws Exception {
        // Datagen source whose computed column dt carries the watermark.
        streamSqlIter(
                "CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING, dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')");
        sql(
                "CREATE TABLE WT (a STRING, b STRING, c STRING, dt TIMESTAMP, PRIMARY KEY (a) NOT ENFORCED)");

        CloseableIterator<Row> insert = streamSqlIter("INSERT INTO WT SELECT * FROM gen ");
        List<Long> watermarks;
        try {
            while (true) {
                watermarks =
                        sql("SELECT `watermark` FROM WT$snapshots").stream()
                                .map(row -> (Long) row.getField("watermark"))
                                .collect(Collectors.toList());
                if (watermarks.size() > 1) {
                    break;
                }
                Thread.sleep(1000L);
            }
        } finally {
            // Close the streaming insert on every path (including failures and interrupts) so
            // that it is not left open; it is always closed before the batch read below.
            insert.close();
        }

        Long latestWatermark = watermarks.get(watermarks.size() - 1);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM WT /*+ OPTIONS('scan.watermark'='%d') */",
                                        latestWatermark)))
                .isNotEmpty();
    }

    /**
     * Verifies that a tag stays readable after snapshot expiration: tags the first snapshot,
     * writes a second one, expires snapshots with both retention bounds set to 1, and then reads
     * through {@code scan.tag-name}.
     */
    @Test
    public void testTimeTravelReadWithSnapshotExpiration() throws Exception {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        paimonTable("T").createTag("tag1", 1L);
        batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)");

        // Typed map instead of a raw HashMap: keeps the copy(...) call free of unchecked warnings.
        HashMap<String, String> expireOptions = new HashMap<>();
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");

        FileStoreTable table = paimonTable("T");
        table.copy(expireOptions).newCommit("").expireSnapshots();
        Assertions.assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1L);

        // The tag still returns the rows of the first insert.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag1') */"))
                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
    }

    /**
     * Writes seven versions of the same primary key into a table created with {@code
     * 'sort-spill-threshold'='2'} and expects reads to return only the last written value.
     */
    @Test
    public void testSortSpillMerge() {
        sql(
                "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('sort-spill-threshold'='2')");

        // Seven separate inserts for key 1.
        sql("INSERT INTO KT VALUES (1, '1')");
        sql("INSERT INTO KT VALUES (1, '2')");
        sql("INSERT INTO KT VALUES (1, '3')");
        sql("INSERT INTO KT VALUES (1, '4')");
        sql("INSERT INTO KT VALUES (1, '5')");
        sql("INSERT INTO KT VALUES (1, '6')");
        sql("INSERT INTO KT VALUES (1, '7')");

        // Full-row read and value-only projection.
        List<Row> fullRows = sql("SELECT * FROM KT");
        Assertions.assertThat(fullRows).containsExactlyInAnyOrder(Row.of(1, "7"));
        List<Row> valueOnly = sql("SELECT b FROM KT");
        Assertions.assertThat(valueOnly).containsExactlyInAnyOrder(Row.of("7"));
    }

    /**
     * Verifies that {@code TRUNCATE TABLE T} returns a single {@code +I[OK]} row and that the
     * table is empty afterwards.
     */
    @Test
    public void testTruncateTable() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));

        List<Row> truncateResult = batchSql("TRUNCATE TABLE T");
        Assertions.assertThat(truncateResult).hasSize(1);
        Row okRow = Row.ofKind(RowKind.INSERT, "OK");
        Assertions.assertThat(truncateResult.get(0)).isEqualTo(okRow);

        Assertions.assertThat(batchSql("SELECT * FROM T")).isEmpty();
    }

    /**
     * Joins a partitioned fact table with a dimension table in both join orders. For each order
     * the explained plan must contain {@code DynamicFilteringDataCollector} and the query must
     * return the rows of partitions 1 and 2.
     */
    @Test
    public void testDynamicPartitionPruning() {
        sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)");
        sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)");

        sql(
                "CREATE TABLE fact (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)");
        sql(
                "INSERT INTO fact PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')");
        sql(
                "INSERT INTO fact PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')");
        sql(
                "INSERT INTO fact PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')");

        String factJoinDim =
                "SELECT a, b, c, p, x, y FROM fact INNER JOIN dim ON x = p and z = 1 ORDER BY a";
        String dimJoinFact =
                "SELECT a, b, c, p, x, y FROM dim INNER JOIN fact ON x = p and z = 1 ORDER BY a";
        String collectorNode = "DynamicFilteringDataCollector";
        String expectedRows =
                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], +I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";

        // Plan checks for both join orders first, then the result checks.
        Assertions.assertThat(tEnv.explainSql(factJoinDim)).contains(collectorNode);
        Assertions.assertThat(tEnv.explainSql(dimJoinFact)).contains(collectorNode);
        Assertions.assertThat(sql(factJoinDim).toString()).isEqualTo(expectedRows);
        Assertions.assertThat(sql(dimJoinFact).toString()).isEqualTo(expectedRows);
    }

    /**
     * Unions two fact-to-dim joins. When both branches filter the dimension table with {@code z =
     * 1}, the plan must contain the {@code DynamicFilteringDataCollector(fields=[x])(reuse_id=}
     * fragment exactly once. When the branches use different filters, the plan only has to
     * contain a collector. Query results are checked in both cases.
     */
    @Test
    public void testDynamicPartitionPruningOnTwoFactTables() {
        sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)");
        sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)");

        sql(
                "CREATE TABLE fact1 (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)");
        sql(
                "INSERT INTO fact1 PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')");
        sql(
                "INSERT INTO fact1 PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')");
        sql(
                "INSERT INTO fact1 PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')");

        sql(
                "CREATE TABLE fact2 (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)");
        sql(
                "INSERT INTO fact2 PARTITION (p = 1) VALUES (40, 100, 'aaa'), (41, 101, 'bbb'), (42, 102, 'ccc')");
        sql(
                "INSERT INTO fact2 PARTITION (p = 2) VALUES (50, 200, 'aaa'), (51, 201, 'bbb'), (52, 202, 'ccc')");
        sql(
                "INSERT INTO fact2 PARTITION (p = 3) VALUES (60, 300, 'aaa'), (61, 301, 'bbb'), (62, 302, 'ccc')");

        // Both union branches apply the same dimension filter (z = 1).
        String sameFilterQuery =
                "SELECT * FROM (\n"
                        + "SELECT a, b, c, p, x, y FROM fact1 INNER JOIN dim ON x = p AND z = 1\n"
                        + "UNION ALL\n"
                        + "SELECT a, b, c, p, x, y FROM fact2 INNER JOIN dim ON x = p AND z = 1)\n"
                        + "ORDER BY a";
        Assertions.assertThat(tEnv.explainSql(sameFilterQuery))
                .containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
        Assertions.assertThat(sql(sameFilterQuery).toString())
                .isEqualTo(
                        "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], +I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], +I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], +I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]");

        // The second branch filters with z = 2 instead.
        String differentFilterQuery =
                "SELECT * FROM (\n"
                        + "SELECT a, b, c, p, x, y FROM fact1 INNER JOIN dim ON x = p AND z = 1\n"
                        + "UNION ALL\n"
                        + "SELECT a, b, c, p, x, y FROM fact2 INNER JOIN dim ON x = p AND z = 2)\n"
                        + "ORDER BY a";
        Assertions.assertThat(tEnv.explainSql(differentFilterQuery))
                .contains("DynamicFilteringDataCollector");
        Assertions.assertThat(sql(differentFilterQuery).toString())
                .isEqualTo(
                        "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], +I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], +I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]");
    }

    /**
     * With {@code 'rowkind.field'='rf'}, a row written with {@code '+I'} must be readable, and a
     * later row for the same key written with {@code '-D'} must leave the table empty.
     */
    @Test
    public void testRowKindField() {
        sql(
                "CREATE TABLE R_T (pk INT PRIMARY KEY NOT ENFORCED, v INT, rf STRING) WITH ('rowkind.field'='rf')");

        sql("INSERT INTO R_T VALUES (1, 1, '+I')");
        List<Row> afterInsert = sql("SELECT * FROM R_T");
        Assertions.assertThat(afterInsert).containsExactly(Row.of(1, 1, "+I"));

        sql("INSERT INTO R_T VALUES (1, 2, '-D')");
        List<Row> afterDelete = sql("SELECT * FROM R_T");
        Assertions.assertThat(afterDelete).isEmpty();
    }

    /**
     * With {@code 'ignore-delete' = 'true'} on a deduplicate table, a DELETE must leave the row in
     * place while a later insert for the same key still replaces the value.
     */
    @Test
    public void testIgnoreDelete() {
        sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1')");
        String selectAll = "SELECT * FROM ignore_delete";

        sql("INSERT INTO ignore_delete VALUES (1, 'A')");
        Assertions.assertThat(sql(selectAll)).containsExactly(Row.of(1, "A"));

        // The delete is expected to have no visible effect.
        sql("DELETE FROM ignore_delete WHERE pk = 1");
        Assertions.assertThat(sql(selectAll)).containsExactly(Row.of(1, "A"));

        sql("INSERT INTO ignore_delete VALUES (1, 'B')");
        Assertions.assertThat(sql(selectAll)).containsExactly(Row.of(1, "B"));
    }

    /**
     * Writes and deletes a row on a table created without {@code ignore-delete}, then enables the
     * option through ALTER TABLE and expects the previously deleted row to be returned again.
     */
    @Test
    public void testIgnoreDeleteCompatible() {
        sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) WITH ('merge-engine' = 'deduplicate', 'write-only' = 'true')");
        sql("INSERT INTO ignore_delete VALUES (1, 'A')");
        sql("DELETE FROM ignore_delete WHERE pk = 1");

        // Without the option the delete is honoured.
        List<Row> withoutOption = sql("SELECT * FROM ignore_delete");
        Assertions.assertThat(withoutOption).isEmpty();

        sql("ALTER TABLE ignore_delete set ('ignore-delete' = 'true')");

        // After the option is set, the same read returns the row.
        List<Row> withOption = sql("SELECT * FROM ignore_delete");
        Assertions.assertThat(withOption).containsExactlyInAnyOrder(Row.of(1, "A"));
    }

    /**
     * Combines {@code 'ignore-delete' = 'true'} with {@code 'rowkind.field' = 'kind'}: a row
     * written with kind {@code '-D'} must not remove the existing row, while a later {@code '+I'}
     * row for the same key replaces it.
     */
    @Test
    public void testIgnoreDeleteWithRowKindField() {
        sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')");
        String selectAll = "SELECT * FROM ignore_delete";

        sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')");
        Assertions.assertThat(sql(selectAll)).containsExactly(Row.of(1, "A", "+I"));

        // The '-D' row is expected to be ignored.
        sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')");
        Assertions.assertThat(sql(selectAll)).containsExactly(Row.of(1, "A", "+I"));

        sql("INSERT INTO ignore_delete VALUES (1, 'B', '+I')");
        Assertions.assertThat(sql(selectAll)).containsExactly(Row.of(1, "B", "+I"));
    }

    /**
     * Opens a streaming read on a table created with {@code 'changelog-producer' = 'lookup'},
     * then inserts two rows, deletes key 1 and re-inserts it, and expects the first two collected
     * rows to be {@code +I[1, B]} and {@code +I[2, B]}.
     */
    @Test
    public void testDeleteWithPkLookup() throws Exception {
        sql(
                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) WITH ('changelog-producer' = 'lookup', 'bucket' = '1')");
        BlockingIterator<Row, Row> streamRows = streamSqlBlockIter("SELECT * FROM ignore_delete");
        try {
            sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
            sql("DELETE FROM ignore_delete WHERE pk = 1");
            sql("INSERT INTO ignore_delete VALUES (1, 'B')");

            Assertions.assertThat(streamRows.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, 1, "B"), Row.ofKind(RowKind.INSERT, 2, "B"));
        } finally {
            // Close the streaming read even when a statement or the assertion fails.
            streamRows.close();
        }
    }

    /**
     * Checks the changelog of a partition-level DELETE on a table created with {@code
     * 'delete.force-produce-changelog'='true'}: the streaming read must see the two inserts, then
     * a {@code -D} row for the deleted partition, then the re-inserted row.
     *
     * @param str value placed into the table's {@code 'changelog-producer'} option
     */
    @ValueSource(strings = {"none", "lookup", "input"})
    @ParameterizedTest
    public void testDeletePartitionWithChangelog(String str) throws Exception {
        sql(
                "CREATE TABLE delete_table (pt INT, pk INT, v STRING, PRIMARY KEY(pt, pk) NOT ENFORCED) PARTITIONED BY (pt) "
                        + "  WITH ('changelog-producer' = '"
                        + str
                        + "', 'delete.force-produce-changelog'='true', 'bucket'='1')");
        BlockingIterator<Row, Row> streamRows = streamSqlBlockIter("SELECT * FROM delete_table");
        try {
            sql("INSERT INTO delete_table VALUES (1, 1, 'A'), (2, 2, 'B')");
            Assertions.assertThat(streamRows.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, 1, 1, "A"),
                            Row.ofKind(RowKind.INSERT, 2, 2, "B"));

            // Deleting the whole partition pt = 1 must emit a delete record.
            sql("DELETE FROM delete_table WHERE pt = 1");
            Assertions.assertThat(streamRows.collect(1))
                    .containsExactlyInAnyOrder(Row.ofKind(RowKind.DELETE, 1, 1, "A"));

            sql("INSERT INTO delete_table VALUES (1, 1, 'B')");
            Assertions.assertThat(streamRows.collect(1))
                    .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, "B"));
        } finally {
            // Close the streaming read even when a statement or an assertion fails.
            streamRows.close();
        }
    }

    /**
     * Reads data as of a timestamp taken before a column was added. The {@code
     * scan.timestamp-millis} hint is expected to return three-column rows with {@code null} for
     * the added column, while {@code FOR SYSTEM_TIME AS OF} is expected to return two-column rows.
     */
    @Test
    public void testScanFromOldSchema() throws InterruptedException {
        sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");
        sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");

        // Timestamp captured after the first insert and before the schema change.
        Thread.sleep(1000L);
        long beforeAlter = System.currentTimeMillis();

        sql("ALTER TABLE select_old ADD f2 STRING");
        sql("INSERT INTO select_old VALUES (3, 'c', 'C')");

        // Hint-based time travel: rows carry the added column f2 as null.
        List<Row> viaHint =
                sql(
                        "SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                        beforeAlter);
        Assertions.assertThat(viaHint)
                .containsExactlyInAnyOrder(Row.of(1, "a", null), Row.of(2, "b", null));

        // FOR SYSTEM_TIME AS OF: rows have only the two original columns.
        String formattedTimestamp =
                DateTimeUtils.formatTimestamp(DateTimeUtils.toInternal(beforeAlter, 0), 0);
        List<Row> viaSystemTime =
                sql(
                        "SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
                        formattedTimestamp);
        Assertions.assertThat(viaSystemTime)
                .containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"));
    }
}
