package org.apache.paimon.flink;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase.class */
public class PreAggregationITCase {

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$BasicAggregateITCase.class */
    /** Smoke test for an aggregation table that also configures {@code local-merge-buffer-size}. */
    public static class BasicAggregateITCase extends CatalogITCaseBase {

        /**
         * Inserts three rows, two of which share the primary key {@code (k=1, d=1)}, into a
         * partitioned table whose {@code v} column uses the {@code sum} aggregate, and expects the
         * batch read to return one row per key with the values added up.
         */
        @Test
        public void testLocalMerge() {
            String createTable = "CREATE TABLE T (k INT,v INT,d INT,PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d)  WITH ('merge-engine'='aggregation', 'fields.v.aggregate-function'='sum','local-merge-buffer-size'='1m');";
            sql(createTable);
            sql("INSERT INTO T VALUES(1, 1, 1), (2, 1, 1), (1, 2, 1)");

            // Key (1, 1) was written twice (v = 1 and v = 2); key (2, 1) only once.
            Row summedKey = Row.of(1, 3, 1);
            Row singleKey = Row.of(2, 1, 1);
            Assertions.assertThat(batchSql("SELECT * FROM T"))
                    .containsExactlyInAnyOrder(summedKey, singleKey);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$BoolOrAndAggregation.class */
    /**
     * Tests for the {@code bool_or} (column {@code a}) and {@code bool_and} (column {@code b})
     * aggregate functions on table {@code T7}.
     */
    public static class BoolOrAndAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code T7}. */
        @Override
        protected List<String> ddl() {
            String createT7 = "CREATE TABLE IF NOT EXISTS T7 (j INT, k INT, a BOOLEAN, b BOOLEAN,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='bool_or','fields.b.aggregate-function'='bool_and');";
            return Collections.singletonList(createT7);
        }

        /** Writes TRUE, NULL and FALSE for one key in a single statement and checks the merged row. */
        @Test
        public void testMergeInMemory() {
            batchSql("INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN)),(1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN)), (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))");
            assertT7Rows(Row.of(1, 2, true, false));
        }

        /** Writes the same three values in separate statements and checks the merged row on read. */
        @Test
        public void testMergeRead() {
            writeT7InOrder(
                    "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))",
                    "INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))",
                    "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))");
            assertT7Rows(Row.of(1, 2, true, false));
        }

        /**
         * Sets {@code commit.force-compact} on the table, writes three versions for each of two
         * keys, and checks the merged rows.
         */
        @Test
        public void testMergeCompaction() {
            writeT7InOrder(
                    "ALTER TABLE T7 SET ('commit.force-compact'='true')",
                    "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))",
                    "INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))",
                    "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))",
                    "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))",
                    "INSERT INTO T7 VALUES (1, 3, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))",
                    "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS BOOLEAN), CAST('TRUE' AS BOOLEAN))");
            assertT7Rows(Row.of(1, 2, true, false), Row.of(1, 3, false, true));
        }

        /**
         * Expects a streaming scan of {@code T7} to throw.
         *
         * <p>NOTE(review): the string argument appears to select AssertJ's
         * {@code assertThatThrownBy(callable, description, args)} overload, so it only labels the
         * assertion and is not compared with the exception message - confirm whether a message
         * check was intended.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T7").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }

        /** Runs each statement through {@code batchSql}, one after another, in the given order. */
        private void writeT7InOrder(String... statements) {
            for (String statement : statements) {
                batchSql(statement);
            }
        }

        /** Asserts that a full batch read of {@code T7} yields exactly the given rows, in any order. */
        private void assertT7Rows(Row... expected) {
            Assertions.assertThat(batchSql("SELECT * FROM T7")).containsExactlyInAnyOrder(expected);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$CollectAggregationITCase.class */
    /**
     * Tests for the {@code collect} aggregate function on an {@code ARRAY<STRING>} column: merging
     * with and without {@code distinct}, and retraction driven by changelog input.
     */
    public static class CollectAggregationITCase extends CatalogITCaseBase {

        /**
         * DDL template for the bounded {@code values} source tables used by {@link #testRetract}.
         * Placeholders, in order: table name, registered data id, changelog mode.
         */
        private static final String CHANGELOG_SOURCE_DDL = "CREATE TEMPORARY TABLE %s (  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>,  f1 INT) WITH (  'connector' = 'values',  'data-id' = '%s',  'bounded' = 'true',  'changelog-mode' = '%s')";

        /** Overrides the base class default so these tests run with parallelism 1. */
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        /**
         * With {@code fields.f0.distinct = true}, a second write is merged into the existing array
         * and duplicates are expected to collapse (e.g. {@code ['paimon', 'paimon']} reads back as
         * a single {@code "paimon"}). A NULL write leaves the previous array unchanged.
         */
        @Test
        public void testAggWithDistinct() {
            sql("CREATE TABLE test_collect(  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'collect',  'fields.f0.distinct' = 'true')");

            sql("INSERT INTO test_collect VALUES (1, CAST (NULL AS ARRAY<STRING>)), (2, ARRAY['A', 'B']), (3, ARRAY['car', 'watch'])");
            List<Row> firstRead = queryAndSort("SELECT * FROM test_collect");
            checkOneRecord(firstRead.get(0), 1);
            checkOneRecord(firstRead.get(1), 2, "A", "B");
            checkOneRecord(firstRead.get(2), 3, "car", "watch");

            sql("INSERT INTO test_collect VALUES (1, ARRAY['paimon', 'paimon']), (2, ARRAY['A', 'B', 'C']), (3, CAST (NULL AS ARRAY<STRING>))");
            List<Row> secondRead = queryAndSort("SELECT * FROM test_collect");
            checkOneRecord(secondRead.get(0), 1, "paimon");
            checkOneRecord(secondRead.get(1), 2, "A", "B", "C");
            checkOneRecord(secondRead.get(2), 3, "car", "watch");
        }

        /**
         * Without the {@code distinct} option every written element is expected to be kept, so
         * repeated values show up as many times as they were written.
         */
        @Test
        public void testAggWithoutDistinct() {
            sql("CREATE TABLE test_collect(  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'collect')");

            sql("INSERT INTO test_collect VALUES (1, CAST (NULL AS ARRAY<STRING>)), (2, ARRAY['A', 'B', 'B']), (3, ARRAY['car', 'watch'])");
            List<Row> firstRead = queryAndSort("SELECT * FROM test_collect");
            checkOneRecord(firstRead.get(0), 1);
            checkOneRecord(firstRead.get(1), 2, "A", "B", "B");
            checkOneRecord(firstRead.get(2), 3, "car", "watch");

            sql("INSERT INTO test_collect VALUES (1, ARRAY['paimon', 'paimon']), (2, ARRAY['A', 'B', 'C']), (3, CAST (NULL AS ARRAY<STRING>))");
            List<Row> secondRead = queryAndSort("SELECT * FROM test_collect");
            checkOneRecord(secondRead.get(0), 1, "paimon", "paimon");
            checkOneRecord(secondRead.get(1), 2, "A", "A", "B", "B", "B", "C");
            checkOneRecord(secondRead.get(2), 3, "car", "watch");
        }

        /** Supplies every (changelog-producer, merge-engine) combination for {@link #testRetract}. */
        private static List<Arguments> retractArguments() {
            return Arrays.asList(
                    Arguments.arguments("lookup", "aggregation"),
                    Arguments.arguments("lookup", "partial-update"),
                    Arguments.arguments("full-compaction", "aggregation"),
                    Arguments.arguments("full-compaction", "partial-update"));
        }

        /**
         * Feeds UPDATE_BEFORE/UPDATE_AFTER and DELETE rows into {@code test_collect} and checks
         * the changelog emitted by a streaming read of the table.
         *
         * <p>The streaming iterator is closed in a {@code finally} block so that a failed
         * assertion does not leave it open.
         *
         * @param str the {@code changelog-producer} option value
         * @param str2 the {@code merge-engine} option value; {@code partial-update} additionally
         *     gets a {@code fields.f1.sequence-group} option
         * @throws Exception if executing a statement or collecting results fails
         */
        @MethodSource("retractArguments")
        @ParameterizedTest(name = "changelog-producer = {0}, merge-engine = {1}")
        public void testRetract(String str, String str2) throws Exception {
            String changelogProducer = str;
            String mergeEngine = str2;
            String sequenceGroupOption =
                    "partial-update".equals(mergeEngine)
                            ? ", 'fields.f1.sequence-group' = 'f0'"
                            : "";
            sql("CREATE TABLE test_collect(  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>,  f1 INT) WITH (  'changelog-producer' = '%s',  'merge-engine' = '%s',  'fields.f0.aggregate-function' = 'collect'  %s)", changelogProducer, mergeEngine, sequenceGroupOption);

            BlockingIterator<Row, Row> changelog = streamSqlBlockIter("SELECT * FROM test_collect");
            try {
                sql("INSERT INTO test_collect VALUES (1, ARRAY['A', 'B'], 1)");
                checkOneRecord((Row) changelog.collect(1).get(0), 1, "A", "B");

                // Retract [A, B] and add [C, D] for key 1; the expected result is [A, B, C, D].
                feedChangelog(
                        "INPUT11",
                        "UB,UA",
                        Arrays.asList(
                                Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}, 2),
                                Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"}, 3)));
                checkUpdatePair(
                        changelog.collect(2),
                        1,
                        new String[] {"A", "B"},
                        new String[] {"A", "B", "C", "D"});

                // Delete [C, D] from key 1 again.
                feedChangelog(
                        "INPUT12",
                        "D",
                        Collections.singletonList(
                                Row.ofKind(RowKind.DELETE, 1, new String[] {"C", "D"}, 4)));
                checkUpdatePair(
                        changelog.collect(2),
                        1,
                        new String[] {"A", "B", "C", "D"},
                        new String[] {"A", "B"});

                sql("INSERT INTO test_collect VALUES (2, ARRAY['A', 'B'], 5), (3, ARRAY['A', 'B'], 6)");
                List<?> inserted = changelog.collect(2);
                checkOneRecord((Row) inserted.get(0), 2, "A", "B");
                checkOneRecord((Row) inserted.get(1), 3, "A", "B");

                feedChangelog(
                        "INPUT21",
                        "UB,UA",
                        Arrays.asList(
                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, new String[] {"A", "B"}, 7),
                                Row.ofKind(RowKind.UPDATE_AFTER, 2, new String[] {"C", "D"}, 8)));
                checkUpdatePair(
                        changelog.collect(2),
                        2,
                        new String[] {"A", "B"},
                        new String[] {"A", "B", "C", "D"});

                // Deleting only "A" from key 3 is expected to leave ["B"].
                feedChangelog(
                        "INPUT22",
                        "D",
                        Collections.singletonList(
                                Row.ofKind(RowKind.DELETE, 3, new String[] {"A"}, 9)));
                checkUpdatePair(
                        changelog.collect(2),
                        3,
                        new String[] {"A", "B"},
                        new String[] {"B"});
            } finally {
                changelog.close();
            }
        }

        /**
         * Registers {@code rows} as test data, creates a temporary {@code values} table named
         * {@code inputTable} over them with the given changelog mode, and inserts that table's
         * content into {@code test_collect}, waiting for each statement to finish.
         */
        private void feedChangelog(String inputTable, String changelogMode, List<Row> rows)
                throws Exception {
            String dataId = TestValuesTableFactory.registerData(rows);
            sEnv.executeSql(String.format(CHANGELOG_SOURCE_DDL, inputTable, dataId, changelogMode))
                    .await();
            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM " + inputTable).await();
        }

        /**
         * Asserts that {@code changes} starts with an UPDATE_BEFORE row followed by an
         * UPDATE_AFTER row for key {@code id}, carrying the {@code before} and {@code after}
         * arrays respectively.
         */
        private void checkUpdatePair(List<?> changes, int id, String[] before, String[] after) {
            Row retracted = (Row) changes.get(0);
            Assertions.assertThat(retracted.getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
            checkOneRecord(retracted, id, before);

            Row updated = (Row) changes.get(1);
            Assertions.assertThat(updated.getKind()).isEqualTo(RowKind.UPDATE_AFTER);
            checkOneRecord(updated, id, after);
        }

        /**
         * Asserts that field 0 of {@code row} equals {@code i}. When no expected strings are
         * given, field 1 must be null; otherwise field 1 (cast to {@code String[]}) must contain
         * exactly the expected strings, in any order.
         */
        private void checkOneRecord(Row row, int i, String... strArr) {
            Assertions.assertThat(row.getField(0)).isEqualTo(i);
            boolean expectNullArray = strArr == null || strArr.length == 0;
            if (expectNullArray) {
                Assertions.assertThat(row.getField(1)).isNull();
                return;
            }
            Assertions.assertThat((String[]) row.getField(1)).containsExactlyInAnyOrder(strArr);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$FirstValueAggregation.class */
    /**
     * Tests for the {@code first_value}, {@code first_non_null_value} and
     * {@code first_not_null_value} aggregate functions. Table {@code T} uses column {@code a} as
     * its {@code sequence.field}; table {@code T2} sets {@code fields.v.ignore-retract}.
     */
    public static class FirstValueAggregation extends CatalogITCaseBase {

        /** Returns the DDL statements that create tables {@code T} and {@code T2}. */
        @Override
        protected List<String> ddl() {
            String createT = "CREATE TABLE T (k INT,a INT,b VARCHAR,c VARCHAR,d VARCHAR,PRIMARY KEY (k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'changelog-producer' = 'full-compaction','fields.b.aggregate-function'='first_value','fields.c.aggregate-function'='first_non_null_value','fields.d.aggregate-function'='first_not_null_value','sequence.field'='a');";
            String createT2 = "CREATE TABLE T2 (k INT,v STRING,PRIMARY KEY (k) NOT ENFORCED)WITH ('merge-engine' = 'aggregation','fields.v.aggregate-function' = 'first_value','fields.v.ignore-retract' = 'true');";
            return Arrays.asList(createT, createT2);
        }

        /** Writes two versions per key in one statement, already ordered by the sequence field. */
        @Test
        public void tesInMemoryMerge() {
            batchSql("INSERT INTO T VALUES (1, 0, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR)),(1, 1, '1', '1', '1'), (2, 2, '2', '2', '2'),(2, 3, '22', '22', '22')");
            assertTRows(
                    Row.of(1, 1, null, "1", "1"),
                    Row.of(2, 3, "2", "2", "2"));
        }

        /**
         * Writes key 2 with sequence values out of order (3 before 2), then adds an even lower
         * sequence value (1). The expected rows take the "first" values from the row with the
         * lowest sequence value.
         */
        @Test
        public void tesUnOrderInput() {
            batchSql("INSERT INTO T VALUES (1, 0, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR)),(1, 1, '1', '1', '1'), (2, 3, '2', '2', '2'),(2, 2, '22', '22', '22')");
            assertTRows(
                    Row.of(1, 1, null, "1", "1"),
                    Row.of(2, 3, "22", "22", "22"));

            batchSql("INSERT INTO T VALUES (2, 1, '1', '1', '1')");
            assertTRows(
                    Row.of(1, 1, null, "1", "1"),
                    Row.of(2, 3, "1", "1", "1"));
        }

        /** Writes each version in its own statement and checks the merged rows on read. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T VALUES (1, 1, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR))",
                "INSERT INTO T VALUES (1, 2, '1', '1', '1')",
                "INSERT INTO T VALUES (2, 1, '2', '2', '2')",
                "INSERT INTO T VALUES (2, 2, '22', '22', '22')"
            };
            for (String insert : inserts) {
                batchSql(insert);
            }
            assertTRows(
                    Row.of(1, 2, null, "1", "1"),
                    Row.of(2, 2, "2", "2", "2"));
        }

        /**
         * Writes keys {@code 0..99} with {@code v = k}, then overwrites keys {@code 50..99} with
         * {@code v = k + 100}. Every key is expected to still read back its first value.
         */
        @Test
        public void testAggregatorResetWhenIgnoringRetract() {
            final int numRows = 100;

            String firstBatch =
                    IntStream.range(0, numRows)
                            .mapToObj(key -> String.format("(%d, '%d')", key, key))
                            .collect(Collectors.joining(", "));
            batchSql("INSERT INTO T2 VALUES " + firstBatch);

            String secondBatch =
                    IntStream.range(numRows / 2, numRows)
                            .mapToObj(key -> String.format("(%d, '%d')", key, key + numRows))
                            .collect(Collectors.joining(", "));
            batchSql("INSERT INTO T2 VALUES " + secondBatch);

            Assertions.assertThat(batchSql("SELECT * FROM T2"))
                    .containsExactlyInAnyOrder(
                            IntStream.range(0, numRows)
                                    .mapToObj(key -> Row.of(key, String.valueOf(key)))
                                    .toArray(Row[]::new));
        }

        /** Asserts that a full batch read of {@code T} yields exactly the given rows, in any order. */
        private void assertTRows(Row... expected) {
            Assertions.assertThat(batchSql("SELECT * FROM T")).containsExactlyInAnyOrder(expected);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$LastNonNullValueAggregation.class */
    /**
     * Tests for the {@code last_non_null_value} aggregate function, configured on columns
     * {@code a} and {@code i} of table {@code T4}.
     */
    public static class LastNonNullValueAggregation extends CatalogITCaseBase {

        /** Overrides the base class default so these tests run with parallelism 1. */
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        /** Returns the single DDL statement that creates table {@code T4}. */
        @Override
        protected List<String> ddl() {
            String createT4 = "CREATE TABLE IF NOT EXISTS T4 (j INT, k INT, a INT, b INT, i DATE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='last_non_null_value', 'fields.i.aggregate-function'='last_non_null_value');";
            return Collections.singletonList(createT4);
        }

        /**
         * Stages three versions of one key in a helper table, ordered by an extra column, and
         * copies them into {@code T4} with a single INSERT. Each column is expected to end up with
         * its last non-null input.
         */
        @Test
        public void testMergeInMemory() {
            batchSql("CREATE TABLE myTable AS SELECT b, c, d, e, f FROM (VALUES   (1, 1, 2, CAST(NULL AS INT), 4, CAST('2020-01-01' AS DATE)),  (2, 1, 2, 2, CAST(NULL as INT), CAST('2020-01-02' AS DATE)),  (3, 1, 2, 3, 5, CAST(NULL AS DATE))) AS V(a, b, c, d, e, f) ORDER BY a");
            batchSql("INSERT INTO T4 SELECT * FROM myTable");
            assertT4Rows(Row.of(1, 2, 3, 5, LocalDate.of(2020, 1, 2)));
        }

        /** Writes three versions of one key in separate statements and checks the merged row. */
        @Test
        public void testMergeRead() {
            writeT4InOrder(
                    "INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))",
                    "INSERT INTO T4 VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))",
                    "INSERT INTO T4 VALUES (1, 2, 3, 5, CAST(NULL AS DATE))");
            assertT4Rows(Row.of(1, 2, 3, 5, LocalDate.of(2020, 1, 2)));
        }

        /**
         * Sets {@code commit.force-compact} on the table, writes three versions for each of two
         * keys, and checks the merged rows.
         */
        @Test
        public void testMergeCompaction() {
            writeT4InOrder(
                    "ALTER TABLE T4 SET ('commit.force-compact'='true')",
                    "INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))",
                    "INSERT INTO T4 VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))",
                    "INSERT INTO T4 VALUES (1, 2, 3, 5, CAST(NULL AS DATE))",
                    "INSERT INTO T4 VALUES (1, 3, 3, 4, CAST('2020-01-01' AS DATE))",
                    "INSERT INTO T4 VALUES (1, 3, 2, 6, CAST(NULL AS DATE))",
                    "INSERT INTO T4 VALUES (1, 3, CAST(NULL AS INT), CAST(NULL AS INT), CAST('2022-01-02' AS DATE))");
            assertT4Rows(
                    Row.of(1, 2, 3, 5, LocalDate.of(2020, 1, 2)),
                    Row.of(1, 3, 2, 6, LocalDate.of(2022, 1, 2)));
        }

        /**
         * Expects a streaming scan of {@code T4} to throw.
         *
         * <p>NOTE(review): the string argument appears to select AssertJ's
         * {@code assertThatThrownBy(callable, description, args)} overload, so it only labels the
         * assertion and is not compared with the exception message - confirm whether a message
         * check was intended.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T4").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }

        /** Runs each statement through {@code batchSql}, one after another, in the given order. */
        private void writeT4InOrder(String... statements) {
            for (String statement : statements) {
                batchSql(statement);
            }
        }

        /** Asserts that a full batch read of {@code T4} yields exactly the given rows, in any order. */
        private void assertT4Rows(Row... expected) {
            Assertions.assertThat(batchSql("SELECT * FROM T4")).containsExactlyInAnyOrder(expected);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$LastValueAggregation.class */
    /**
     * Tests for the {@code last_value} aggregate function, configured on columns {@code a} and
     * {@code i} of table {@code T5}. The expected rows keep a trailing NULL as the final value.
     */
    public static class LastValueAggregation extends CatalogITCaseBase {

        /** Overrides the base class default so these tests run with parallelism 1. */
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        /** Returns the single DDL statement that creates table {@code T5}. */
        @Override
        protected List<String> ddl() {
            String createT5 = "CREATE TABLE IF NOT EXISTS T5 (j INT, k INT, a INT, i DATE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='last_value', 'fields.i.aggregate-function'='last_value');";
            return Collections.singletonList(createT5);
        }

        /**
         * Stages three versions of one key in a helper table, ordered by an extra column, and
         * copies them into {@code T5} with a single INSERT.
         */
        @Test
        public void testMergeInMemory() {
            batchSql("CREATE TABLE myTable AS SELECT b, c, d, e FROM (VALUES   (1, 1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE)),  (2, 1, 2, 2, CAST('2020-01-02' AS DATE)),  (3, 1, 2, 3, CAST(NULL AS DATE))) AS V(a, b, c, d, e) ORDER BY a");
            batchSql("INSERT INTO T5 SELECT * FROM myTable");
            assertT5Rows(Row.of(1, 2, 3, null));
        }

        /** Writes three versions of one key in separate statements and checks the merged row. */
        @Test
        public void testMergeRead() {
            writeT5InOrder(
                    "INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE))",
                    "INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS DATE))",
                    "INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))");
            assertT5Rows(Row.of(1, 2, 3, null));
        }

        /**
         * Sets {@code commit.force-compact} on the table, writes three versions for each of two
         * keys, and checks the merged rows.
         */
        @Test
        public void testMergeCompaction() {
            writeT5InOrder(
                    "ALTER TABLE T5 SET ('commit.force-compact'='true')",
                    "INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE))",
                    "INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS DATE))",
                    "INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))",
                    "INSERT INTO T5 VALUES (1, 3, 3, CAST('2020-01-01' AS DATE))",
                    "INSERT INTO T5 VALUES (1, 3, 2, CAST(NULL AS DATE))",
                    "INSERT INTO T5 VALUES (1, 3, CAST(NULL AS INT), CAST('2022-01-02' AS DATE))");
            assertT5Rows(
                    Row.of(1, 2, 3, null),
                    Row.of(1, 3, null, LocalDate.of(2022, 1, 2)));
        }

        /**
         * Expects a streaming scan of {@code T5} to throw.
         *
         * <p>NOTE(review): the string argument appears to select AssertJ's
         * {@code assertThatThrownBy(callable, description, args)} overload, so it only labels the
         * assertion and is not compared with the exception message - confirm whether a message
         * check was intended.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T5").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }

        /** Runs each statement through {@code batchSql}, one after another, in the given order. */
        private void writeT5InOrder(String... statements) {
            for (String statement : statements) {
                batchSql(statement);
            }
        }

        /** Asserts that a full batch read of {@code T5} yields exactly the given rows, in any order. */
        private void assertT5Rows(Row... expected) {
            Assertions.assertThat(batchSql("SELECT * FROM T5")).containsExactlyInAnyOrder(expected);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$ListAggAggregation.class */
    /**
     * Tests for the {@code listagg} aggregate function on column {@code a} of table {@code T6}.
     * The expected rows join non-null inputs with a comma and skip NULL inputs.
     */
    public static class ListAggAggregation extends CatalogITCaseBase {

        /** Overrides the base class default so these tests run with parallelism 1. */
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        /** Returns the single DDL statement that creates table {@code T6}. */
        @Override
        protected List<String> ddl() {
            String createT6 = "CREATE TABLE IF NOT EXISTS T6 (j INT, k INT, a STRING, PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='listagg');";
            return Collections.singletonList(createT6);
        }

        /**
         * Stages three versions of one key in a helper table, ordered by an extra column, and
         * copies them into {@code T6} with a single INSERT.
         */
        @Test
        public void testMergeInMemory() {
            batchSql("CREATE TABLE myTable AS SELECT b, c, d FROM (VALUES   (1, 1, 2, 'first line'),  (2, 1, 2, CAST(NULL AS STRING)),  (3, 1, 2, 'second line')) AS V(a, b, c, d) ORDER BY a");
            batchSql("INSERT INTO T6 SELECT * FROM myTable");
            assertT6Rows(Row.of(1, 2, "first line,second line"));
        }

        /** Writes three versions of one key in separate statements and checks the merged row. */
        @Test
        public void testMergeRead() {
            batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')");
            batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))");
            batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')");
            assertT6Rows(Row.of(1, 2, "first line,second line"));
        }

        /**
         * Sets {@code commit.force-compact} on the table, then writes one key with two strings
         * around a NULL and a second key with NULL three times. The all-NULL key is expected to
         * read back as NULL.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T6 SET ('commit.force-compact'='true')");
            batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')");
            batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))");
            batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')");

            // Key (1, 3) only ever receives NULL, written three times.
            for (int attempt = 0; attempt < 3; attempt++) {
                batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))");
            }

            assertT6Rows(
                    Row.of(1, 2, "first line,second line"),
                    Row.of(1, 3, null));
        }

        /**
         * Expects a streaming scan of {@code T6} to throw.
         *
         * <p>NOTE(review): the string argument appears to select AssertJ's
         * {@code assertThatThrownBy(callable, description, args)} overload, so it only labels the
         * assertion and is not compared with the exception message - confirm whether a message
         * check was intended.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T6").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }

        /** Asserts that a full batch read of {@code T6} yields exactly the given rows, in any order. */
        private void assertT6Rows(Row... expected) {
            Assertions.assertThat(batchSql("SELECT * FROM T6")).containsExactlyInAnyOrder(expected);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$MaxAggregation.class */
    /**
     * Tests for the {@code max} aggregate function applied to every non-key column of table
     * {@code T2}, covering numeric, date/time and character types.
     */
    public static class MaxAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code T2}. */
        @Override
        protected List<String> ddl() {
            String createT2 = "CREATE TABLE IF NOT EXISTS T2 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,i DATE,l TIMESTAMP,m CHAR,n VARCHAR,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='max', 'fields.b.aggregate-function'='max', 'fields.c.aggregate-function'='max', 'fields.d.aggregate-function'='max', 'fields.e.aggregate-function'='max', 'fields.f.aggregate-function'='max','fields.h.aggregate-function'='max','fields.i.aggregate-function'='max','fields.l.aggregate-function'='max','fields.m.aggregate-function'='max','fields.n.aggregate-function'='max');";
            return Collections.singletonList(createT2);
        }

        /** Writes three versions of one key in a single statement and checks the per-column maxima. */
        @Test
        public void testMergeInMemory() {
            batchSql("INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");
            assertT2Rows(expectedMaxRow(2, 3));
        }

        /** Writes the same three versions in separate statements and checks the merged row on read. */
        @Test
        public void testMergeRead() {
            batchSql("INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
            batchSql("INSERT INTO T2 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
            batchSql("INSERT INTO T2 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");
            assertT2Rows(expectedMaxRow(2, 3));
        }

        /**
         * Sets {@code commit.force-compact} on the table, writes three versions for each of two
         * keys, and checks the merged rows. Here the largest character values ('c', 'ccc') arrive
         * in the middle write rather than the last one.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T2 SET ('commit.force-compact'='true')");

            batchSql("INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
            batchSql("INSERT INTO T2 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')");
            batchSql("INSERT INTO T2 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')");

            batchSql("INSERT INTO T2 VALUES (1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
            batchSql("INSERT INTO T2 VALUES (1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')");
            batchSql("INSERT INTO T2 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')");

            assertT2Rows(expectedMaxRow(2, 3), expectedMaxRow(3, 6));
        }

        /**
         * Expects a streaming scan of {@code T2} to throw.
         *
         * <p>NOTE(review): the string argument appears to select AssertJ's
         * {@code assertThatThrownBy(callable, description, args)} overload, so it only labels the
         * assertion and is not compared with the exception message - confirm whether a message
         * check was intended.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T2").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }

        /**
         * Builds the expected merged row for key {@code (1, k)}. Only {@code k} and the maximum of
         * column {@code a} differ between the keys used in these tests; every other column has the
         * same expected maximum.
         */
        private static Row expectedMaxRow(int k, int maxA) {
            return Row.of(
                    1,
                    k,
                    maxA,
                    new BigDecimal("10.00"),
                    (byte) 2,
                    (short) 2,
                    10000000L,
                    1.11f,
                    1.21d,
                    LocalDate.of(2022, 1, 2),
                    LocalDateTime.of(2022, 1, 1, 2, 0, 0),
                    "c",
                    "ccc");
        }

        /** Asserts that a full batch read of {@code T2} yields exactly the given rows, in any order. */
        private void assertT2Rows(Row... expected) {
            Assertions.assertThat(batchSql("SELECT * FROM T2")).containsExactlyInAnyOrder(expected);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$MergeMapAggregationITCase.class */
    /** Tests for the {@code merge_map} aggregate function on a {@code MAP<INT, STRING>} column. */
    public static class MergeMapAggregationITCase extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code test_merge_map}. */
        @Override
        protected List<String> ddl() {
            String createTable = "CREATE TABLE test_merge_map(  id INT PRIMARY KEY NOT ENFORCED,  f0 MAP<INT, STRING>) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'merge_map')";
            return Collections.singletonList(createTable);
        }

        /**
         * Writes maps for three ids twice. After the second write the expected maps contain the
         * union of the keys, with the later write winning for keys present in both.
         */
        @Test
        public void testMergeMap() {
            sql("INSERT INTO test_merge_map VALUES (1, CAST (NULL AS MAP<INT, STRING>)), (2, MAP[1, 'A']), (3, MAP[1, 'A', 2, 'B'])");
            List<Row> firstRead = queryAndSort("SELECT * FROM test_merge_map");
            checkOneRecord(firstRead.get(0), 1, null);
            checkOneRecord(firstRead.get(1), 2, toMap(1, "A"));
            checkOneRecord(firstRead.get(2), 3, toMap(1, "A", 2, "B"));

            sql("INSERT INTO test_merge_map VALUES (1, MAP[1, 'A']), (2, MAP[1, 'B']), (3, MAP[1, 'a', 2, 'b', 3, 'c'])");
            List<Row> secondRead = queryAndSort("SELECT * FROM test_merge_map");
            checkOneRecord(secondRead.get(0), 1, toMap(1, "A"));
            checkOneRecord(secondRead.get(1), 2, toMap(1, "B"));
            checkOneRecord(secondRead.get(2), 3, toMap(1, "a", 2, "b", 3, "c"));
        }

        /**
         * Builds a map from alternating key/value arguments: {@code objArr[0]} maps to
         * {@code objArr[1]}, {@code objArr[2]} to {@code objArr[3]}, and so on.
         */
        private Map<Object, Object> toMap(Object... objArr) {
            Map<Object, Object> entries = new HashMap<>();
            int position = 0;
            while (position < objArr.length) {
                entries.put(objArr[position], objArr[position + 1]);
                position += 2;
            }
            return entries;
        }

        /**
         * Asserts that field 0 of {@code row} equals {@code i}. When {@code map} is null or
         * empty, field 1 must be null; otherwise field 1 (cast to a map) must contain exactly the
         * entries of {@code map}.
         */
        private void checkOneRecord(Row row, int i, Map<Object, Object> map) {
            Assertions.assertThat(row.getField(0)).isEqualTo(i);
            boolean expectNullMap = map == null || map.isEmpty();
            if (expectNullMap) {
                Assertions.assertThat(row.getField(1)).isNull();
                return;
            }
            @SuppressWarnings("unchecked")
            Map<Object, Object> actual = (Map<Object, Object>) row.getField(1);
            Assertions.assertThat(actual).containsExactlyInAnyOrderEntriesOf(map);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$MinAggregation.class */
    /** Tests for the {@code min} aggregate function over every value column of table {@code T3}. */
    public static class MinAggregation extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            String createT3 = "CREATE TABLE IF NOT EXISTS T3 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,i DATE,l TIMESTAMP,m CHAR(1),n VARCHAR,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='min', 'fields.b.aggregate-function'='min', 'fields.c.aggregate-function'='min', 'fields.d.aggregate-function'='min', 'fields.e.aggregate-function'='min', 'fields.f.aggregate-function'='min','fields.h.aggregate-function'='min','fields.i.aggregate-function'='min','fields.l.aggregate-function'='min','fields.m.aggregate-function'='min','fields.n.aggregate-function'='min');";
            return Collections.singletonList(createT3);
        }

        /**
         * The merged row every test in this class expects for primary key {@code (1, k)}; only
         * {@code k} and the minimum of column {@code a} differ between keys.
         */
        private static Row expectedRow(int k, int minA) {
            return Row.of(
                    1,
                    k,
                    minA,
                    new BigDecimal("1.01"),
                    (byte) -1,
                    (short) -1,
                    1000L,
                    -1.11f,
                    -1.11d,
                    LocalDate.of(2020, 1, 1),
                    LocalDateTime.of(2021, 1, 1, 1, 1, 1),
                    "a",
                    "aaa");
        }

        /** Inserts three rows for the same key in one statement and checks the merged row. */
        @Test
        public void testMergeInMemory() {
            batchSql("INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(-1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            Assertions.assertThat(batchSql("SELECT * FROM T3", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 2));
        }

        /** Inserts three rows for the same key in separate statements and checks the merged row. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T3 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T3 VALUES (1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')"
            };
            for (String insert : inserts) {
                batchSql(insert, new Object[0]);
            }
            Assertions.assertThat(batchSql("SELECT * FROM T3", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 2));
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, then inserts three rows for each of two keys
         * in separate statements and checks both merged rows.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T3 SET ('commit.force-compact'='true')", new Object[0]);
            String[] inserts = {
                "INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T3 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T3 VALUES (1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')",
                "INSERT INTO T3 VALUES (1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T3 VALUES (1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T3 VALUES (1, 3, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')"
            };
            for (String insert : inserts) {
                batchSql(insert, new Object[0]);
            }
            Assertions.assertThat(batchSql("SELECT * FROM T3", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 2), expectedRow(3, 3));
        }

        /**
         * Expects that reading {@code T3} through {@code sEnv} and printing the result throws.
         *
         * <p>NOTE(review): the string argument is AssertJ's assertion description; it is not
         * compared against the thrown exception's message.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T3").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$NestedUpdateAggregationITCase.class */
    /**
     * Tests for the {@code nested_update} aggregate function: rows of {@code sub_orders} are
     * collected into the {@code ARRAY<ROW<...>>} column of a wide order table.
     */
    public static class NestedUpdateAggregationITCase extends CatalogITCaseBase {
        /** Pause between two polls of {@code order_wide} in the streaming test. */
        private static final long POLL_INTERVAL_MILLIS = 500L;

        @Override
        protected List<String> ddl() {
            return Arrays.asList(
                    "CREATE TABLE orders (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING\n);",
                    "CREATE TABLE sub_orders (\n  order_id INT,\n  daily_id INT,\n  today STRING,\n  product_name STRING,\n  price BIGINT,\n  PRIMARY KEY (order_id, daily_id, today) NOT ENFORCED\n);",
                    "CREATE TABLE order_wide (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING,\n  sub_orders ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>\n) WITH (\n  'merge-engine' = 'aggregation',\n  'fields.sub_orders.aggregate-function' = 'nested_update',\n  'fields.sub_orders.nested-key' = 'daily_id,today',\n  'fields.sub_orders.ignore-retract' = 'true',  'fields.user_name.ignore-retract' = 'true',  'fields.address.ignore-retract' = 'true')",
                    "CREATE TABLE order_append_wide (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING,\n  sub_orders ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>\n) WITH (\n  'merge-engine' = 'aggregation',\n  'fields.sub_orders.aggregate-function' = 'nested_update',\n  'fields.sub_orders.ignore-retract' = 'true',  'fields.user_name.ignore-retract' = 'true',  'fields.address.ignore-retract' = 'true')");
        }

        /** Widens orders and sub-orders into {@code order_wide} and checks nested and unnested views. */
        @Test
        public void testUseCase() {
            sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou'),(2, 'Zhao', 'ChengDu'),(3, 'Liu', 'NanJing')", new Object[0]);
            sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(1, 2, '12-20', 'Tesla', 400000),(1, 1, '12-21', 'Sangsung', 5000),(2, 1, '12-20', 'Tea', 40),(2, 2, '12-20', 'Pot', 60),(3, 1, '12-25', 'Bat', 15),(3, 1, '12-26', 'Cup', 30)", new Object[0]);
            sql(widenSql(), new Object[0]);

            List<Row> wideRows =
                    sql("SELECT * FROM order_wide", new Object[0]).stream()
                            .sorted(Comparator.comparingInt((Row r) -> (Integer) r.getField(0)))
                            .collect(Collectors.toList());
            // Fail with a readable message instead of an IndexOutOfBoundsException below.
            Assertions.assertThat(wideRows).hasSize(3);
            Assertions.assertThat(checkOneRecord(wideRows.get(0), 1, "Wang", "HangZhou", Row.of(1, "12-20", "Apple", 8000L), Row.of(1, "12-21", "Sangsung", 5000L), Row.of(2, "12-20", "Tesla", 400000L))).isTrue();
            Assertions.assertThat(checkOneRecord(wideRows.get(1), 2, "Zhao", "ChengDu", Row.of(1, "12-20", "Tea", 40L), Row.of(2, "12-20", "Pot", 60L))).isTrue();
            Assertions.assertThat(checkOneRecord(wideRows.get(2), 3, "Liu", "NanJing", Row.of(1, "12-25", "Bat", 15L), Row.of(1, "12-26", "Cup", 30L))).isTrue();

            Assertions.assertThat(sql("SELECT order_id, user_name, address, daily_id, today, product_name, price FROM order_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)", new Object[0]))
                    .containsExactlyInAnyOrder(
                            Row.of(1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L),
                            Row.of(1, "Wang", "HangZhou", 2, "12-20", "Tesla", 400000L),
                            Row.of(1, "Wang", "HangZhou", 1, "12-21", "Sangsung", 5000L),
                            Row.of(2, "Zhao", "ChengDu", 1, "12-20", "Tea", 40L),
                            Row.of(2, "Zhao", "ChengDu", 2, "12-20", "Pot", 60L),
                            Row.of(3, "Liu", "NanJing", 1, "12-25", "Bat", 15L),
                            Row.of(3, "Liu", "NanJing", 1, "12-26", "Cup", 30L));
        }

        /** Same scenario against {@code order_append_wide}, which declares no nested key. */
        @Test
        public void testUseCaseAppend() {
            sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou'),(2, 'Zhao', 'ChengDu'),(3, 'Liu', 'NanJing')", new Object[0]);
            sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(2, 1, '12-20', 'Tesla', 400000),(3, 1, '12-25', 'Bat', 15),(3, 1, '12-26', 'Cup', 30)", new Object[0]);
            sql(widenAppendSql(), new Object[0]);
            Assertions.assertThat(sql("SELECT order_id, user_name, address, daily_id, today, product_name, price FROM order_append_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)", new Object[0]))
                    .containsExactlyInAnyOrder(
                            Row.of(1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L),
                            Row.of(2, "Zhao", "ChengDu", 1, "12-20", "Tesla", 400000L),
                            Row.of(3, "Liu", "NanJing", 1, "12-25", "Bat", 15L),
                            Row.of(3, "Liu", "NanJing", 1, "12-26", "Cup", 30L));
        }

        /**
         * Runs the widening job on {@code sEnv} and polls {@code order_wide} until it shows the
         * expected nested rows, first for the initial data and then after one sub-order is
         * replaced. The {@code @Timeout} bounds the polling.
         *
         * @throws Exception if polling is interrupted or a statement fails
         */
        @Timeout(60)
        @Test
        public void testUpdateWithIgnoreRetract() throws Exception {
            sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, ExecutionConfigOptions.UpsertMaterialize.NONE);
            sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou')", new Object[0]);
            sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(1, 2, '12-20', 'Tesla', 400000),(1, 1, '12-21', 'Sangsung', 5000)", new Object[0]);

            org.apache.flink.table.api.TableResult widenJob = sEnv.executeSql(widenSql());
            try {
                awaitOrderWide(1, "Wang", "HangZhou", Row.of(1, "12-20", "Apple", 8000L), Row.of(1, "12-21", "Sangsung", 5000L), Row.of(2, "12-20", "Tesla", 400000L));
                sql("INSERT INTO sub_orders VALUES (1, 2, '12-20', 'Benz', 380000)", new Object[0]);
                awaitOrderWide(1, "Wang", "HangZhou", Row.of(1, "12-20", "Apple", 8000L), Row.of(1, "12-21", "Sangsung", 5000L), Row.of(2, "12-20", "Benz", 380000L));
            } finally {
                // Stop the widening job so it does not keep running after this test.
                widenJob.getJobClient().ifPresent(client -> client.cancel());
            }
        }

        /**
         * Polls {@code order_wide} until its first row matches the expectation.
         *
         * @throws InterruptedException if the thread is interrupted while sleeping between polls
         */
        private void awaitOrderWide(int orderId, String userName, String address, Row... subOrders)
                throws InterruptedException {
            List<Row> rows;
            do {
                Thread.sleep(POLL_INTERVAL_MILLIS);
                rows = sql("SELECT * FROM order_wide", new Object[0]);
            } while (rows.isEmpty() || !checkOneRecord(rows.get(0), orderId, userName, address, subOrders));
        }

        private String widenSql() {
            return widenInto("order_wide");
        }

        private String widenAppendSql() {
            return widenInto("order_append_wide");
        }

        /** Builds the INSERT that unions {@code orders} and {@code sub_orders} into the given table. */
        private static String widenInto(String targetTable) {
            return "INSERT INTO " + targetTable + "\nSELECT order_id, user_name, address, CAST (NULL AS ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>) FROM orders\nUNION ALL\nSELECT order_id, CAST (NULL AS STRING), CAST (NULL AS STRING), ARRAY[ROW(daily_id, today, product_name, price)] FROM sub_orders";
        }

        /**
         * Returns whether {@code record} carries the expected order columns and nested sub-orders.
         *
         * <p>Null fields count as a mismatch rather than raising a {@link NullPointerException},
         * so the polling test can retry when it observes a row that is only partially merged.
         *
         * @param record a row of the wide table
         * @param orderId expected field 0
         * @param userName expected field 1, must not be null
         * @param address expected field 2, must not be null
         * @param subOrders expected nested rows of field 3, in any order
         */
        private boolean checkOneRecord(Row record, int orderId, String userName, String address, Row... subOrders) {
            if (!Integer.valueOf(orderId).equals(record.getField(0))) {
                return false;
            }
            if (!userName.equals(record.getField(1)) || !address.equals(record.getField(2))) {
                return false;
            }
            return checkNestedTable((Row[]) record.getField(3), subOrders);
        }

        /**
         * Compares two sets of nested rows after ordering both by (field 0, field 1).
         *
         * @param actual nested rows read from the table; {@code null} is treated as a mismatch
         * @param expected the expected nested rows
         * @return true if both contain equal rows
         */
        private boolean checkNestedTable(Row[] actual, Row... expected) {
            if (actual == null || actual.length != expected.length) {
                return false;
            }
            Comparator<Row> byNestedKey =
                    Comparator.comparingInt((Row r) -> (Integer) r.getField(0))
                            .thenComparing((Row r) -> (String) r.getField(1));
            List<Row> sortedActual = Arrays.stream(actual).sorted(byNestedKey).collect(Collectors.toList());
            List<Row> sortedExpected = Arrays.stream(expected).sorted(byNestedKey).collect(Collectors.toList());
            return sortedActual.equals(sortedExpected);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$ProductAggregation.class */
    /** Tests for the {@code product} aggregate function over the numeric columns of table {@code T1}. */
    public static class ProductAggregation extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            String createT1 = "CREATE TABLE IF NOT EXISTS T1 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='product', 'fields.b.aggregate-function'='product', 'fields.c.aggregate-function'='product', 'fields.d.aggregate-function'='product', 'fields.e.aggregate-function'='product', 'fields.f.aggregate-function'='product','fields.h.aggregate-function'='product');";
            return Collections.singletonList(createT1);
        }

        /**
         * The merged row expected by the multi-statement tests for primary key {@code (1, k)};
         * only {@code k} and the product of column {@code a} differ between keys.
         */
        private static Row expectedRow(int k, int productA) {
            return Row.of(
                    1,
                    k,
                    productA,
                    new BigDecimal("11.10"),
                    (byte) 2,
                    (short) -2,
                    1000000000000000L,
                    -1.2321f,
                    -1.3676310000000003d);
        }

        /**
         * Widens column {@code b} to DECIMAL(5, 3), inserts three rows for one key in a single
         * statement, then checks the full merged row and a two-column projection of it.
         */
        @Test
        public void testMergeInMemory() {
            batchSql("ALTER TABLE T1 MODIFY b DECIMAL(5, 3)", new Object[0]);
            batchSql("INSERT INTO T1 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE)),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.11 AS DOUBLE)), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE))", new Object[0]);

            Row fullRow = Row.of(1, 2, 6, new BigDecimal("11.110"), (byte) 2, (short) -2, 1000000000000000L, -0.0f, -1.3676310000000003d);
            Assertions.assertThat(batchSql("SELECT * FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(fullRow);

            Row projected = Row.of(-0.0f, 1000000000000000L);
            Assertions.assertThat(batchSql("SELECT f,e FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(projected);
        }

        /** Inserts three rows for the same key in separate statements and checks the merged row. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))"
            };
            for (String insert : inserts) {
                batchSql(insert, new Object[0]);
            }
            Assertions.assertThat(batchSql("SELECT * FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 6));
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, then inserts three rows for each of two keys
         * in separate statements and checks both merged rows.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T1 SET ('commit.force-compact'='true')", new Object[0]);
            String[] inserts = {
                "INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 2, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))"
            };
            for (String insert : inserts) {
                batchSql(insert, new Object[0]);
            }
            Assertions.assertThat(batchSql("SELECT * FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 6), expectedRow(3, 12));
        }

        /**
         * Expects that reading {@code T1} through {@code sEnv} and printing the result throws.
         *
         * <p>NOTE(review): the string argument is AssertJ's assertion description; it is not
         * compared against the thrown exception's message.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T1").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$SumAggregation.class */
    /** Tests for the {@code sum} aggregate function over the numeric columns of table {@code T1}. */
    public static class SumAggregation extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            String createT1 = "CREATE TABLE IF NOT EXISTS T1 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='sum', 'fields.b.aggregate-function'='sum', 'fields.c.aggregate-function'='sum', 'fields.d.aggregate-function'='sum', 'fields.e.aggregate-function'='sum', 'fields.f.aggregate-function'='sum','fields.h.aggregate-function'='sum');";
            return Collections.singletonList(createT1);
        }

        /**
         * The merged row every test in this class expects for primary key {@code (1, k)}; only
         * {@code k} and the sum of column {@code a} differ between cases.
         */
        private static Row expectedRow(int k, int sumA) {
            return Row.of(
                    1,
                    k,
                    sumA,
                    new BigDecimal("12.11"),
                    (byte) 4,
                    (short) 2,
                    10101000L,
                    0.0f,
                    1.11d);
        }

        /**
         * Inserts three rows for one key in a single statement, then checks the full merged row
         * and a two-column projection of it.
         */
        @Test
        public void testMergeInMemory() {
            batchSql("INSERT INTO T1 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE)),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.11 AS DOUBLE)), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE))", new Object[0]);

            Assertions.assertThat(batchSql("SELECT * FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 5));

            Row projected = Row.of(0.0f, 10101000L);
            Assertions.assertThat(batchSql("SELECT f,e FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(projected);
        }

        /** Inserts three rows for the same key in separate statements and checks the merged row. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))"
            };
            for (String insert : inserts) {
                batchSql(insert, new Object[0]);
            }
            Assertions.assertThat(batchSql("SELECT * FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 6));
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, then inserts three rows for each of two keys
         * in separate statements and checks both merged rows.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T1 SET ('commit.force-compact'='true')", new Object[0]);
            String[] inserts = {
                "INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 2, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))"
            };
            for (String insert : inserts) {
                batchSql(insert, new Object[0]);
            }
            Assertions.assertThat(batchSql("SELECT * FROM T1", new Object[0]))
                    .containsExactlyInAnyOrder(expectedRow(2, 6), expectedRow(3, 7));
        }

        /**
         * Expects that reading {@code T1} through {@code sEnv} and printing the result throws.
         *
         * <p>NOTE(review): the string argument is AssertJ's assertion description; it is not
         * compared against the thrown exception's message.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T1").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/PreAggregationITCase$SumRetractionAggregation.class */
    /**
     * Tests that a {@code sum}-aggregated table with the {@code full-compaction} changelog
     * producer emits update-before/update-after rows when its upstream aggregate changes.
     */
    public static class SumRetractionAggregation extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE T (k INT,b Decimal(12, 2),PRIMARY KEY (k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'changelog-producer' = 'full-compaction','fields.b.aggregate-function'='sum');");
        }

        /**
         * Streams {@code SUM(b) GROUP BY k} from {@code INPUT} into {@code T} and checks the rows
         * read from {@code T} after each of two inserts into {@code INPUT}.
         *
         * @throws Exception if collecting results or closing an iterator fails
         */
        @Test
        public void testRetraction() throws Exception {
            sql("CREATE TABLE INPUT (k INT,b INT,PRIMARY KEY (k) NOT ENFORCED);", new Object[0]);
            CloseableIterator<Row> insertJob = streamSqlIter("INSERT INTO T SELECT k, SUM(b) FROM INPUT GROUP BY k;", new Object[0]);
            // Close both iterators even when an assertion fails, so no streaming job leaks.
            try {
                BlockingIterator<Row, Row> changelog = streamSqlBlockIter("SELECT * FROM T", new Object[0]);
                try {
                    sql("INSERT INTO INPUT VALUES (1, 1), (2, 2)", new Object[0]);
                    Assertions.assertThat(changelog.collect(2))
                            .containsExactlyInAnyOrder(
                                    Row.of(1, BigDecimal.valueOf(100L, 2)),
                                    Row.of(2, BigDecimal.valueOf(200L, 2)));

                    sql("INSERT INTO INPUT VALUES (1, 3), (2, 4)", new Object[0]);
                    Assertions.assertThat(changelog.collect(4))
                            .containsExactlyInAnyOrder(
                                    Row.ofKind(RowKind.UPDATE_BEFORE, 1, BigDecimal.valueOf(100L, 2)),
                                    Row.ofKind(RowKind.UPDATE_AFTER, 1, BigDecimal.valueOf(300L, 2)),
                                    Row.ofKind(RowKind.UPDATE_BEFORE, 2, BigDecimal.valueOf(200L, 2)),
                                    Row.ofKind(RowKind.UPDATE_AFTER, 2, BigDecimal.valueOf(400L, 2)));
                } finally {
                    changelog.close();
                }
            } finally {
                insertJob.close();
            }
        }
    }
}
