package org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch;

import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator;
import org.apache.flink.table.runtime.utils.PassThroughPythonAggregateFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.class */
public class BatchArrowPythonGroupAggregateFunctionOperatorTest extends AbstractBatchArrowPythonAggregateFunctionOperatorTest {

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest$PassThroughBatchArrowPythonGroupAggregateFunctionOperator.class */
    private static class PassThroughBatchArrowPythonGroupAggregateFunctionOperator extends BatchArrowPythonGroupAggregateFunctionOperator {
        PassThroughBatchArrowPythonGroupAggregateFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, int[] iArr, int[] iArr2, int[] iArr3) {
            super(configuration, pythonFunctionInfoArr, rowType, rowType2, iArr, iArr2, iArr3);
        }

        public PythonFunctionRunner createPythonFunctionRunner() {
            return new PassThroughPythonAggregateFunctionRunner(getRuntimeContext().getTaskName(), PythonTestUtils.createTestEnvironmentManager(), this.userDefinedFunctionInputType, this.userDefinedFunctionOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), new HashMap(), PythonTestUtils.createMockFlinkMetricContainer(), false);
        }
    }

    @Test
    public void testGroupAggregateFunction() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(new Configuration());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c4", 1L), 0 + 2));
        testHarness.processElement(new StreamRecord(newRow(true, "c2", "c6", 2L), 0 + 3));
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 2L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    public void testFinishBundleTriggeredByCount() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 2);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 1L), 0 + 2));
        assertOutputEquals("FinishBundle should not be triggered.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.processElement(new StreamRecord(newRow(true, "c2", "c6", 2L), 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 2L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    public void testFinishBundleTriggeredByTime() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        configuration.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 1L), 0 + 2));
        testHarness.processElement(new StreamRecord(newRow(true, "c2", "c6", 2L), 0 + 2));
        assertOutputEquals("FinishBundle should not be triggered.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.setProcessingTime(1000L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 2L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public LogicalType[] getOutputLogicalType() {
        return new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()};
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public RowType getInputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new VarCharType()), new RowType.RowField("f3", new BigIntType())));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public RowType getOutputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new BigIntType())));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public AbstractArrowPythonAggregateFunctionOperator getTestOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, int[] iArr, int[] iArr2) {
        return new PassThroughBatchArrowPythonGroupAggregateFunctionOperator(configuration, pythonFunctionInfoArr, rowType, rowType2, iArr, iArr, iArr2);
    }
}
