package org.apache.flink.table.runtime.operators.python.aggregate;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.runtime.utils.PassThroughStreamTableAggregatePythonFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.class */
class PythonStreamGroupTableAggregateOperatorTest extends AbstractPythonStreamAggregateOperatorTest {

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest$PassThroughPythonStreamGroupTableAggregateOperator.class */
    private static class PassThroughPythonStreamGroupTableAggregateOperator extends PythonStreamGroupTableAggregateOperator {
        PassThroughPythonStreamGroupTableAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, int[] iArr, int i, boolean z, long j, long j2) {
            super(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, new DataViewSpec[0][0], iArr, i, z, j, j2);
        }

        public PythonFunctionRunner createPythonFunctionRunner() {
            return new PassThroughStreamTableAggregatePythonFunctionRunner(getRuntimeContext().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), this.userDefinedFunctionInputType, this.outputType, "flink:transform:stream_group_table_aggregate:v1", getUserDefinedFunctionsProto(), PythonTestUtils.createMockFlinkMetricContainer(), getKeyedStateBackend(), getKeySerializer(), getProcessFunction());
        }

        private Function<byte[], byte[][]> getProcessFunction() {
            return bArr -> {
                try {
                    RowData rowData = (RowData) this.udfInputTypeSerializer.deserialize(new DataInputDeserializer(bArr));
                    DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(1);
                    if (rowData.getByte(0) != 0) {
                        this.udfOutputTypeSerializer.serialize(GenericRowData.of(new Object[]{StringData.fromString("state_cleanup_triggered: " + rowData.getRow(3, getKeyType().getFieldCount()).getString(0)), Long.valueOf(rowData.getLong(2))}), dataOutputSerializer);
                        return new byte[]{dataOutputSerializer.getCopyOfBuffer()};
                    }
                    this.udfOutputTypeSerializer.serialize(rowData.getRow(1, this.inputType.getFieldCount()), dataOutputSerializer);
                    byte[] copyOfBuffer = dataOutputSerializer.getCopyOfBuffer();
                    return new byte[]{copyOfBuffer, copyOfBuffer};
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            };
        }
    }

    PythonStreamGroupTableAggregateOperatorTest() {
    }

    @Test
    void testFlushDataOnClose() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(new Configuration());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(false, "c2", 1L), 0 + 2));
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c2", 1L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredOnCheckpoint() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(true, "c2", 1L), 0 + 2));
        testHarness.processElement(new StreamRecord(newRow(true, "c3", 2L), 0 + 3));
        testHarness.prepareSnapshotPreBarrier(0L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c3", 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c3", 2L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testFinishBundleTriggeredByCount() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 3);
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(true, "c2", 1L), 0 + 2));
        assertOutputEquals("FinishBundle should not be triggered.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.processElement(new StreamRecord(newRow(true, "c3", 2L), 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c3", 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c3", 2L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testFinishBundleTriggeredByTime() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        configuration.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(true, "c2", 1L), 0 + 2));
        testHarness.processElement(new StreamRecord(newRow(true, "c3", 2L), 0 + 3));
        assertOutputEquals("FinishBundle should not be triggered.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.setProcessingTime(1000L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c3", 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c3", 2L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testWatermarkProcessedOnFinishBundle() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(true, "c2", 1L), 0 + 2));
        testHarness.processWatermark(0 + 2);
        assertOutputEquals("Watermark has been processed", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testStateCleanupTimer() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("table.exec.state.ttl", "100");
        OneInputStreamOperatorTestHarness testHarness = getTestHarness(configuration);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.setProcessingTime(0L);
        testHarness.processElement(new StreamRecord(newRow(true, "c1", 0L), 0 + 1));
        testHarness.setProcessingTime(500L);
        testHarness.processElement(new StreamRecord(newRow(true, "c2", 1L), 0 + 2));
        testHarness.setProcessingTime(599L);
        testHarness.processElement(new StreamRecord(newRow(true, "c2", 2L), 0 + 3));
        testHarness.setProcessingTime(1000L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "state_cleanup_triggered: c1", 100L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c2", 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "state_cleanup_triggered: c2", 699L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperatorTest
    public OneInputStreamOperator getTestOperator(Configuration configuration) {
        long longValue = Long.valueOf(configuration.getString("table.exec.state.ttl", "0")).longValue();
        return new PassThroughPythonStreamGroupTableAggregateOperator(configuration, getInputType(), getOutputType(), new PythonAggregateFunctionInfo[]{new PythonAggregateFunctionInfo(PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE, new Integer[]{0}, -1, false)}, getGrouping(), -1, false, longValue, longValue);
    }
}
