package org.apache.paimon.flink;

import java.util.Collections;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/FirstRowITCase.class */
/**
 * Integration tests for tables created with {@code 'merge-engine'='first-row'}.
 *
 * <p>The table {@code T} used by every test is keyed on column {@code a} and is created with
 * the {@code lookup} changelog producer. The assertions below expect that, for each primary
 * key, only the first inserted row is visible to readers.
 */
public class FirstRowITCase extends CatalogITCaseBase {

    /** Returns the single DDL statement that creates the first-row table {@code T}. */
    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected List<String> ddl() {
        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) WITH ('merge-engine'='first-row', 'file.format'='avro', 'changelog-producer' = 'lookup');");
    }

    /**
     * Creating a first-row table without the {@code lookup} changelog producer must fail with
     * the expected root-cause message.
     */
    @Test
    public void testIllegal() {
        Assertions.assertThatThrownBy(
                        () -> sql("CREATE TABLE ILLEGAL_T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) WITH ('merge-engine'='first-row')"))
                .hasRootCauseMessage("Only support 'lookup' changelog-producer on FIRST_MERGE merge engine");
    }

    /**
     * Two rows sharing key {@code 1} are written in one batch insert; both a full scan and a
     * projection on {@code c} are expected to return only the first of them.
     */
    @Test
    public void testBatchQuery() {
        batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");

        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(insertRow(1, 1, "1"));
        Assertions.assertThat(batchSql("SELECT c FROM T"))
                .containsExactlyInAnyOrder(insertRow("1"));
    }

    /**
     * Streams {@code T} while three insert statements are executed, and checks that only rows
     * for previously unseen keys are emitted, each as an {@code INSERT} record.
     *
     * @throws Exception if collecting from, or closing, the streaming iterator fails
     */
    @Test
    public void testStreamingRead() throws Exception {
        // try-with-resources so the streaming read is released even when an assertion fails;
        // previously the iterator was never closed.
        // NOTE(review): closing presumably cancels the underlying job - confirm in BlockingIterator.
        try (BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T")) {
            // Keys 1 and 2 are new; the later rows for key 1 are expected to be dropped.
            sql("INSERT INTO T VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 4, '4')");
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(insertRow(1, 1, "1"), insertRow(2, 2, "2"));

            // Keys 1 and 2 already exist, so only keys 3..7 should show up.
            sql("INSERT INTO T VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 3, '3'), (4, 4, '4'), (5, 5, '5'), (6, 6, '6'), (7, 7, '7')");
            Assertions.assertThat(iterator.collect(5))
                    .containsExactlyInAnyOrder(
                            insertRow(3, 3, "3"),
                            insertRow(4, 4, "4"),
                            insertRow(5, 5, "5"),
                            insertRow(6, 6, "6"),
                            insertRow(7, 7, "7"));

            // Key 7 already exists, so only key 8 should show up.
            sql("INSERT INTO T VALUES(7, 7, '8'), (8, 8, '8')");
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(insertRow(8, 8, "8"));
        }
    }

    /** Builds an expected row of kind {@link RowKind#INSERT} holding the given field values. */
    private static Row insertRow(Object... fields) {
        return Row.ofKind(RowKind.INSERT, fields);
    }
}
