package org.apache.paimon.flink;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/paimon/flink/LookupChangelogWithAggITCase.class */
public class LookupChangelogWithAggITCase extends CatalogITCaseBase {
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testMultipleCompaction(boolean z) throws Exception {
        sql("CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket'='3', 'changelog-producer'='lookup', 'changelog-producer.row-deduplicate'='%s', 'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')", Boolean.valueOf(z));
        BlockingIterator<Row, Row> streamSqlBlockIter = streamSqlBlockIter("SELECT * FROM T", new Object[0]);
        sql("INSERT INTO T VALUES (1, 1), (2, 2)", new Object[0]);
        Assertions.assertThat(streamSqlBlockIter.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 1}), Row.of(new Object[]{2, 2})});
        for (int i = 1; i < 5; i++) {
            sql("INSERT INTO T VALUES (1, 1), (2, 2)", new Object[0]);
            Assertions.assertThat(streamSqlBlockIter.collect(4)).containsExactlyInAnyOrder(new Row[]{Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{1, Integer.valueOf(i)}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, Integer.valueOf(2 * i)}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{1, Integer.valueOf(i + 1)}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, Integer.valueOf(2 * (i + 1))})});
        }
        for (int i2 = 5; i2 < 10; i2++) {
            sql("INSERT INTO T VALUES (1, 0), (2, 2)", new Object[0]);
            if (z) {
                Assertions.assertThat(streamSqlBlockIter.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, Integer.valueOf(2 * i2)}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, Integer.valueOf(2 * (i2 + 1))})});
            } else {
                Assertions.assertThat(streamSqlBlockIter.collect(4)).containsExactlyInAnyOrder(new Row[]{Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{1, 5}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2, Integer.valueOf(2 * i2)}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{1, 5}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, Integer.valueOf(2 * (i2 + 1))})});
            }
        }
        streamSqlBlockIter.close();
    }

    @Test
    public void testLookupChangelogProducerWithValueSwitch() throws Exception {
        sql("CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket'='3', 'changelog-producer'='lookup', 'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')", new Object[0]);
        BlockingIterator<Row, Row> streamSqlBlockIter = streamSqlBlockIter("SELECT * FROM T", new Object[0]);
        sql("INSERT INTO T VALUES (1, 1), (2, 2), (1, 3), (1, 4), (1, 5)", new Object[0]);
        Assertions.assertThat(streamSqlBlockIter.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 13}), Row.of(new Object[]{2, 2})});
        streamSqlBlockIter.close();
    }

    @Test
    public void testLookupChangelogProducerWithProjection() {
        sql("CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v1 INT, v2 INT) WITH ('bucket'='3', 'changelog-producer'='lookup', 'merge-engine'='aggregation', 'fields.v1.aggregate-function'='sum', 'fields.v2.aggregate-function'='sum')", new Object[0]);
        int nextInt = 3 + ThreadLocalRandom.current().nextInt(3);
        for (int i = 0; i < nextInt; i++) {
            sql("INSERT INTO T VALUES (1, 1, 1), (2, 2, 2)", new Object[0]);
        }
        Assertions.assertThat(sql("SELECT v2 FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{Integer.valueOf(nextInt)}), Row.of(new Object[]{Integer.valueOf(nextInt * 2)})});
    }
}
