package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.MatchError;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;

/* compiled from: ChangelogSourceITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001a4A!\u0001\u0002\u0001'\t)2\t[1oO\u0016dwnZ*pkJ\u001cW-\u0013+DCN,'BA\u0002\u0005\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000b\u0019\taa\u001d;sK\u0006l'BA\u0004\t\u0003\u001d\u0011XO\u001c;j[\u0016T!!\u0003\u0006\u0002\u000fAd\u0017M\u001c8fe*\u00111\u0002D\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u001b9\tQA\u001a7j].T!a\u0004\t\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0012aA8sO\u000e\u00011C\u0001\u0001\u0015!\t)\u0002$D\u0001\u0017\u0015\t9b!A\u0003vi&d7/\u0003\u0002\u001a-\tQ2\u000b\u001e:fC6LgnZ,ji\"\u001cF/\u0019;f)\u0016\u001cHOQ1tK\"A1\u0004\u0001B\u0001B\u0003%A$A\u0003ti\u0006$X\r\u0005\u0002\u001ec9\u0011ad\f\b\u0003?9r!\u0001I\u0017\u000f\u0005\u0005bcB\u0001\u0012,\u001d\t\u0019#F\u0004\u0002%S9\u0011Q\u0005K\u0007\u0002M)\u0011qEE\u0001\u0007yI|w\u000e\u001e \n\u0003EI!a\u0004\t\n\u00055q\u0011BA\u0006\r\u0013\tI!\"\u0003\u0002\b\u0011%\u0011qCB\u0005\u0003aY\t!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016L!AM\u001a\u0003!M#\u0018\r^3CC\u000e\\WM\u001c3N_\u0012,'B\u0001\u0019\u0017\u0011\u0015)\u0004\u0001\"\u00017\u0003\u0019a\u0014N\\5u}Q\u0011q'\u000f\t\u0003q\u0001i\u0011A\u0001\u0005\u00067Q\u0002\r\u0001\b\u0005\bw\u0001\u0011\r\u0011\"\u0001=\u0003\u0019!\u0017\r^1JIV\tQ\b\u0005\u0002?\t:\u0011qH\u0011\t\u0003K\u0001S\u0011!Q\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0007\u0002\u000ba\u0001\u0015:fI\u00164\u0017BA#G\u0005\u0019\u0019FO]5oO*\u00111\t\u0011\u0005\u0007\u0011\u0002\u0001\u000b\u0011B\u001f\u0002\u000f\u0011\fG/Y%eA!)!\n\u0001C!\u0017\u00061!-\u001a4pe\u0016$\u0012\u0001\u0014\t\u0003\u001b:k\u0011\u0001Q\u0005\u0003\u001f\u0002\u0013A!\u00168ji\"\u0012\u0011*\u0015\t\u0003%Vk\u0011a\u0015\u0006\u0003)B\tQA[;oSRL!AV*\u0003\r\t+gm\u001c:f\u0011\u0015A\u0006\u0001\"\u0001L\u0003\u0015\"Xm\u001d;DQ\u0006tw-\u001a7pON{WO]2f\u0003:$Gk\u001c*fiJ\f7\r^*ue\u0016\fW\u000e\u000b\u0002X5B\u0011!kW\u0005\u00039N\u0013A\u0001V3ti\")a\f\u0001C\u0001\u0017\u0006\u0001C/Z:u\u0007\"\fgnZ3m_\u001e\u001cv.\u001e:dK\u0006sG-\u00169tKJ$8+\u001b8lQ\ti&\fC\u0003b\u0001\u0011\u00051*\u0001\u0010uKN$\u0018iZ4sK\u001e\fG/Z(o\u0007\"\fgnZ3m_\u001e\u001cv.\u001e:dK\"\u0012\u0001M\u0017\u0005\u0006I\u0002!\taS\u0001,i\u0016\u001cH/Q4he\u0016<\u0017\r^3P]\u000eC\u0017M\\4fY><7k\\;sG\u0016\fe\u000eZ+qg\u0016\u0014HoU5oW\"\u00121M\u0017\u0005\u0006O\u0002!\taS\u0001+i\u0016\u001cH/Q4he\u0016<\u0017\r^3P]&s7/\u001a:u\t\u0016dW\r^3DQ\u0006tw-\u001a7pON{WO]2fQ\t1'\f\u000b\u0003\u0001UB\f\bCA6o\u001b\u0005a'BA7T\u0003\u0019\u0011XO\u001c8fe&\u0011q\u000e\u001c\u0002\b%Vtw+\u001b;i\u0003\u00151\u0018\r\\;fG\u0005\u0011\bCA:w\u001b\u0005!(BA;T\u0003\u001d\u0011XO\u001c8feNL!a\u001e;\u0003\u001bA\u000b'/Y7fi\u0016\u0014\u0018N_3e\u0001")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.class */
public class ChangelogSourceITCase extends StreamingWithStateTestBase {
    private final String dataId;

    public String dataId() {
        return this.dataId;
    }

    @Override // org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase, org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    @Before
    public void before() {
        super.before();
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(336).append("\n         |CREATE TABLE user_logs (\n         |  user_id STRING,\n         |  user_name STRING,\n         |  email STRING,\n         |  balance DECIMAL(18,2),\n         |  balance2 AS balance * 2\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '").append(dataId()).append("',\n         | 'changelog-mode' = 'I,UA,UB,D'\n         |)\n         |").toString())).stripMargin());
    }

    @Test
    public void testChangelogSourceAndToRetractStream() {
        DataStream retractStream = package$.MODULE$.tableConversions(tEnv().sqlQuery("SELECT * FROM user_logs")).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink testingRetractSink = new TestingRetractSink();
        retractStream.addSink(testingRetractSink).setParallelism(retractStream.parallelism());
        env().execute();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"user1,Tom,tom123@gmail.com,8.10,16.20", "user3,Bailey,bailey@qq.com,9.99,19.98", "user4,Tina,tina@gmail.com,11.30,22.60"})).sorted(Ordering$String$.MODULE$), testingRetractSink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testChangelogSourceAndUpsertSink() {
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE user_sink (\n         |  user_id STRING PRIMARY KEY NOT ENFORCED,\n         |  user_name STRING,\n         |  email STRING,\n         |  balance DECIMAL(18,2),\n         |  balance2 DECIMAL(18,2)\n         |) WITH (\n         | 'connector' = 'values',\n         | 'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin();
        String stripMargin2 = new StringOps(Predef$.MODULE$.augmentString("\n         |INSERT INTO user_sink\n         |SELECT * FROM user_logs\n         |")).stripMargin();
        tEnv().executeSql(stripMargin);
        execInsertSqlAndWaitResult(stripMargin2);
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"user1,Tom,tom123@gmail.com,8.10,16.20", "user3,Bailey,bailey@qq.com,9.99,19.98", "user4,Tina,tina@gmail.com,11.30,22.60"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("user_sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAggregateOnChangelogSource() {
        DataStream retractStream = package$.MODULE$.tableConversions(tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT count(*), sum(balance), max(email)\n         |FROM user_logs\n         |")).stripMargin())).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink testingRetractSink = new TestingRetractSink();
        retractStream.addSink(testingRetractSink).setParallelism(retractStream.parallelism());
        env().execute();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"3,29.39,tom123@gmail.com"})).sorted(Ordering$String$.MODULE$), testingRetractSink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAggregateOnChangelogSourceAndUpsertSink() {
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE user_sink (\n         |  `scope` STRING,\n         |  cnt BIGINT,\n         |  sum_balance DECIMAL(18,2),\n         |  max_email STRING,\n         |  PRIMARY KEY (`scope`) NOT ENFORCED\n         |) WITH (\n         | 'connector' = 'values',\n         | 'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin();
        String stripMargin2 = new StringOps(Predef$.MODULE$.augmentString("\n         |INSERT INTO user_sink\n         |SELECT 'ALL', count(*), sum(balance), max(email)\n         |FROM user_logs\n         |GROUP BY 'ALL'\n         |")).stripMargin();
        tEnv().executeSql(stripMargin);
        execInsertSqlAndWaitResult(stripMargin2);
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"ALL,3,29.39,tom123@gmail.com"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("user_sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAggregateOnInsertDeleteChangelogSource() {
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(294).append("\n         |CREATE TABLE user_logs2 (\n         |  user_id STRING,\n         |  user_name STRING,\n         |  email STRING,\n         |  balance DECIMAL(18,2)\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '").append(TestValuesTableFactory.registerData((Seq<Row>) TestData$.MODULE$.userChangelog().map(row -> {
            Row row;
            RowKind kind = row.getKind();
            if (RowKind.INSERT.equals(kind) ? true : RowKind.DELETE.equals(kind)) {
                row = row;
            } else if (RowKind.UPDATE_BEFORE.equals(kind)) {
                Row copy = Row.copy(row);
                copy.setKind(RowKind.DELETE);
                row = copy;
            } else {
                if (!RowKind.UPDATE_AFTER.equals(kind)) {
                    throw new MatchError(kind);
                }
                Row copy2 = Row.copy(row);
                copy2.setKind(RowKind.INSERT);
                row = copy2;
            }
            return row;
        }, Seq$.MODULE$.canBuildFrom()))).append("',\n         | 'changelog-mode' = 'I,D'\n         |)\n         |").toString())).stripMargin());
        DataStream retractStream = package$.MODULE$.tableConversions(tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT count(*), sum(balance), max(email)\n         |FROM user_logs2\n         |")).stripMargin())).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink testingRetractSink = new TestingRetractSink();
        retractStream.addSink(testingRetractSink).setParallelism(retractStream.parallelism());
        env().execute();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"3,29.39,tom123@gmail.com"})).sorted(Ordering$String$.MODULE$), testingRetractSink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    public ChangelogSourceITCase(StreamingWithStateTestBase.StateBackendMode stateBackendMode) {
        super(stateBackendMode);
        this.dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.userChangelog());
    }
}
