package org.apache.flink.table.store.connector;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/AppendOnlyTableITCase.class */
/**
 * Integration tests for a table created with {@code 'write-mode'='append-only'}.
 *
 * <p>The table under test, {@code append_table (id INT, data STRING)}, is declared by
 * {@link #ddl()}. The tests cover DDL validation, batch reads with and without projection,
 * retention of duplicate rows, ingestion from a bounded values source, the snapshots
 * produced by successive inserts under tightened compaction options, and rejection of
 * every non-insert {@link RowKind}.
 */
public class AppendOnlyTableITCase extends FileStoreTableITCase {

    /** Declaring a primary key together with the append-only write mode must fail. */
    @Test
    public void testCreateTableWithPrimaryKey() {
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "CREATE TABLE pk_table (id INT PRIMARY KEY NOT ENFORCED, data STRING) WITH ('write-mode'='append-only')"))
                .hasRootCauseInstanceOf(TableException.class)
                .hasRootCauseMessage(
                        "Cannot define any primary key in an append-only table. Set 'write-mode'='change-log' if still want to keep the primary key definition.");
    }

    /** Reading the table before anything has been written yields no rows. */
    @Test
    public void testReadEmpty() {
        Assertions.assertThat(batchSql("SELECT * FROM append_table")).isEmpty();
    }

    /** Rows written by a single INSERT can be read back in full and per projected column. */
    @Test
    public void testReadWrite() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')");

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(2);
        Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"));

        rows = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(2);
        Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1), Row.of(2));

        rows = batchSql("SELECT data from append_table");
        Assertions.assertThat(rows.size()).isEqualTo(2);
        Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
    }

    /** Identical rows inserted in one statement are all kept; nothing is de-duplicated. */
    @Test
    public void testSkipDedup() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2, 'BBB'), (3, 'BBB')");

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"), Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "BBB"));

        rows = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(Row.of(1), Row.of(1), Row.of(2), Row.of(3));

        rows = batchSql("SELECT data FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("BBB"));
    }

    /** Rows copied from a bounded {@code values} source table arrive unchanged, duplicates included. */
    @Test
    public void testIngestFromSource() {
        String dataId =
                TestValuesTableFactory.registerData(
                        Arrays.asList(
                                Row.ofKind(RowKind.INSERT, 1, "AAA"),
                                Row.ofKind(RowKind.INSERT, 1, "AAA"),
                                Row.ofKind(RowKind.INSERT, 1, "BBB"),
                                Row.ofKind(RowKind.INSERT, 2, "AAA")));
        batchSql(
                "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                dataId);
        batchSql("INSERT INTO append_table SELECT * FROM source");

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"), Row.of(1, "AAA"), Row.of(1, "BBB"), Row.of(2, "AAA"));

        rows = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(Row.of(1), Row.of(1), Row.of(1), Row.of(2));

        rows = batchSql("SELECT data FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(4);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("AAA"));
    }

    /**
     * Checks the id and commit kind of the latest snapshot after each of seven inserts, with
     * {@code compaction.min.file-num} set to 2 and {@code compaction.early-max.file-num} set to 4,
     * then verifies that all 16 inserted rows are still readable.
     */
    @Test
    public void testAutoCompaction() {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");

        assertAutoCompaction(
                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",
                1L,
                Snapshot.CommitKind.APPEND);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')",
                2L,
                Snapshot.CommitKind.APPEND);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')",
                3L,
                Snapshot.CommitKind.APPEND);
        // The expected id skips 4 here (and 8 below): presumably the insert commits an APPEND
        // snapshot and the triggered compaction commits a further COMPACT one - TODO confirm.
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')",
                5L,
                Snapshot.CommitKind.COMPACT);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')",
                6L,
                Snapshot.CommitKind.APPEND);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')",
                7L,
                Snapshot.CommitKind.APPEND);
        assertAutoCompaction(
                "INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')",
                9L,
                Snapshot.CommitKind.COMPACT);

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(16);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"),
                        Row.of(2, "BBB"),
                        Row.of(3, "CCC"),
                        Row.of(4, "DDD"),
                        Row.of(1, "AAA"),
                        Row.of(2, "BBB"),
                        Row.of(3, "CCC"),
                        Row.of(4, "DDD"),
                        Row.of(5, "EEE"),
                        Row.of(6, "FFF"),
                        Row.of(7, "HHH"),
                        Row.of(8, "III"),
                        Row.of(9, "JJJ"),
                        Row.of(10, "KKK"),
                        Row.of(11, "LLL"),
                        Row.of(12, "MMM"));
    }

    /** A {@link RowKind#DELETE} row must be rejected on write. */
    @Test
    public void testRejectDelete() {
        testRejectChanges(RowKind.DELETE);
    }

    /** A {@link RowKind#UPDATE_BEFORE} row must be rejected on write. */
    @Test
    public void testRejectUpdateBefore() {
        testRejectChanges(RowKind.UPDATE_BEFORE);
    }

    /** A {@link RowKind#UPDATE_AFTER} row must be rejected on write. */
    @Test
    public void testRejectUpdateAfter() {
        // Previously passed UPDATE_BEFORE, which merely repeated testRejectUpdateBefore and
        // left the UPDATE_AFTER path untested.
        testRejectChanges(RowKind.UPDATE_AFTER);
    }

    /**
     * Supplies the DDL for the append-only table used by every test in this class.
     *
     * <p>NOTE(review): presumably executed by {@link FileStoreTableITCase} during test setup;
     * that code is not visible from this file.
     *
     * @return a single-element list holding the {@code CREATE TABLE} statement
     */
    @Override // org.apache.flink.table.store.connector.FileStoreTableITCase
    protected List<String> ddl() {
        return Collections.singletonList(
                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')");
    }

    /**
     * Registers a bounded {@code values} source containing one row of the given kind and asserts
     * that inserting from it into {@code append_table} fails with an
     * {@link IllegalStateException} root cause whose message names that kind.
     *
     * @param rowKind the change kind carried by the single source row
     */
    private void testRejectChanges(RowKind rowKind) {
        String dataId =
                TestValuesTableFactory.registerData(
                        Collections.singletonList(Row.ofKind(rowKind, 1, "AAA")));
        batchSql(
                "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                dataId);

        Assertions.assertThatThrownBy(() -> batchSql("INSERT INTO append_table SELECT * FROM source"))
                .hasRootCauseInstanceOf(IllegalStateException.class)
                .hasRootCauseMessage(
                        "Append only writer can not accept row with RowKind %s", rowKind);
    }

    /**
     * Runs one statement and asserts the id and commit kind of the latest snapshot of
     * {@code append_table}.
     *
     * @param sql the statement to execute
     * @param expectedSnapshotId the id the latest snapshot must have afterwards
     * @param expectedCommitKind the commit kind the latest snapshot must have afterwards
     */
    private void assertAutoCompaction(
            String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
        batchSql(sql);
        // NOTE(review): the meaning of the boolean flag is defined by the base class - confirm.
        Snapshot latest = findLatestSnapshot("append_table", true);
        Assertions.assertThat(latest.id()).isEqualTo(expectedSnapshotId);
        Assertions.assertThat(latest.commitKind()).isEqualTo(expectedCommitKind);
    }
}
