package org.apache.flink.table.runtime.operators.python.aggregate;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowEnd;
import org.apache.flink.table.runtime.groupwindow.WindowReference;
import org.apache.flink.table.runtime.groupwindow.WindowStart;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.class */
class PythonStreamGroupWindowAggregateOperatorTest extends AbstractPythonStreamAggregateOperatorTest {
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    PythonStreamGroupWindowAggregateOperatorTest() {
    }

    @Test
    void testGroupWindowAggregateFunction() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(new Configuration());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(newRecord(true, 0 + 1, "c1", "c2", 0L, 0L));
        testHarness.processElement(newRecord(true, 0 + 2, "c1", "c4", 1L, 6000L));
        testHarness.processElement(newRecord(true, 0 + 3, "c1", "c6", 2L, 10000L));
        testHarness.processElement(newRecord(true, 0 + 4, "c2", "c8", 3L, 0L));
        testHarness.processElement(newRecord(true, 0 + 5, "c3", "c8", 3L, 0L));
        testHarness.processElement(newRecord(false, 0 + 6, "c3", "c8", 3L, 0L));
        testHarness.processWatermark(Long.MAX_VALUE);
        testHarness.close();
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c1"));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c3"));
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c2"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c2"));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c3"));
        concurrentLinkedQueue.add(newWindowRecord(5000L, 15000L, "c1", 1L));
        concurrentLinkedQueue.add(newStateCleanupRecord(5000L, 15000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(10000L, 20000L, "c1", 2L));
        concurrentLinkedQueue.add(newStateCleanupRecord(10000L, 20000L, "c1"));
        concurrentLinkedQueue.add(new Watermark(Long.MAX_VALUE));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredOnCheckpoint() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(PythonOptions.MAX_BUNDLE_SIZE, 10);
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(newRecord(true, 0 + 1, "c1", "c2", 0L, 0L));
        testHarness.processElement(newRecord(true, 0 + 2, "c1", "c4", 1L, 6000L));
        testHarness.processElement(newRecord(true, 0 + 3, "c1", "c6", 2L, 10000L));
        testHarness.processElement(newRecord(true, 0 + 4, "c2", "c8", 3L, 0L));
        testHarness.processWatermark(new Watermark(10000L));
        testHarness.prepareSnapshotPreBarrier(0L);
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c2"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c2"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c1"));
        concurrentLinkedQueue.add(new Watermark(10000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.processWatermark(20000L);
        testHarness.close();
        concurrentLinkedQueue.add(newWindowRecord(5000L, 15000L, "c1", 1L));
        concurrentLinkedQueue.add(newStateCleanupRecord(5000L, 15000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(10000L, 20000L, "c1", 2L));
        concurrentLinkedQueue.add(newStateCleanupRecord(10000L, 20000L, "c1"));
        concurrentLinkedQueue.add(new Watermark(20000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredByCount() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(PythonOptions.MAX_BUNDLE_SIZE, 4);
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(newRecord(true, 0 + 1, "c1", "c2", 0L, 0L));
        testHarness.processElement(newRecord(true, 0 + 2, "c1", "c4", 1L, 6000L));
        testHarness.processElement(newRecord(true, 0 + 3, "c1", "c6", 2L, 10000L));
        testHarness.processElement(newRecord(true, 0 + 4, "c2", "c8", 3L, 0L));
        testHarness.processWatermark(new Watermark(10000L));
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c2"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c2"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c1"));
        concurrentLinkedQueue.add(new Watermark(10000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.processWatermark(20000L);
        testHarness.close();
        concurrentLinkedQueue.add(newWindowRecord(5000L, 15000L, "c1", 1L));
        concurrentLinkedQueue.add(newStateCleanupRecord(5000L, 15000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(10000L, 20000L, "c1", 2L));
        concurrentLinkedQueue.add(newStateCleanupRecord(10000L, 20000L, "c1"));
        concurrentLinkedQueue.add(new Watermark(20000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredByTime() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(PythonOptions.MAX_BUNDLE_SIZE, 10);
        configuration.set(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(newRecord(true, 0 + 1, "c1", "c2", 0L, 0L));
        testHarness.processElement(newRecord(true, 0 + 2, "c1", "c4", 1L, 6000L));
        testHarness.processElement(newRecord(true, 0 + 3, "c1", "c6", 2L, 10000L));
        testHarness.processElement(newRecord(true, 0 + 4, "c2", "c8", 3L, 0L));
        testHarness.processWatermark(new Watermark(20000L));
        testHarness.setProcessingTime(1000L);
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(-5000L, 5000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(-5000L, 5000L, "c2"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c2", 3L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c2"));
        concurrentLinkedQueue.add(newWindowRecord(0L, 10000L, "c1", 0L));
        concurrentLinkedQueue.add(newStateCleanupRecord(0L, 10000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(5000L, 15000L, "c1", 1L));
        concurrentLinkedQueue.add(newStateCleanupRecord(5000L, 15000L, "c1"));
        concurrentLinkedQueue.add(newWindowRecord(10000L, 20000L, "c1", 2L));
        concurrentLinkedQueue.add(newStateCleanupRecord(10000L, 20000L, "c1"));
        concurrentLinkedQueue.add(new Watermark(20000L));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    private StreamRecord<RowData> newRecord(boolean z, long j, Object... objArr) {
        return new StreamRecord<>(newRow(z, objArr), j);
    }

    private StreamRecord<RowData> newWindowRecord(long j, long j2, Object... objArr) {
        Object[] objArr2 = new Object[objArr.length + 2];
        System.arraycopy(objArr, 0, objArr2, 0, objArr.length);
        objArr2[objArr2.length - 2] = TimestampData.fromEpochMillis(j);
        objArr2[objArr2.length - 1] = TimestampData.fromEpochMillis(j2);
        return new StreamRecord<>(newRow(true, objArr2));
    }

    private StreamRecord newStateCleanupRecord(long j, long j2, Object obj) {
        return new StreamRecord(newRow(true, String.format("state_cleanup_triggered: %s : TimeWindow{start=%s, end=%s}", obj, Long.valueOf(j), Long.valueOf(j2)), 0L, TimestampData.fromEpochMillis(j), TimestampData.fromEpochMillis(j2)));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperatorTest
    public LogicalType[] getOutputLogicalType() {
        return new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()};
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperatorTest
    public RowType getInputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new VarCharType()), new RowType.RowField("f3", new BigIntType()), new RowType.RowField("rowTime", new BigIntType())));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperatorTest
    public RowType getOutputType() {
        return new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new BigIntType()), new RowType.RowField("windowStart", new TimestampType(3)), new RowType.RowField("windowEnd", new TimestampType(3))));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperatorTest
    OneInputStreamOperator getTestOperator(Configuration configuration) {
        SlidingWindowAssigner withEventTime = SlidingWindowAssigner.of(Duration.ofMillis(10000L), Duration.ofMillis(5000L)).withEventTime();
        new WindowReference("w$", new TimestampType(3));
        return new PassThroughPythonStreamGroupWindowAggregateOperator(configuration, getInputType(), getOutputType(), new PythonAggregateFunctionInfo[]{new PythonAggregateFunctionInfo(PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE, new Integer[]{2}, -1, false)}, getGrouping(), -1, false, false, 3, withEventTime, FlinkFnApi.GroupWindow.WindowType.SLIDING_GROUP_WINDOW, true, true, 10000L, 5000L, 0L, 0L, new NamedWindowProperty[]{new NamedWindowProperty("start", new WindowStart((WindowReference) null)), new NamedWindowProperty("end", new WindowEnd((WindowReference) null))}, UTC_ZONE_ID);
    }
}
