package org.apache.flink.table.planner.runtime.harness;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: WindowAggregateHarnessTest.scala */
@ExtendWith({ParameterizedTestExtension.class})
@ScalaSignature(bytes = "\u0006\u0001\t%b\u0001B\u0001\u0003\u0001E\u0011!dV5oI><\u0018iZ4sK\u001e\fG/\u001a%be:,7o\u001d+fgRT!a\u0001\u0003\u0002\u000f!\f'O\\3tg*\u0011QAB\u0001\beVtG/[7f\u0015\t9\u0001\"A\u0004qY\u0006tg.\u001a:\u000b\u0005%Q\u0011!\u0002;bE2,'BA\u0006\r\u0003\u00151G.\u001b8l\u0015\tia\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001f\u0005\u0019qN]4\u0004\u0001M\u0011\u0001A\u0005\t\u0003'Qi\u0011AA\u0005\u0003+\t\u0011q\u0002S1s]\u0016\u001c8\u000fV3ti\n\u000b7/\u001a\u0005\t/\u0001\u0011\t\u0011)A\u00051\u00059!-Y2lK:$\u0007CA\r0\u001d\tQBF\u0004\u0002\u001cU9\u0011A$\u000b\b\u0003;!r!AH\u0014\u000f\u0005}1cB\u0001\u0011&\u001d\t\tC%D\u0001#\u0015\t\u0019\u0003#\u0001\u0004=e>|GOP\u0005\u0002\u001f%\u0011QBD\u0005\u0003\u00171I!!\u0003\u0006\n\u0005\u001dA\u0011BA\u0003\u0007\u0013\tYC!A\u0003vi&d7/\u0003\u0002.]\u0005Q2\u000b\u001e:fC6LgnZ,ji\"\u001cF/\u0019;f)\u0016\u001cHOQ1tK*\u00111\u0006B\u0005\u0003aE\u0012\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u00055r\u0003\u0002C\u001a\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001b\u0002\u001bMD\u0017N\u001a;US6,'l\u001c8f!\t)$(D\u00017\u0015\t9\u0004(\u0001\u0003uS6,'\"A\u001d\u0002\t)\fg/Y\u0005\u0003wY\u0012aAW8oK&#\u0007\"B\u001f\u0001\t\u0003q\u0014A\u0002\u001fj]&$h\bF\u0002@\u0001\u0006\u0003\"a\u0005\u0001\t\u000b]a\u0004\u0019\u0001\r\t\u000bMb\u0004\u0019\u0001\u001b\t\u000f\r\u0003!\u0019!C\u0005\t\u0006YQ\u000bV\"`5>sUiX%E+\u0005!\u0004B\u0002$\u0001A\u0003%A'\u0001\u0007V)\u000e{&l\u0014(F?&#\u0005\u0005C\u0003I\u0001\u0011\u0005\u0013*\u0001\u0004cK\u001a|'/\u001a\u000b\u0002\u0015B\u00111JT\u0007\u0002\u0019*\tQ*A\u0003tG\u0006d\u0017-\u0003\u0002P\u0019\n!QK\\5uQ\t9\u0015\u000b\u0005\u0002S36\t1K\u0003\u0002U+\u0006\u0019\u0011\r]5\u000b\u0005Y;\u0016a\u00026va&$XM\u001d\u0006\u00031:\tQA[;oSRL!AW*\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007\u000eC\u0003]\u0001\u0011\u0005\u0011*\u0001\u0010uKN$\bK]8dKN\u001c\u0018N\\4US6,G+^7cY\u0016<\u0016N\u001c
3po\"\u00121L\u0018\t\u0003%~K!\u0001Y*\u0003\u0019Q+7\u000f\u001e+f[Bd\u0017\r^3\t\u000b\t\u0004A\u0011A%\u0002WQ,7\u000f\u001e)s_\u000e,7o]5oORKW.\u001a+v[\ndWmV5oI><x+\u001b;i\u0007\u0012\u001b5k\\;sG\u0016D#!\u00190\t\u000b\u0015\u0004A\u0011A%\u00027Q,7\u000f\u001e)s_\u000e,7o]5oORKW.\u001a%pa^Kg\u000eZ8xQ\t!g\fC\u0003i\u0001\u0011\u0005\u0011*\u0001\u0015uKN$\bK]8dKN\u001c\u0018N\\4US6,\u0007j\u001c9XS:$wn^,ji\"\u001cEiQ*pkJ\u001cW\r\u000b\u0002h=\")1\u000e\u0001C\u0001\u0013\u0006\u0001C/Z:u!J|7-Z:tS:<G+[7f\u0007VlW\u000f\\1uK^Kg\u000eZ8xQ\tQg\fC\u0003o\u0001\u0011\u0005\u0011*A\u0017uKN$\bK]8dKN\u001c\u0018N\\4US6,7)^7vY\u0006$XmV5oI><x+\u001b;i\u0007\u0012\u001b5k\\;sG\u0016D#!\u001c0\t\u000bE\u0004A\u0011A%\u0002)Q,7\u000f^\"m_N,w+\u001b;i_V$x\n]3oQ\t\u0001h\fC\u0003u\u0001\u0011\u0005\u0011*A\u0016uKN$Hk^8QQ\u0006\u001cXmV5oI><\u0018iZ4sK\u001e\fG/Z\"m_N,w+\u001b;i_V$x\n]3oQ\t\u0019h\fC\u0003x\u0001\u0011\u0005\u0011*A\u0019uKN$\bK]8dKN\u001c\u0018N\\4US6,G+^7cY\u0016<\u0016N\u001c3po^KG\u000f\u001b$viV\u0014XmV1uKJl\u0017M]6)\u0005Yt\u0006\"\u0002>\u0001\t\u0013Y\u0018AI2sK\u0006$X\r\u0015:pG\u0016\u001c8/\u001b8h)&lWmV5oI><x\n]3sCR|'\u000fF\u0003}\u0003c\t)\u0005E\u0003L{~\fY\"\u0003\u0002\u007f\u0019\n1A+\u001e9mKJ\u0002\"\"!\u0001\u0002\f\u0005=\u0011qBA\b\u001b\t\t\u0019A\u0003\u0003\u0002\u0006\u0005\u001d\u0011\u0001B;uS2T1!!\u0003\u000b\u0003%\u0019HO]3b[&tw-\u0003\u0003\u0002\u000e\u0005\r!AJ&fs\u0016$wJ\\3J]B,Ho\u0015;sK\u0006lw\n]3sCR|'\u000fV3ti\"\u000b'O\\3tgB!\u0011\u0011CA\f\u001b\t\t\u0019BC\u0002\u0002\u0016!\tA\u0001Z1uC&!\u0011\u0011DA\n\u0005\u001d\u0011vn\u001e#bi\u0006\u0004RaSA\u000f\u0003CI1!a\bM\u0005\u0015\t%O]1z!\u0011\t\u0019#!\f\u000e\u0005\u0005\u0015\"\u0002BA\u0014\u0003S\tq\u0001\\8hS\u000e\fGNC\u0002\u0002,!\tQ\u0001^=qKNLA!a\f\u0002&\tYAj\\4jG\u0006dG+\u001f9f\u0011\u001d\t\u0019$\u001fa\u0001\u0003k\t!\u0002^3ti^Kg\u000eZ8x!\u0011\t9$a\u0010\u000f\t\u0005e\u00121\b\t\u0003C1K1!!\u0010M\u0003\u0019\u0001&/\u001a3fM&!\u00
11\u0011IA\"\u0005\u0019\u0019FO]5oO*\u0019\u0011Q\b'\t\u000f\u0005\u001d\u0013\u00101\u0001\u0002J\u0005Y\u0011n]\"E\u0007N{WO]2f!\rY\u00151J\u0005\u0004\u0003\u001bb%a\u0002\"p_2,\u0017M\u001c\u0005\b\u0003#\u0002A\u0011BA*\u0003)IgnZ3ti\u0012\u000bG/\u0019\u000b\u0006\u0015\u0006U\u0013\u0011\f\u0005\b\u0003/\ny\u00051\u0001��\u0003-!Xm\u001d;ICJtWm]:\t\u0011\u0005\u001d\u0013q\na\u0001\u0003\u0013Bq!!\u0018\u0001\t\u0013\ty&\u0001\u0007j]N,'\u000f\u001e*fG>\u0014H\r\u0006\u0003\u0002b\u0005=\u0004CBA2\u0003W\ny!\u0004\u0002\u0002f)!\u0011qMA5\u00031\u0019HO]3b[J,7m\u001c:e\u0015\r)\u0011qA\u0005\u0005\u0003[\n)G\u0001\u0007TiJ,\u0017-\u001c*fG>\u0014H\r\u0003\u0005\u0002r\u0005m\u0003\u0019AA:\u0003\u0011\t'oZ:\u0011\u000b-\u000b)(!\u001f\n\u0007\u0005]DJ\u0001\u0006=e\u0016\u0004X-\u0019;fIz\u00022aSA>\u0013\r\ti\b\u0014\u0002\u0004\u0003:L\bbBAA\u0001\u0011%\u00111Q\u0001\u0010G\"\fgnZ3m_\u001e\u0014VmY8sIR1\u0011\u0011MAC\u0003'C\u0001\"a\"\u0002��\u0001\u0007\u0011\u0011R\u0001\be><8*\u001b8e!\u0011\tY)a$\u000e\u0005\u00055%bAA\u0016\u0015%!\u0011\u0011SAG\u0005\u001d\u0011vn^&j]\u0012D\u0001\"!\u001d\u0002��\u0001\u0007\u00111\u000f\u0005\b\u0003/\u0003A\u0011BAM\u0003)awnY1m\u001b&dGn\u001d\u000b\u0005\u00037\u000b\t\u000b\u0005\u0003\u0002\u0012\u0005u\u0015\u0002BAP\u0003'\u0011Q\u0002V5nKN$\u0018-\u001c9ECR\f\u0007\u0002CAR\u0003+\u0003\r!!\u000e\u0002\u0011\u0011\fG/\u001a+j[\u0016Ds\u0001AAT\u0003g\u000b)\f\u0005\u0003\u0002*\u0006=VBAAV\u0015\r\tikU\u0001\nKb$XM\\:j_:LA!!-\u0002,\nQQ\t\u001f;f]\u0012<\u0016\u000e\u001e5\u0002\u000bY\fG.^3-\u0005\u0005]6EAA]!\u0011\tY,a3\u000e\u0005\u0005u&\u0002BA`\u0003\u0003\fQ\u0002]1sC6,G/\u001a:ju\u0016$'\u0002BAb\u0003\u000b\f!\"\u001a=uK:\u001c\u0018n\u001c8t\u0015\rA\u0016q\u0019\u0006\u0004\u0003\u0013T\u0011!\u0003;fgR,H/\u001b7t\u0013\u0011\ti-!0\u00035A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0012=uK:\u001c\u0018n\u001c8\b\u000f\u0005E'\u0001#\u0001\u0002T\u0006Qr+\u001b8e_^\fum\u001a:fO\u0006$X
\rS1s]\u0016\u001c8\u000fV3tiB\u00191#!6\u0007\r\u0005\u0011\u0001\u0012AAl'\u0011\t).!7\u0011\u0007-\u000bY.C\u0002\u0002^2\u0013a!\u00118z%\u00164\u0007bB\u001f\u0002V\u0012\u0005\u0011\u0011\u001d\u000b\u0003\u0003'D\u0001\"!:\u0002V\u0012\u0005\u0011q]\u0001\u000ba\u0006\u0014\u0018-\\3uKJ\u001cHCAAu!\u0019\tY/a<\u0002t6\u0011\u0011Q\u001e\u0006\u0004\u0003\u000bA\u0014\u0002BAy\u0003[\u0014!bQ8mY\u0016\u001cG/[8o!\u0015Y\u0015QDA{!\u0011\t90!@\u000e\u0005\u0005e(bAA~q\u0005!A.\u00198h\u0013\u0011\ty0!?\u0003\r=\u0013'.Z2uQ!\t\u0019Oa\u0001\u0003\n\t-\u0001\u0003BA^\u0005\u000bIAAa\u0002\u0002>\nQ\u0001+\u0019:b[\u0016$XM]:\u0002\t9\fW.Z\u0011\u0003\u0005\u001b\tad\u0015;bi\u0016\u0014\u0015mY6f]\u0012l4\u0010M?-AQKW.\u001a.p]\u0016l40M?\t\u0015\tE\u0011Q\u001bb\u0001\n\u0003\u0011\u0019\"\u0001\u0004U+6\u0013E*R\u000b\u0003\u0003kA\u0011Ba\u0006\u0002V\u0002\u0006I!!\u000e\u0002\u000fQ+VJ\u0011'FA!Q!1DAk\u0005\u0004%\tAa\u0005\u0002\u0007!{\u0005\u000bC\u0005\u0003 \u0005U\u0007\u0015!\u0003\u00026\u0005!\u0001j\u0014)!\u0011)\u0011\u0019#!6C\u0002\u0013\u0005!1C\u0001\t\u0007VkU\u000bT!U\u000b\"I!qEAkA\u0003%\u0011QG\u0001\n\u0007VkU\u000bT!U\u000b\u0002\u0002")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.class */
public class WindowAggregateHarnessTest extends HarnessTestBase {
    // Time zone installed as the table environment's local time zone in before().
    private final ZoneId shiftTimeZone;
    // Backing field for the private UTC_ZONE_ID() accessor.
    // NOTE(review): presumably the UTC zone; it is initialised outside this view — confirm.
    private final ZoneId UTC_ZONE_ID;

    /**
     * Static forwarder to the {@code CUMULATE} value held by the Scala companion object.
     *
     * @return whatever {@code WindowAggregateHarnessTest$.MODULE$.CUMULATE()} returns
     */
    public static String CUMULATE() {
        final WindowAggregateHarnessTest$ companion = WindowAggregateHarnessTest$.MODULE$;
        return companion.CUMULATE();
    }

    /**
     * Static forwarder to the {@code HOP} value held by the Scala companion object.
     *
     * @return whatever {@code WindowAggregateHarnessTest$.MODULE$.HOP()} returns
     */
    public static String HOP() {
        final WindowAggregateHarnessTest$ companion = WindowAggregateHarnessTest$.MODULE$;
        return companion.HOP();
    }

    /**
     * Static forwarder to the {@code TUMBLE} value held by the Scala companion object.
     *
     * @return whatever {@code WindowAggregateHarnessTest$.MODULE$.TUMBLE()} returns
     */
    public static String TUMBLE() {
        final WindowAggregateHarnessTest$ companion = WindowAggregateHarnessTest$.MODULE$;
        return companion.TUMBLE();
    }

    /**
     * Supplies the parameter sets for the parameterized test templates by delegating to the Scala
     * companion object. Per the display-name pattern, position 0 is the state backend and
     * position 1 is the time zone.
     *
     * @return the collection of argument arrays produced by the companion object
     */
    @Parameters(name = "StateBackend={0}, TimeZone={1}")
    public static Collection<Object[]> parameters() {
        final WindowAggregateHarnessTest$ companion = WindowAggregateHarnessTest$.MODULE$;
        return companion.parameters();
    }

    /**
     * Accessor for the {@code UTC_ZONE_ID} field.
     *
     * @return the zone id stored in the field of the same name
     */
    private ZoneId UTC_ZONE_ID() {
        final ZoneId zoneId = UTC_ZONE_ID;
        return zoneId;
    }

    /**
     * Per-test setup: runs the parent setup, applies {@code shiftTimeZone} as the session's local
     * time zone, and registers the two source tables {@code T1} and {@code T1_CDC} backed by the
     * 'values' connector.
     */
    @Override // org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    @BeforeEach
    public void before() {
        super.before();
        tEnv().getConfig().setLocalTimeZone(shiftTimeZone);

        // T1: reads the data registered from TestData.windowDataWithTimestamp.
        String plainDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String createPlainTable = "\n                       |CREATE TABLE T1 (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | proctime AS PROCTIME()\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '"
                + plainDataId
                + "'\n                       |)\n                       |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(createPlainTable)).stripMargin());

        // T1_CDC: same schema, but reads the changelog data and declares changelog mode I,UA,UB,D.
        String changelogDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowChangelogDataWithTimestamp());
        String createChangelogTable = "\n                       |CREATE TABLE T1_CDC (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | proctime AS PROCTIME()\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '"
                + changelogDataId
                + "',\n                       | 'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(createChangelogTable)).stripMargin());
    }

    /**
     * Builds the processing-time window operator for the TUMBLE window with the CDC flag off,
     * feeds it through {@code ingestData}, and compares the harness output (order-insensitive)
     * with the expected insert records.
     */
    @TestTemplate
    public void testProcessingTimeTumbleWindow() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), false);
        if (harnessAndTypes == null) {
            // Mirrors the Scala tuple pattern match: a null tuple does not match.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = harnessAndTypes._1();
        LogicalType[] outputTypes = harnessAndTypes._2();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputTypes);

        testHarness.open();
        ingestData(testHarness, false);

        // Expected field values, one row per output record. The literals autobox to Long/Double
        // values equal to those BoxesRunTime would produce. The trailing two fields come from
        // localMills (presumably window start/end — confirm against the query).
        Object[][] expectedRows = {
            {"a", 4L, 5.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")},
            {"a", 1L, null, 1L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")},
            {"b", 1L, 3.0d, 1L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")},
            {null, 1L, 7.0d, 0L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")},
        };
        Collection<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (Object[] row : expectedRows) {
            expectedOutput.add(insertRecord(Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Builds the processing-time window operator for the TUMBLE window with the CDC flag on,
     * feeds it through {@code ingestData}, and compares the harness output (order-insensitive)
     * with the expected insert records.
     */
    @TestTemplate
    public void testProcessingTimeTumbleWindowWithCDCSource() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), true);
        if (harnessAndTypes == null) {
            // Mirrors the Scala tuple pattern match: a null tuple does not match.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = harnessAndTypes._1();
        LogicalType[] outputTypes = harnessAndTypes._2();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputTypes);

        testHarness.open();
        ingestData(testHarness, true);

        // Expected field values, one row per output record. The literals autobox to Long/Double
        // values equal to those BoxesRunTime would produce. The trailing two fields come from
        // localMills (presumably window start/end — confirm against the query).
        Object[][] expectedRows = {
            {"a", 3L, 22.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")},
            {"a", 1L, null, 1L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")},
        };
        Collection<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (Object[] row : expectedRows) {
            expectedOutput.add(insertRecord(Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Builds the processing-time window operator for the HOP window with the CDC flag off,
     * feeds it through {@code ingestData}, and compares the harness output (order-insensitive)
     * with the expected insert records.
     */
    @TestTemplate
    public void testProcessingTimeHopWindow() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.HOP(), false);
        if (harnessAndTypes == null) {
            // Mirrors the Scala tuple pattern match: a null tuple does not match.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = harnessAndTypes._1();
        LogicalType[] outputTypes = harnessAndTypes._2();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputTypes);

        testHarness.open();
        ingestData(testHarness, false);

        // Expected field values, one row per output record. The literals autobox to Long/Double
        // values equal to those BoxesRunTime would produce. The trailing two fields come from
        // localMills (presumably window start/end — confirm against the query).
        Object[][] expectedRows = {
            {"a", 4L, 5.0d, 2L, localMills("1969-12-31T23:59:55"), localMills("1970-01-01T00:00:05")},
            {"a", 5L, 5.0d, 3L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"a", 1L, null, 1L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:15")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:15")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:10"), localMills("1970-01-01T00:00:20")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:25")},
            {"b", 1L, 3.0d, 1L, localMills("1970-01-01T00:00:25"), localMills("1970-01-01T00:00:35")},
            {"b", 1L, 3.0d, 1L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:40")},
            {null, 1L, 7.0d, 0L, localMills("1970-01-01T00:00:25"), localMills("1970-01-01T00:00:35")},
            {null, 1L, 7.0d, 0L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:40")},
        };
        Collection<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (Object[] row : expectedRows) {
            expectedOutput.add(insertRecord(Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Builds the processing-time window operator for the HOP window with the CDC flag on,
     * feeds it through {@code ingestData}, and compares the harness output (order-insensitive)
     * with the expected insert records.
     */
    @TestTemplate
    public void testProcessingTimeHopWindowWithCDCSource() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.HOP(), true);
        if (harnessAndTypes == null) {
            // Mirrors the Scala tuple pattern match: a null tuple does not match.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = harnessAndTypes._1();
        LogicalType[] outputTypes = harnessAndTypes._2();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputTypes);

        testHarness.open();
        ingestData(testHarness, true);

        // Expected field values, one row per output record. The literals autobox to Long/Double
        // values equal to those BoxesRunTime would produce. The trailing two fields come from
        // localMills (presumably window start/end — confirm against the query).
        Object[][] expectedRows = {
            {"a", 3L, 22.0d, 2L, localMills("1969-12-31T23:59:55"), localMills("1970-01-01T00:00:05")},
            {"a", 4L, 22.0d, 3L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"a", 1L, null, 1L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:15")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:15")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:10"), localMills("1970-01-01T00:00:20")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:25")},
        };
        Collection<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (Object[] row : expectedRows) {
            expectedOutput.add(insertRecord(Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Builds the processing-time window operator for the CUMULATE window with the CDC flag off,
     * feeds it through {@code ingestData}, and compares the harness output (order-insensitive)
     * with the expected insert records.
     */
    @TestTemplate
    public void testProcessingTimeCumulateWindow() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.CUMULATE(), false);
        if (harnessAndTypes == null) {
            // Mirrors the Scala tuple pattern match: a null tuple does not match.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = harnessAndTypes._1();
        LogicalType[] outputTypes = harnessAndTypes._2();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputTypes);

        testHarness.open();
        ingestData(testHarness, false);

        // Expected field values, one row per output record. The literals autobox to Long/Double
        // values equal to those BoxesRunTime would produce. The trailing two fields come from
        // localMills (presumably window start/end — confirm against the query).
        Object[][] expectedRows = {
            {"a", 4L, 5.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")},
            {"a", 5L, 5.0d, 3L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"a", 5L, 5.0d, 3L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:15")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:15")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:25")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:30")},
            {"b", 1L, 3.0d, 1L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")},
            {"b", 1L, 3.0d, 1L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:40")},
            {"b", 1L, 3.0d, 1L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:45")},
            {null, 1L, 7.0d, 0L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")},
            {null, 1L, 7.0d, 0L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:40")},
            {null, 1L, 7.0d, 0L, localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:45")},
        };
        Collection<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (Object[] row : expectedRows) {
            expectedOutput.add(insertRecord(Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Builds the processing-time window operator for the CUMULATE window with the CDC flag on,
     * feeds it through {@code ingestData}, and compares the harness output (order-insensitive)
     * with the expected insert records.
     */
    @TestTemplate
    public void testProcessingTimeCumulateWindowWithCDCSource() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.CUMULATE(), true);
        if (harnessAndTypes == null) {
            // Mirrors the Scala tuple pattern match: a null tuple does not match.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = harnessAndTypes._1();
        LogicalType[] outputTypes = harnessAndTypes._2();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputTypes);

        testHarness.open();
        ingestData(testHarness, true);

        // Expected field values, one row per output record. The literals autobox to Long/Double
        // values equal to those BoxesRunTime would produce. The trailing two fields come from
        // localMills (presumably window start/end — confirm against the query).
        Object[][] expectedRows = {
            {"a", 3L, 22.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")},
            {"a", 4L, 22.0d, 3L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"a", 4L, 22.0d, 3L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:15")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0d, 2L, localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:15")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:25")},
            {"b", 1L, 4.0d, 1L, localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:30")},
        };
        Collection<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (Object[] row : expectedRows) {
            expectedOutput.add(insertRecord(Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that the harness of a processing-time TUMBLE window aggregate can be closed after
     * {@code setup(...)} alone, without {@code open()} ever being invoked on it.
     */
    @TestTemplate
    public void testCloseWithoutOpen() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), false);
        if (harnessAndTypes == null) {
            // A null pair cannot be destructured; it is reported as a MatchError.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = harnessAndTypes._1();
        LogicalType[] outputTypes = harnessAndTypes._2();

        // Only setup + close: open() is deliberately skipped.
        harness.setup(new RowDataSerializer(outputTypes));
        harness.close();
    }

    /**
     * Registers the {@code T2} values-connector table, plans a tumbling window aggregation with the
     * {@code TWO_PHASE} aggregate strategy and checks that both the "LocalWindowAggregate" and the
     * "GlobalWindowAggregate" operator harnesses can be closed after {@code setup(...)} without being opened.
     */
    @TestTemplate
    public void testTwoPhaseWindowAggregateCloseWithoutOpen() {
        // Margin-formatted DDL; stripMargin() removes everything up to and including each leading '|'.
        String createTableDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n                       |CREATE TABLE T2 ("
                        + "\n                       | `ts` STRING,"
                        + "\n                       | `int` INT,"
                        + "\n                       | `double` DOUBLE,"
                        + "\n                       | `float` FLOAT,"
                        + "\n                       | `bigdec` DECIMAL(10, 2),"
                        + "\n                       | `string` STRING,"
                        + "\n                       | `name` STRING,"
                        + "\n                       | `rowtime` AS"
                        + "\n                       | TO_TIMESTAMP(`ts`),"
                        + "\n                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND"
                        + "\n                       |) WITH ("
                        + "\n                       | 'connector' = 'values',"
                        + "\n                       | 'data-id' = '"
                        + TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp())
                        + "',"
                        + "\n                       | 'failing-source' = 'false'"
                        + "\n                       |)"
                        + "\n                       |")).stripMargin();
        tEnv().executeSql(createTableDdl);

        tEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE);

        String aggregateQuery = new StringOps(Predef$.MODULE$.augmentString(
                "\n        |SELECT"
                        + "\n        |  `name`,"
                        + "\n        |  window_start,"
                        + "\n        |  window_end,"
                        + "\n        |  COUNT(*),"
                        + "\n        |  MAX(`double`),"
                        + "\n        |  COUNT(DISTINCT `string`)"
                        + "\n        |FROM TABLE("
                        + "\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
                        + "\n        |GROUP BY `name`, window_start, window_end"
                        + "\n      ")).stripMargin();
        DataStream<?> resultStream = package$.MODULE$.tableConversions(tEnv().sqlQuery(aggregateQuery)).toDataStream();

        // Local (pre-aggregation) operator: setup, then close without open().
        OneInputStreamOperatorTestHarness<RowData, RowData> localHarness = createHarnessTesterForNoState(resultStream, "LocalWindowAggregate");
        LogicalType[] outputTypes = new LogicalType[]{
                DataTypes.STRING().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.DOUBLE().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.TIMESTAMP_LTZ(3).getLogicalType(),
                DataTypes.TIMESTAMP_LTZ(3).getLogicalType()
        };
        localHarness.setup(new RowDataSerializer(outputTypes));
        localHarness.close();

        // Global (final aggregation) operator: same setup-then-close sequence.
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> globalHarness = createHarnessTester(resultStream, "GlobalWindowAggregate");
        globalHarness.setup(new RowDataSerializer(outputTypes));
        globalHarness.close();
    }

    /**
     * Feeds a watermark of 10000 into a processing-time TUMBLE window operator before any element and
     * then processes four records for key "a" at processing times 1000, 2000, 3000 and 6000.
     *
     * <p>The expected output is the forwarded watermark plus one result row for the window
     * [00:00:00, 00:00:05) and one for [00:00:05, 00:00:10).
     */
    @TestTemplate
    public void testProcessingTimeTumbleWindowWithFutureWatermark() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> harnessAndTypes = createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), false);
        if (harnessAndTypes == null) {
            // A null pair cannot be destructured; it is reported as a MatchError.
            throw new MatchError(harnessAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = harnessAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(harnessAndTypes._2());

        harness.open();

        // The watermark is ahead of every processing timestamp used below.
        harness.processWatermark(10000L);

        harness.setProcessingTime(1000L);
        harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(1.0d), "str1", null})));

        harness.setProcessingTime(2000L);
        harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(2.0d), "str2", null})));

        harness.setProcessingTime(3000L);
        harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(3.0d), "str2", null})));

        harness.setProcessingTime(6000L);
        harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(4.0d), "str1", null})));

        // Advance the clock well past the last window that received data.
        harness.setProcessingTime(50000L);

        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new Watermark(10000L));
        expected.add(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToDouble(3.0d), BoxesRunTime.boxToLong(2L), localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")})));
        expected.add(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToDouble(4.0d), BoxesRunTime.boxToLong(1L), localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")})));

        assertor.assertOutputEqualsSorted("result mismatch", expected, harness.getOutput());
        harness.close();
    }

    /**
     * Plans a processing-time window aggregation and wraps its "WindowAggregate" operator in a keyed
     * test harness.
     *
     * @param str the window kind; compared against the companion object's TUMBLE, HOP and CUMULATE
     *     values, in that order
     * @param z selects the source table: {@code T1_CDC} when {@code true}, {@code T1} otherwise
     * @return the harness together with the logical types of the six result columns
     * @throws MatchError if {@code str} equals none of the three window kinds
     */
    private Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> createProcessingTimeWindowOperator(String str, boolean z) {
        String sourceTable = z ? "T1_CDC" : "T1";

        // Null-safe equality against each window kind; later kinds are only looked up when needed.
        String windowCall;
        if (str == null ? WindowAggregateHarnessTest$.MODULE$.TUMBLE() == null : str.equals(WindowAggregateHarnessTest$.MODULE$.TUMBLE())) {
            windowCall = new StringOps(Predef$.MODULE$.augmentString(
                    "\n           |TUMBLE("
                            + "\n           |      TABLE " + sourceTable + ","
                            + "\n           |      DESCRIPTOR(proctime),"
                            + "\n           |      INTERVAL '5' SECOND)"
                            + "\n           |")).stripMargin();
        } else if (str == null ? WindowAggregateHarnessTest$.MODULE$.HOP() == null : str.equals(WindowAggregateHarnessTest$.MODULE$.HOP())) {
            windowCall = new StringOps(Predef$.MODULE$.augmentString(
                    "\n           |HOP("
                            + "\n           |      TABLE " + sourceTable + ","
                            + "\n           |      DESCRIPTOR(proctime),"
                            + "\n           |      INTERVAL '5' SECOND,"
                            + "\n           |      INTERVAL '10' SECOND)"
                            + "\n           |")).stripMargin();
        } else if (str == null ? WindowAggregateHarnessTest$.MODULE$.CUMULATE() == null : str.equals(WindowAggregateHarnessTest$.MODULE$.CUMULATE())) {
            windowCall = new StringOps(Predef$.MODULE$.augmentString(
                    "\n           |CUMULATE("
                            + "\n           |      TABLE " + sourceTable + ","
                            + "\n           |      DESCRIPTOR(proctime),"
                            + "\n           |      INTERVAL '5' SECOND,"
                            + "\n           |      INTERVAL '15' SECOND)"
                            + "\n           |")).stripMargin();
        } else {
            throw new MatchError(str);
        }

        String aggregateQuery = new StringOps(Predef$.MODULE$.augmentString(
                "\n         |SELECT"
                        + "\n         |  `name`,"
                        + "\n         |  window_start,"
                        + "\n         |  window_end,"
                        + "\n         |  COUNT(*),"
                        + "\n         |  MAX(`double`),"
                        + "\n         |  COUNT(DISTINCT `string`)"
                        + "\n         |FROM TABLE(" + windowCall + ")"
                        + "\n         |GROUP BY `name`, window_start, window_end"
                        + "\n      ")).stripMargin();

        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = createHarnessTester(package$.MODULE$.tableConversions(tEnv().sqlQuery(aggregateQuery)).toDataStream(), "WindowAggregate");

        // name, COUNT(*), MAX(double), COUNT(DISTINCT string), window_start, window_end
        LogicalType[] outputTypes = new LogicalType[]{
                DataTypes.STRING().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.DOUBLE().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.TIMESTAMP_LTZ(3).getLogicalType(),
                DataTypes.TIMESTAMP_LTZ(3).getLogicalType()
        };
        return new Tuple2<>(harness, outputTypes);
    }

    /**
     * Drives the given harness with a fixed sequence of records, advancing processing time before
     * each step.
     *
     * <p>With {@code z == true} the records form a changelog (INSERT, DELETE, UPDATE_BEFORE and
     * UPDATE_AFTER rows) and the clock is last set to 39000. With {@code z == false} only insert
     * records are sent and the clock is finally advanced to 50000.
     *
     * @param keyedOneInputStreamOperatorTestHarness the already opened harness to feed
     * @param z whether to send the changelog variant of the data
     */
    private void ingestData(KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> keyedOneInputStreamOperatorTestHarness, boolean z) {
        final KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = keyedOneInputStreamOperatorTestHarness;

        if (z) {
            // Changelog input.
            harness.setProcessingTime(1000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(1.0d), "Hi", null})));

            harness.setProcessingTime(2000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(2.0d), "Comment#1", null})));

            harness.setProcessingTime(3000L);
            harness.processElement(changelogRecord(RowKind.DELETE, Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(1.0d), "Hi", null})));
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(2.0d), "Comment#1", null})));

            harness.setProcessingTime(4000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(5.0d), null, null})));
            harness.processElement(changelogRecord(RowKind.UPDATE_BEFORE, Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(2.0d), "Comment#1", null})));
            harness.processElement(changelogRecord(RowKind.UPDATE_AFTER, Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(22.0d), "Comment#22", null})));

            harness.setProcessingTime(6000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(6.0d), "Hi", null})));

            harness.setProcessingTime(7000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(3.0d), "Hello", null})));

            harness.setProcessingTime(8000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"a", null, "Comment#2", null})));

            harness.setProcessingTime(16000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(4.0d), "Hi", null})));

            // The last row is inserted and then deleted again one second later.
            harness.setProcessingTime(38000L);
            harness.processElement(changelogRecord(RowKind.INSERT, Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(8.0d), "Comment#4", null})));

            harness.setProcessingTime(39000L);
            harness.processElement(changelogRecord(RowKind.DELETE, Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(8.0d), "Comment#4", null})));
        } else {
            // Insert-only input.
            harness.setProcessingTime(1000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(1.0d), "Hi", null})));

            harness.setProcessingTime(2000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(2.0d), "Comment#1", null})));

            harness.setProcessingTime(3000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(2.0d), "Comment#1", null})));

            harness.setProcessingTime(4000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", BoxesRunTime.boxToDouble(5.0d), null, null})));

            harness.setProcessingTime(6000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(6.0d), "Hi", null})));

            harness.setProcessingTime(7000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(3.0d), "Hello", null})));

            harness.setProcessingTime(8000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"a", null, "Comment#2", null})));

            harness.setProcessingTime(16000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(4.0d), "Hi", null})));

            // This record carries a null key.
            harness.setProcessingTime(32000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{null, BoxesRunTime.boxToDouble(7.0d), null, null})));

            harness.setProcessingTime(34000L);
            harness.processElement(insertRecord(Predef$.MODULE$.genericWrapArray(new Object[]{"b", BoxesRunTime.boxToDouble(3.0d), "Comment#3", null})));

            // Final clock advance after the last element.
            harness.setProcessingTime(50000L);
        }
    }

    /**
     * Builds an {@code INSERT} stream record from the given field values.
     *
     * <p>{@code Long} and {@code Double} fields are re-boxed through {@code BoxesRunTime}; every other
     * value, including {@code null}, is passed on unchanged.
     *
     * @param seq the field values of the row, in column order
     * @return the record produced by {@code StreamRecordUtils.binaryRecord} with {@code RowKind.INSERT}
     */
    private StreamRecord<RowData> insertRecord(Seq<Object> seq) {
        TraversableOnce normalized = (TraversableOnce) seq.map(field -> {
            Object boxed = field;
            if (field instanceof Long) {
                boxed = BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(field));
            } else if (field instanceof Double) {
                boxed = BoxesRunTime.boxToDouble(BoxesRunTime.unboxToDouble(field));
            }
            return boxed;
        }, Seq$.MODULE$.canBuildFrom());
        Object[] fields = (Object[]) normalized.toArray(ClassTag$.MODULE$.Object());
        return StreamRecordUtils.binaryRecord(RowKind.INSERT, fields);
    }

    /**
     * Builds a stream record of the given row kind from the given field values.
     *
     * <p>{@code Long} and {@code Double} fields are re-boxed through {@code BoxesRunTime}; every other
     * value, including {@code null}, is passed on unchanged.
     *
     * @param rowKind the changelog kind to stamp on the record
     * @param seq the field values of the row, in column order
     * @return the record produced by {@code StreamRecordUtils.binaryRecord}
     */
    private StreamRecord<RowData> changelogRecord(RowKind rowKind, Seq<Object> seq) {
        TraversableOnce normalized = (TraversableOnce) seq.map(field -> {
            Object boxed = field;
            if (field instanceof Long) {
                boxed = BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(field));
            } else if (field instanceof Double) {
                boxed = BoxesRunTime.boxToDouble(BoxesRunTime.unboxToDouble(field));
            }
            return boxed;
        }, Seq$.MODULE$.canBuildFrom());
        Object[] fields = (Object[]) normalized.toArray(ClassTag$.MODULE$.Object());
        return StreamRecordUtils.binaryRecord(rowKind, fields);
    }

    /**
     * Converts an ISO-8601 local date-time string into a {@link TimestampData}.
     *
     * <p>The text is parsed as a {@link LocalDateTime}, placed in the zone returned by
     * {@code UTC_ZONE_ID()} and reduced to epoch milliseconds. That value is then passed through
     * {@code TimeWindowUtil.toUtcTimestampMills} together with this test's {@code shiftTimeZone}.
     *
     * @param str a local date-time such as {@code "1970-01-01T00:00:05"}
     * @return the timestamp built from the converted epoch milliseconds
     */
    private TimestampData localMills(String str) {
        long epochMillis = LocalDateTime.parse(str).atZone(UTC_ZONE_ID()).toInstant().toEpochMilli();
        long shiftedMillis = TimeWindowUtil.toUtcTimestampMills(epochMillis, this.shiftTimeZone);
        return TimestampData.fromEpochMillis(shiftedMillis);
    }

    /**
     * Creates the test instance.
     *
     * <p>Decompiler note (JADX): the {@code super} call was moved to the top of this constructor, so
     * the statement order shown here may differ from the original bytecode.
     *
     * @param stateBackendMode forwarded unchanged to the superclass constructor
     * @param zoneId stored in {@code shiftTimeZone}; {@code localMills} hands it to
     *     {@code TimeWindowUtil.toUtcTimestampMills}
     */
    public WindowAggregateHarnessTest(StreamingWithStateTestBase.StateBackendMode stateBackendMode, ZoneId zoneId) {
        super(stateBackendMode);
        this.shiftTimeZone = zoneId;
        // Fixed reference zone used by localMills when interpreting local date-time strings.
        this.UTC_ZONE_ID = ZoneId.of("UTC");
    }
}
