package org.apache.paimon.flink;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration tests for reading tables created with {@code 'changelog-producer' =
 * 'full-compaction'}.
 *
 * <p>Every streaming iterator is opened in a try-with-resources block so that it is closed even
 * when an assertion fails.
 */
public class FullCompactionFileStoreITCase extends CatalogITCaseBase {
    /** Name of the primary-key table created before every test. */
    private final String table = "T";

    /** {@code WITH} clause (including its leading space) shared by the tables created here. */
    private final String options = " WITH('changelog-producer'='full-compaction', 'changelog-producer.compaction-interval' = '1s')";

    /** Runs the base class setup and then creates the default test table {@code T}. */
    @Override
    @BeforeEach
    public void before() throws IOException {
        super.before();
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS "
                        + table
                        + " (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
                        + options);
    }

    /** A streaming read receives the rows of each successive insert. */
    @Test
    public void testStreamingRead() throws Exception {
        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
        }
    }

    /** A streaming read works for a table with an {@code ARRAY<ROW<...>>} column. */
    @Test
    public void testStreamingReadOfArray() throws Exception {
        String arrayTable = "T_ARRAY";
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS "
                        + arrayTable
                        + "(ID INT PRIMARY KEY NOT ENFORCED,\n"
                        + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
                        + ")"
                        + options);

        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", arrayTable))) {
            sql(
                    "INSERT INTO %s VALUES (1, ARRAY[('c','mark1'), ('d','mark2'), ('e','mark3')]);",
                    arrayTable);
            Assertions.assertThat(collectAsStrings(iterator, 1))
                    .containsExactlyInAnyOrder(
                            "+I[1, [+I[c, mark1], +I[d, mark2], +I[e, mark3]]]");
        }
    }

    /** Streaming and batch reads with {@code 'scan.mode'='compacted-full'} see all inserted rows. */
    @Test
    public void testCompactedScanMode() throws Exception {
        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(
                        streamSqlIter(
                                "SELECT * FROM %s /*+ OPTIONS('scan.mode'='compacted-full') */",
                                table))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"));

            // The batch query runs while the streaming read is still open, as before.
            Assertions.assertThat(
                            sql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted-full') */"))
                    .containsExactlyInAnyOrder(
                            Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
        }
    }

    /**
     * Checks the changelog emitted for updates, with and without row deduplication.
     *
     * @param z value set for the {@code 'changelog-producer.row-deduplicate'} table option
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testUpdate(boolean z) throws Exception {
        sql("ALTER TABLE %s SET ('changelog-producer.row-deduplicate' = '%s')", table, z);

        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '1')", table);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "1"));

            sql("INSERT INTO %s VALUES ('1', '4', '5')", table);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.UPDATE_BEFORE, "1", "2", "3"),
                            Row.ofKind(RowKind.UPDATE_AFTER, "1", "4", "5"));

            for (int i = 1; i < 5; i++) {
                // Key '1' is rewritten with identical values; only key '4' really changes.
                sql("INSERT INTO %s VALUES ('1', '4', '5'), ('4', '5', '%s')", table, i + 1);

                String previous = String.valueOf(i);
                String current = String.valueOf(i + 1);
                if (z) {
                    // With deduplication only the changed row is expected.
                    Assertions.assertThat(iterator.collect(2))
                            .containsExactlyInAnyOrder(
                                    Row.ofKind(RowKind.UPDATE_BEFORE, "4", "5", previous),
                                    Row.ofKind(RowKind.UPDATE_AFTER, "4", "5", current));
                } else {
                    // Without deduplication the unchanged row is expected as -U/+U too.
                    Assertions.assertThat(iterator.collect(4))
                            .containsExactlyInAnyOrder(
                                    Row.ofKind(RowKind.UPDATE_BEFORE, "1", "4", "5"),
                                    Row.ofKind(RowKind.UPDATE_AFTER, "1", "4", "5"),
                                    Row.ofKind(RowKind.UPDATE_BEFORE, "4", "5", previous),
                                    Row.ofKind(RowKind.UPDATE_AFTER, "4", "5", current));
                }
            }
        }
    }

    /** The {@code $audit_log} system table exposes the row kind as its first column. */
    @Test
    public void testUpdateAuditLog() throws Exception {
        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s$audit_log", table))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3')", table);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "2", "3"));

            sql("INSERT INTO %s VALUES ('1', '4', '5')", table);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, "-U", "1", "2", "3"),
                            Row.ofKind(RowKind.INSERT, "+U", "1", "4", "5"));
        }

        // The streaming read is closed before the batch read, as in the original flow.
        Assertions.assertThat(sql("SELECT * FROM %s$audit_log", table))
                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "4", "5"));
    }

    /** Row deduplication works for a table with an {@code ARRAY<ROW<...>>} column. */
    @Test
    public void testRowDeduplicateWithArrayRow() throws Exception {
        String arrayRowTable = "T_ARRAY_ROW";
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS "
                        + arrayRowTable
                        + "(ID INT PRIMARY KEY NOT ENFORCED,\n"
                        + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
                        + ") WITH ('changelog-producer'='full-compaction',"
                        + "'changelog-producer.compaction-interval' = '1s',"
                        + "'changelog-producer.row-deduplicate' = 'true')");

        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", arrayRowTable))) {
            sql("INSERT INTO %s VALUES (1, ARRAY[('a','mark1')]);", arrayRowTable);
            Assertions.assertThat(collectAsStrings(iterator, 1))
                    .containsExactlyInAnyOrder("+I[1, [+I[a, mark1]]]");

            sql("INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')])", arrayRowTable);
            Assertions.assertThat(collectAsStrings(iterator, 2))
                    .containsExactly("-U[1, [+I[a, mark1]]]", "+U[1, [+I[b, mark2]]]");

            // Key 1 is rewritten with identical values, so only the new key 2 is expected.
            sql(
                    "INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')]), (2, ARRAY[('c', 'mark3')])",
                    arrayRowTable);
            Assertions.assertThat(collectAsStrings(iterator, 1))
                    .containsExactly("+I[2, [+I[c, mark3]]]");
        }
    }

    /** Collects {@code limit} rows from the iterator and returns their string forms in order. */
    private static List<String> collectAsStrings(BlockingIterator<Row, Row> iterator, int limit)
            throws Exception {
        return iterator.collect(limit).stream().map(Row::toString).collect(Collectors.toList());
    }
}
