/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperatorTest;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonOverWindowAggregateFunctionOperator;
import org.apache.flink.table.runtime.utils.PassThroughPythonAggregateFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Assert;
import org.junit.Test;

public class BatchArrowPythonOverWindowAggregateFunctionOperatorTest
extends AbstractBatchArrowPythonAggregateFunctionOperatorTest {
    @Test
    public void testOverWindowAggregateFunction() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.getTestHarness(new Configuration());
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c2", 0L, 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c4", 1L, 0L), initialTime + 2L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c6", 2L, 10L), initialTime + 3L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c2", "c8", 3L, 0L), initialTime + 3L));
        testHarness.close();
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c2", 0L, 0L, 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c4", 1L, 0L, 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c6", 2L, 10L, 2L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", "c8", 3L, 0L, 3L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testFinishBundleTriggeredByCount() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 3);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c2", 0L, 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c4", 1L, 0L), initialTime + 2L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c6", 2L, 10L), initialTime + 3L));
        this.assertOutputEquals("FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c2", "c8", 3L, 0L), initialTime + 3L));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c2", 0L, 0L, 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c4", 1L, 0L, 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c6", 2L, 10L, 2L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", "c8", 3L, 0L, 3L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testFinishBundleTriggeredByTime() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c2", 0L, 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c4", 1L, 0L), initialTime + 2L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c1", "c6", 2L, 10L), initialTime + 3L));
        testHarness.processElement(new StreamRecord((Object)this.newBinaryRow(true, "c2", "c8", 3L, 0L), initialTime + 3L));
        this.assertOutputEquals("FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c2", 0L, 0L, 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c4", 1L, 0L, 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", "c6", 2L, 10L, 2L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", "c8", 3L, 0L, 3L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testUserDefinedFunctionsProto() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.getTestHarness(new Configuration());
        testHarness.open();
        BatchArrowPythonOverWindowAggregateFunctionOperator operator = (BatchArrowPythonOverWindowAggregateFunctionOperator)testHarness.getOneInputOperator();
        FlinkFnApi.UserDefinedFunctions functionsProto = operator.getUserDefinedFunctionsProto();
        List windows = functionsProto.getWindowsList();
        Assert.assertEquals((long)2L, (long)windows.size());
        FlinkFnApi.OverWindow firstWindow = (FlinkFnApi.OverWindow)windows.get(0);
        Assert.assertEquals((Object)firstWindow.getWindowType(), (Object)FlinkFnApi.OverWindow.WindowType.RANGE_SLIDING);
        FlinkFnApi.OverWindow secondWindow = (FlinkFnApi.OverWindow)windows.get(1);
        Assert.assertEquals((Object)secondWindow.getWindowType(), (Object)FlinkFnApi.OverWindow.WindowType.ROW_UNBOUNDED_PRECEDING);
        Assert.assertEquals((long)secondWindow.getUpperBoundary(), (long)2L);
    }

    @Override
    public LogicalType[] getOutputLogicalType() {
        return new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.BIGINT().getLogicalType()};
    }

    @Override
    public RowType getInputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", (LogicalType)new VarCharType()), new RowType.RowField("f2", (LogicalType)new VarCharType()), new RowType.RowField("f3", (LogicalType)new BigIntType()), new RowType.RowField("rowTime", (LogicalType)new BigIntType())));
    }

    @Override
    public RowType getOutputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", (LogicalType)new VarCharType()), new RowType.RowField("f2", (LogicalType)new VarCharType()), new RowType.RowField("f3", (LogicalType)new BigIntType()), new RowType.RowField("rowTime", (LogicalType)new BigIntType()), new RowType.RowField("agg", (LogicalType)new BigIntType())));
    }

    @Override
    public AbstractArrowPythonAggregateFunctionOperator getTestOperator(Configuration config, PythonFunctionInfo[] pandasAggregateFunctions, RowType inputType, RowType outputType, int[] groupingSet, int[] udafInputOffsets) {
        return new PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(config, pandasAggregateFunctions, inputType, outputType, new long[]{0L, Long.MIN_VALUE}, new long[]{0L, 2L}, new boolean[]{true, false}, new int[]{0}, groupingSet, groupingSet, udafInputOffsets, 3, true);
    }

    private static class PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator
    extends BatchArrowPythonOverWindowAggregateFunctionOperator {
        PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(Configuration config, PythonFunctionInfo[] pandasAggFunctions, RowType inputType, RowType outputType, long[] lowerBoundary, long[] upperBoundary, boolean[] isRangeWindow, int[] aggWindowIndex, int[] groupKey, int[] groupingSet, int[] udafInputOffsets, int inputTimeFieldIndex, boolean asc) {
            super(config, pandasAggFunctions, inputType, outputType, lowerBoundary, upperBoundary, isRangeWindow, aggWindowIndex, groupKey, groupingSet, udafInputOffsets, inputTimeFieldIndex, asc);
        }

        public PythonFunctionRunner createPythonFunctionRunner() {
            return new PassThroughPythonAggregateFunctionRunner(this.getRuntimeContext().getTaskName(), PythonTestUtils.createTestEnvironmentManager(), this.userDefinedFunctionInputType, this.userDefinedFunctionOutputType, this.getFunctionUrn(), this.getUserDefinedFunctionsProto(), this.getInputOutputCoderUrn(), new HashMap<String, String>(), PythonTestUtils.createMockFlinkMetricContainer(), true);
        }
    }
}

