/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperatorTest;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperator;
import org.apache.flink.table.runtime.utils.PassThroughPythonAggregateFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link BatchArrowPythonGroupWindowAggregateFunctionOperator}.
 *
 * <p>The operator under test is a subclass that overrides {@code createPythonFunctionRunner()} to
 * return a {@link PassThroughPythonAggregateFunctionRunner}. The tests cover three situations:
 *
 * <ul>
 *   <li>all results are emitted once the harness is closed,
 *   <li>results are emitted when {@link PythonOptions#MAX_BUNDLE_SIZE} is reached,
 *   <li>results are emitted when {@link PythonOptions#MAX_BUNDLE_TIME_MILLS} elapses.
 * </ul>
 */
class BatchArrowPythonGroupWindowAggregateFunctionOperatorTest
        extends AbstractBatchArrowPythonAggregateFunctionOperatorTest {

    /** Position of the {@code rowTime} column in {@link #getInputType()}. */
    private static final int INPUT_TIME_FIELD_INDEX = 3;

    /** Value passed as the operator's {@code maxLimitSize} constructor argument. */
    private static final int MAX_LIMIT_SIZE = 100000;

    /** Window size; the expected window bounds below are built with epoch milliseconds. */
    private static final long WINDOW_SIZE = 10000L;

    /** Window slide; half of {@link #WINDOW_SIZE}, so every row is expected in two windows. */
    private static final long SLIDE_SIZE = 5000L;

    @Test
    void testGroupAggregateFunction() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                getTestHarness(new Configuration());
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.open();

        testHarness.processElement(input("c1", "c2", 0L, 0L, initialTime + 1));
        testHarness.processElement(input("c1", "c4", 1L, 6000L, initialTime + 2));
        testHarness.processElement(input("c1", "c6", 2L, 10000L, initialTime + 3));
        testHarness.processElement(input("c2", "c8", 3L, 0L, initialTime + 3));

        testHarness.close();

        addExpectedC1Windows(expectedOutput);
        addExpectedC2Windows(expectedOutput);

        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredByCount() throws Exception {
        Configuration conf = new Configuration();
        // NOTE(review): 6 presumably matches the number of (row, window) pairs produced by the
        // three "c1" rows (two windows each) - confirm against the operator's counting logic.
        conf.set(PythonOptions.MAX_BUNDLE_SIZE, 6);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.open();

        testHarness.processElement(input("c1", "c2", 0L, 0L, initialTime + 1));
        testHarness.processElement(input("c1", "c4", 1L, 6000L, initialTime + 2));
        testHarness.processElement(input("c1", "c6", 2L, 10000L, initialTime + 3));
        assertOutputEquals(
                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());

        // The first row of the next group key arrives; the "c1" results are expected afterwards.
        testHarness.processElement(input("c2", "c8", 3L, 0L, initialTime + 3));
        addExpectedC1Windows(expectedOutput);
        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Closing the harness is expected to emit the remaining "c2" results.
        testHarness.close();
        addExpectedC2Windows(expectedOutput);
        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredByTime() throws Exception {
        Configuration conf = new Configuration();
        conf.set(PythonOptions.MAX_BUNDLE_SIZE, 10);
        conf.set(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.open();

        testHarness.processElement(input("c1", "c2", 0L, 0L, initialTime + 1));
        testHarness.processElement(input("c1", "c4", 1L, 6000L, initialTime + 2));
        testHarness.processElement(input("c1", "c6", 2L, 10000L, initialTime + 3));
        testHarness.processElement(input("c2", "c8", 3L, 0L, initialTime + 3));
        assertOutputEquals(
                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());

        // Advance processing time to the configured bundle time.
        testHarness.setProcessingTime(1000L);
        addExpectedC1Windows(expectedOutput);
        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Closing the harness is expected to emit the remaining "c2" results.
        testHarness.close();
        addExpectedC2Windows(expectedOutput);
        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Override
    public LogicalType[] getOutputLogicalType() {
        return new LogicalType[] {
            DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()
        };
    }

    @Override
    public RowType getInputType() {
        return new RowType(
                Arrays.asList(
                        new RowType.RowField("f1", new VarCharType()),
                        new RowType.RowField("f2", new VarCharType()),
                        new RowType.RowField("f3", new BigIntType()),
                        new RowType.RowField("rowTime", new BigIntType())));
    }

    @Override
    public RowType getOutputType() {
        return new RowType(
                Arrays.asList(
                        new RowType.RowField("f1", new VarCharType()),
                        new RowType.RowField("f2", new BigIntType()),
                        new RowType.RowField("windowStart", new TimestampType(3)),
                        new RowType.RowField("windowEnd", new TimestampType(3))));
    }

    /**
     * Builds the operator under test with a pass-through Python function runner.
     *
     * @param config configuration handed to the operator
     * @param pandasAggregateFunctions aggregate functions handed to the operator
     * @param inputRowType row type of the operator input
     * @param outputRowType row type of the operator output; the UDF output type is taken from its
     *     fields between the grouping columns and the last two columns
     * @param groupingSet indices of the grouping columns in {@code inputRowType}
     * @param udafInputOffsets indices of the UDAF argument columns in {@code inputRowType}
     * @return the configured operator
     */
    @Override
    public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
            Configuration config,
            PythonFunctionInfo[] pandasAggregateFunctions,
            RowType inputRowType,
            RowType outputRowType,
            int[] groupingSet,
            int[] udafInputOffsets) {
        RowType udfInputType = (RowType) Projection.of(udafInputOffsets).project(inputRowType);
        RowType udfOutputType =
                (RowType)
                        Projection.range(groupingSet.length, outputRowType.getFieldCount() - 2)
                                .project(outputRowType);
        RowType groupType = (RowType) Projection.of(groupingSet).project(inputRowType);

        // Each projection is generated with its own, fresh code generator context.
        GeneratedProjection udafInputProjection =
                ProjectionCodeGenerator.generateProjection(
                        newCodeGeneratorContext(),
                        "UdafInputProjection",
                        inputRowType,
                        udfInputType,
                        udafInputOffsets);
        GeneratedProjection groupKeyProjection =
                ProjectionCodeGenerator.generateProjection(
                        newCodeGeneratorContext(),
                        "GroupKey",
                        inputRowType,
                        groupType,
                        groupingSet);
        GeneratedProjection groupSetProjection =
                ProjectionCodeGenerator.generateProjection(
                        newCodeGeneratorContext(),
                        "GroupSet",
                        inputRowType,
                        groupType,
                        groupingSet);

        return new PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(
                config,
                pandasAggregateFunctions,
                inputRowType,
                udfInputType,
                udfOutputType,
                INPUT_TIME_FIELD_INDEX,
                MAX_LIMIT_SIZE,
                WINDOW_SIZE,
                SLIDE_SIZE,
                // NOTE(review): presumably selects the window start and window end properties
                // that form the two trailing output columns - confirm against the operator.
                new int[] {0, 1},
                udafInputProjection,
                groupKeyProjection,
                groupSetProjection);
    }

    /** Creates a code generator context backed by a default configuration. */
    private static CodeGeneratorContext newCodeGeneratorContext() {
        return new CodeGeneratorContext(
                new Configuration(), Thread.currentThread().getContextClassLoader());
    }

    /**
     * Wraps an input row matching {@link #getInputType()} into a stream record.
     *
     * @param f1 value of column {@code f1}
     * @param f2 value of column {@code f2}
     * @param f3 value of column {@code f3}
     * @param rowTime value of column {@code rowTime}
     * @param timestamp timestamp attached to the stream record
     */
    private StreamRecord<RowData> input(
            String f1, String f2, long f3, long rowTime, long timestamp) {
        return new StreamRecord<>(newBinaryRow(true, f1, f2, f3, rowTime), timestamp);
    }

    /**
     * Wraps an expected output row matching {@link #getOutputType()} into a stream record.
     *
     * @param key expected value of column {@code f1}
     * @param value expected value of column {@code f2}
     * @param windowStartMillis expected window start in epoch milliseconds
     * @param windowEndMillis expected window end in epoch milliseconds
     */
    private StreamRecord<RowData> windowResult(
            String key, long value, long windowStartMillis, long windowEndMillis) {
        return new StreamRecord<>(
                newRow(
                        true,
                        key,
                        value,
                        TimestampData.fromEpochMillis(windowStartMillis),
                        TimestampData.fromEpochMillis(windowEndMillis)));
    }

    /** Appends the four window results expected for the three {@code "c1"} input rows. */
    private void addExpectedC1Windows(ConcurrentLinkedQueue<Object> expectedOutput) {
        expectedOutput.add(windowResult("c1", 0L, -5000L, 5000L));
        expectedOutput.add(windowResult("c1", 0L, 0L, 10000L));
        expectedOutput.add(windowResult("c1", 1L, 5000L, 15000L));
        expectedOutput.add(windowResult("c1", 2L, 10000L, 20000L));
    }

    /** Appends the two window results expected for the single {@code "c2"} input row. */
    private void addExpectedC2Windows(ConcurrentLinkedQueue<Object> expectedOutput) {
        expectedOutput.add(windowResult("c2", 3L, -5000L, 5000L));
        expectedOutput.add(windowResult("c2", 3L, 0L, 10000L));
    }

    /**
     * Variant of {@link BatchArrowPythonGroupWindowAggregateFunctionOperator} whose Python function
     * runner is replaced by a {@link PassThroughPythonAggregateFunctionRunner}.
     */
    private static class PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator
            extends BatchArrowPythonGroupWindowAggregateFunctionOperator {

        PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(
                Configuration config,
                PythonFunctionInfo[] pandasAggFunctions,
                RowType inputType,
                RowType udfInputType,
                RowType udfOutputType,
                int inputTimeFieldIndex,
                int maxLimitSize,
                long windowSize,
                long slideSize,
                int[] namedProperties,
                GeneratedProjection inputGeneratedProjection,
                GeneratedProjection groupKeyGeneratedProjection,
                GeneratedProjection groupSetGeneratedProjection) {
            super(
                    config,
                    pandasAggFunctions,
                    inputType,
                    udfInputType,
                    udfOutputType,
                    inputTimeFieldIndex,
                    maxLimitSize,
                    windowSize,
                    slideSize,
                    namedProperties,
                    inputGeneratedProjection,
                    groupKeyGeneratedProjection,
                    groupSetGeneratedProjection);
        }

        /** Returns a pass-through runner built from this operator's task and UDF settings. */
        @Override
        public PythonFunctionRunner createPythonFunctionRunner() {
            return new PassThroughPythonAggregateFunctionRunner(
                    getContainingTask().getEnvironment(),
                    getRuntimeContext().getTaskInfo().getTaskName(),
                    PythonTestUtils.createTestProcessEnvironmentManager(),
                    udfInputType,
                    udfOutputType,
                    getFunctionUrn(),
                    createUserDefinedFunctionsProto(),
                    PythonTestUtils.createMockFlinkMetricContainer(),
                    // NOTE(review): meaning of this flag is not visible here - see the runner.
                    false);
        }
    }
}

