package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowEnd;
import org.apache.flink.table.runtime.groupwindow.WindowReference;
import org.apache.flink.table.runtime.groupwindow.WindowStart;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator;
import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.utils.PassThroughPythonAggregateFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.class */
class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest extends AbstractStreamArrowPythonAggregateFunctionOperatorTest {
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest$PassThroughStreamArrowPythonGroupWindowAggregateFunctionOperator.class */
    private static class PassThroughStreamArrowPythonGroupWindowAggregateFunctionOperator extends StreamArrowPythonGroupWindowAggregateFunctionOperator {
        PassThroughStreamArrowPythonGroupWindowAggregateFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, RowType rowType3, int i, WindowAssigner windowAssigner, Trigger trigger, long j, NamedWindowProperty[] namedWindowPropertyArr, ZoneId zoneId, GeneratedProjection generatedProjection) {
            super(configuration, pythonFunctionInfoArr, rowType, rowType2, rowType3, i, windowAssigner, trigger, j, namedWindowPropertyArr, zoneId, generatedProjection);
        }

        public PythonFunctionRunner createPythonFunctionRunner() {
            return new PassThroughPythonAggregateFunctionRunner(getRuntimeContext().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), this.udfInputType, this.udfOutputType, getFunctionUrn(), createUserDefinedFunctionsProto(), PythonTestUtils.createMockFlinkMetricContainer(), false);
        }
    }

    StreamArrowPythonGroupWindowAggregateFunctionOperatorTest() {
    }

    @Test
    void testGroupWindowAggregateFunction() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(new Configuration());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c2", 0L, 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c4", 1L, 6000L), 0 + 2));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c6", 2L, 10000L), 0 + 3));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c2", "c8", 3L, 0L), 0 + 4));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c3", "c8", 3L, 0L), 0 + 5));
        testHarness.processElement(new StreamRecord(newBinaryRow(false, "c3", "c8", 3L, 0L), 0 + 6));
        testHarness.processWatermark(Long.MAX_VALUE);
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L, TimestampData.fromEpochMillis(-5000L), TimestampData.fromEpochMillis(5000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 3L, TimestampData.fromEpochMillis(-5000L), TimestampData.fromEpochMillis(5000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L, TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 3L, TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 1L, TimestampData.fromEpochMillis(5000L), TimestampData.fromEpochMillis(15000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 2L, TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L))));
        concurrentLinkedQueue.add(new Watermark(Long.MAX_VALUE));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredOnCheckpoint() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c2", 0L, 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c4", 1L, 6000L), 0 + 2));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c6", 2L, 10000L), 0 + 3));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c2", "c8", 3L, 0L), 0 + 4));
        testHarness.processWatermark(new Watermark(10000L));
        testHarness.prepareSnapshotPreBarrier(0L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L, TimestampData.fromEpochMillis(-5000L), TimestampData.fromEpochMillis(5000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 3L, TimestampData.fromEpochMillis(-5000L), TimestampData.fromEpochMillis(5000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 3L, TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L, TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L))));
        concurrentLinkedQueue.add(new Watermark(10000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.processWatermark(20000L);
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 1L, TimestampData.fromEpochMillis(5000L), TimestampData.fromEpochMillis(15000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 2L, TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L))));
        concurrentLinkedQueue.add(new Watermark(20000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredByCount() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 4);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c2", 0L, 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c4", 1L, 6000L), 0 + 2));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c1", "c6", 2L, 10000L), 0 + 3));
        testHarness.processElement(new StreamRecord(newBinaryRow(true, "c2", "c8", 3L, 0L), 0 + 4));
        testHarness.processWatermark(new Watermark(10000L));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L, TimestampData.fromEpochMillis(-5000L), TimestampData.fromEpochMillis(5000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 3L, TimestampData.fromEpochMillis(-5000L), TimestampData.fromEpochMillis(5000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 3L, TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L, TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L))));
        concurrentLinkedQueue.add(new Watermark(10000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.processWatermark(20000L);
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 1L, TimestampData.fromEpochMillis(5000L), TimestampData.fromEpochMillis(15000L))));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 2L, TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L))));
        concurrentLinkedQueue.add(new Watermark(20000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public LogicalType[] getOutputLogicalType() {
        return new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()};
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public RowType getInputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new VarCharType()), new RowType.RowField("f3", new BigIntType()), new RowType.RowField("rowTime", new BigIntType())));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public RowType getOutputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new BigIntType()), new RowType.RowField("windowStart", new TimestampType(3)), new RowType.RowField("windowEnd", new TimestampType(3))));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase
    public AbstractArrowPythonAggregateFunctionOperator getTestOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, int[] iArr, int[] iArr2) {
        SlidingWindowAssigner withEventTime = SlidingWindowAssigner.of(Duration.ofMillis(10000L), Duration.ofMillis(5000L)).withEventTime();
        EventTimeTriggers.AfterEndOfWindow afterEndOfWindow = EventTimeTriggers.afterEndOfWindow();
        RowType project = Projection.of(iArr2).project(rowType);
        return new PassThroughStreamArrowPythonGroupWindowAggregateFunctionOperator(configuration, pythonFunctionInfoArr, rowType, project, Projection.range(iArr.length, rowType2.getFieldCount() - 2).project(rowType2), 3, withEventTime, afterEndOfWindow, 0L, new NamedWindowProperty[]{new NamedWindowProperty("start", new WindowStart((WindowReference) null)), new NamedWindowProperty("end", new WindowEnd((WindowReference) null))}, UTC_ZONE_ID, ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(new Configuration(), Thread.currentThread().getContextClassLoader()), "UdafInputProjection", rowType, project, iArr2));
    }
}
