package org.apache.paimon.flink;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/LookupChangelogWithAggITCase.class */
/**
 * Integration test for a primary-key table that combines the {@code lookup} changelog producer
 * with the {@code aggregation} merge engine.
 */
public class LookupChangelogWithAggITCase extends CatalogITCaseBase {

    /**
     * Writes the same two keys repeatedly into a table whose {@code v} column is aggregated with
     * {@code sum}, and checks that a streaming read observes, for every additional write, one
     * {@code UPDATE_BEFORE}/{@code UPDATE_AFTER} pair per key carrying the previous and the new
     * running sum.
     *
     * @throws Exception if executing a statement, collecting results, or closing the streaming
     *     iterator fails
     */
    @Test
    public void testMultipleCompaction() throws Exception {
        sql(
                "CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket'='3', 'changelog-producer'='lookup', 'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')");

        // try-with-resources guarantees the streaming iterator is closed even when an assertion
        // fails or collect() throws; a trailing close() call would be skipped in those cases.
        try (BlockingIterator<Row, Row> changelog = streamSqlBlockIter("SELECT * FROM T")) {
            // The first write is expected to surface as plain inserts.
            sql("INSERT INTO T VALUES (1, 1), (2, 2)");
            Assertions.assertThat(changelog.collect(2))
                    .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));

            // Before round i, key 1 has been written i times with value 1 and key 2 i times with
            // value 2, so the running sums are i and 2 * i; each round adds one more write.
            for (int round = 1; round < 5; round++) {
                sql("INSERT INTO T VALUES (1, 1), (2, 2)");
                Assertions.assertThat(changelog.collect(4))
                        .containsExactlyInAnyOrder(
                                Row.ofKind(RowKind.UPDATE_BEFORE, 1, round),
                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, 2 * round),
                                Row.ofKind(RowKind.UPDATE_AFTER, 1, round + 1),
                                Row.ofKind(RowKind.UPDATE_AFTER, 2, 2 * (round + 1)));
            }
        }
    }
}
