package org.apache.paimon.flink;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/paimon/flink/DeletionVectorITCase.class */
public class DeletionVectorITCase extends CatalogITCaseBase {
    @ValueSource(strings = {"none", "lookup"})
    @ParameterizedTest
    public void testStreamingReadDVTable(String str) throws Exception {
        sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')", str), new Object[0]);
        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')", new Object[0]);
        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')", new Object[0]);
        BlockingIterator<Row, Row> streamSqlBlockIter = streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */", new Object[0]);
        Throwable th = null;
        try {
            AssertionsForInterfaceTypes.assertThat(streamSqlBlockIter.collect(12)).containsExactlyInAnyOrder(new Row[]{Row.ofKind(RowKind.INSERT, new Object[]{1, "111111111"}), Row.ofKind(RowKind.INSERT, new Object[]{2, "2"}), Row.ofKind(RowKind.INSERT, new Object[]{3, "3"}), Row.ofKind(RowKind.INSERT, new Object[]{4, "4"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, "2"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, "2_1"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{3, "3"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{3, "3_1"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, "2_1"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, "2_2"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{4, "4"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{4, "4_1"})});
            if (streamSqlBlockIter != null) {
                if (0 != 0) {
                    try {
                        streamSqlBlockIter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    streamSqlBlockIter.close();
                }
            }
            BlockingIterator<Row, Row> streamSqlBlockIter2 = streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */", new Object[0]);
            Throwable th3 = null;
            try {
                try {
                    AssertionsForInterfaceTypes.assertThat(streamSqlBlockIter2.collect(8)).containsExactlyInAnyOrder(new Row[]{Row.ofKind(RowKind.INSERT, new Object[]{1, "111111111"}), Row.ofKind(RowKind.INSERT, new Object[]{2, "2_1"}), Row.ofKind(RowKind.INSERT, new Object[]{3, "3_1"}), Row.ofKind(RowKind.INSERT, new Object[]{4, "4"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, "2_1"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, "2_2"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{4, "4"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{4, "4_1"})});
                    if (streamSqlBlockIter2 != null) {
                        if (0 == 0) {
                            streamSqlBlockIter2.close();
                            return;
                        }
                        try {
                            streamSqlBlockIter2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th3 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (streamSqlBlockIter2 != null) {
                    if (th3 != null) {
                        try {
                            streamSqlBlockIter2.close();
                        } catch (Throwable th7) {
                            th3.addSuppressed(th7);
                        }
                    } else {
                        streamSqlBlockIter2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (streamSqlBlockIter != null) {
                if (0 != 0) {
                    try {
                        streamSqlBlockIter.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    streamSqlBlockIter.close();
                }
            }
            throw th8;
        }
    }

    @ValueSource(strings = {"none", "lookup"})
    @ParameterizedTest
    public void testBatchReadDVTable(String str) {
        sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')", str), new Object[0]);
        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')", new Object[0]);
        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "111111111"}), Row.of(new Object[]{2, "2_2"}), Row.of(new Object[]{3, "3_1"}), Row.of(new Object[]{4, "4_1"})});
        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "111111111"}), Row.of(new Object[]{2, "2"}), Row.of(new Object[]{3, "3"}), Row.of(new Object[]{4, "4"})});
        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='4') */", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "111111111"}), Row.of(new Object[]{2, "2_1"}), Row.of(new Object[]{3, "3_1"}), Row.of(new Object[]{4, "4"})});
    }

    @ValueSource(strings = {"none", "lookup"})
    @ParameterizedTest
    public void testDVTableWithAggregationMergeEngine(String str) throws Exception {
        sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', 'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')", str), new Object[0]);
        sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)", new Object[0]);
        sql("INSERT INTO T VALUES (2, 1), (3, 1)", new Object[0]);
        sql("INSERT INTO T VALUES (2, 1), (4, 1)", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 111111111}), Row.of(new Object[]{2, 4}), Row.of(new Object[]{3, 4}), Row.of(new Object[]{4, 5})});
        if (str.equals("lookup")) {
            BlockingIterator<Row, Row> streamSqlBlockIter = streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */", new Object[0]);
            Throwable th = null;
            try {
                try {
                    AssertionsForInterfaceTypes.assertThat(streamSqlBlockIter.collect(8)).containsExactlyInAnyOrder(new Row[]{Row.ofKind(RowKind.INSERT, new Object[]{1, 111111111}), Row.ofKind(RowKind.INSERT, new Object[]{2, 3}), Row.ofKind(RowKind.INSERT, new Object[]{3, 4}), Row.ofKind(RowKind.INSERT, new Object[]{4, 4}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, 3}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, 4}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{4, 4}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{4, 5})});
                    if (streamSqlBlockIter != null) {
                        if (0 == 0) {
                            streamSqlBlockIter.close();
                            return;
                        }
                        try {
                            streamSqlBlockIter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (streamSqlBlockIter != null) {
                    if (th != null) {
                        try {
                            streamSqlBlockIter.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        streamSqlBlockIter.close();
                    }
                }
                throw th4;
            }
        }
    }

    @ValueSource(strings = {"none", "lookup"})
    @ParameterizedTest
    public void testDVTableWithPartialUpdateMergeEngine(String str) throws Exception {
        sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1 STRING, v2 STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', 'merge-engine'='partial-update')", str), new Object[0]);
        sql("INSERT INTO T VALUES (1, '111111111', '1'), (2, '2', CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')", new Object[0]);
        sql("INSERT INTO T VALUES (2, CAST(NULL AS STRING), '2'), (3, '3_1', '3_1')", new Object[0]);
        sql("INSERT INTO T VALUES (2, '2_1', CAST(NULL AS STRING)), (4, '4', CAST(NULL AS STRING))", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "111111111", "1"}), Row.of(new Object[]{2, "2_1", "2"}), Row.of(new Object[]{3, "3_1", "3_1"}), Row.of(new Object[]{4, "4", "4"})});
        if (str.equals("lookup")) {
            BlockingIterator<Row, Row> streamSqlBlockIter = streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */", new Object[0]);
            Throwable th = null;
            try {
                try {
                    AssertionsForInterfaceTypes.assertThat(streamSqlBlockIter.collect(8)).containsExactlyInAnyOrder(new Row[]{Row.ofKind(RowKind.INSERT, new Object[]{1, "111111111", "1"}), Row.ofKind(RowKind.INSERT, new Object[]{2, "2", "2"}), Row.ofKind(RowKind.INSERT, new Object[]{3, "3_1", "3_1"}), Row.ofKind(RowKind.INSERT, new Object[]{4, null, "4"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, "2", "2"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, "2_1", "2"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{4, null, "4"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{4, "4", "4"})});
                    if (streamSqlBlockIter != null) {
                        if (0 == 0) {
                            streamSqlBlockIter.close();
                            return;
                        }
                        try {
                            streamSqlBlockIter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (streamSqlBlockIter != null) {
                    if (th != null) {
                        try {
                            streamSqlBlockIter.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        streamSqlBlockIter.close();
                    }
                }
                throw th4;
            }
        }
    }
}
