package org.apache.flink.table.runtime.operators.python.table;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTestBase.class */
abstract class PythonTableFunctionOperatorTestBase<IN, OUT> {
    @Test
    void testRetractionFieldKept() throws Exception {
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = getTestHarness(new Configuration(), FlinkJoinType.INNER);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(false, "c3", "c4", 1L), 0 + 2));
        testHarness.processElement(new StreamRecord(newRow(false, "c5", "c6", 2L), 0 + 3));
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", "c2", 0L, 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c3", "c4", 1L, 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c5", "c6", 2L, 2L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredOnCheckpoint() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = getTestHarness(configuration, FlinkJoinType.INNER);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        testHarness.prepareSnapshotPreBarrier(0L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", "c2", 0L, 0L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testFinishBundleTriggeredByCount() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 2);
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = getTestHarness(configuration, FlinkJoinType.INNER);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        assertOutputEquals("FinishBundle should not be triggered.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 1L), 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", "c2", 0L, 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", "c2", 1L, 1L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testFinishBundleTriggeredByTime() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        configuration.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = getTestHarness(configuration, FlinkJoinType.INNER);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        assertOutputEquals("FinishBundle should not be triggered.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.setProcessingTime(1000L);
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", "c2", 0L, 0L)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testLeftJoin() throws Exception {
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = getTestHarness(new Configuration(), FlinkJoinType.LEFT);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        testHarness.open();
        testHarness.processElement(new StreamRecord(newRow(true, "c1", "c2", 0L), 0 + 1));
        testHarness.processElement(new StreamRecord(newRow(false, "c2", "c4", 1L), 0 + 2));
        testHarness.processElement(new StreamRecord(newRow(false, "c3", "c6", 2L), 0 + 3));
        testHarness.processElement(new StreamRecord(newRow(false, "c4", "c6", 2L), 0 + 4));
        testHarness.processElement(new StreamRecord(newRow(false, "c5", "c6", 2L), 0 + 5));
        testHarness.processElement(new StreamRecord(newRow(false, "c6", "c6", 2L), 0 + 6));
        testHarness.processElement(new StreamRecord(newRow(false, "c7", "c6", 2L), 0 + 7));
        testHarness.processElement(new StreamRecord(newRow(false, "c8", "c6", 2L), 0 + 8));
        testHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(newRow(true, "c1", "c2", 0L, 0L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c2", "c4", 1L, 1L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c3", "c6", 2L, 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c4", "c6", 2L, 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c5", "c6", 2L, 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c6", "c6", 2L, null)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c7", "c6", 2L, 2L)));
        concurrentLinkedQueue.add(new StreamRecord(newRow(false, "c8", "c6", 2L, null)));
        assertOutputEquals("Output was not correct.", concurrentLinkedQueue, testHarness.getOutput());
    }

    private OneInputStreamOperatorTestHarness<IN, OUT> getTestHarness(Configuration configuration, FlinkJoinType flinkJoinType) throws Exception {
        OneInputStreamOperatorTestHarness<IN, OUT> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(getTestOperator(configuration, new PythonFunctionInfo(PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE, new Integer[]{0}), new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new VarCharType()), new RowType.RowField("f3", new BigIntType()))), new RowType(Arrays.asList(new RowType.RowField("f1", new VarCharType()), new RowType.RowField("f2", new VarCharType()), new RowType.RowField("f3", new BigIntType()), new RowType.RowField("f4", new BigIntType()))), new int[]{2}, flinkJoinType));
        oneInputStreamOperatorTestHarness.getStreamConfig().setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.PYTHON, 0.5d);
        return oneInputStreamOperatorTestHarness;
    }

    public abstract IN newRow(boolean z, Object... objArr);

    public abstract void assertOutputEquals(String str, Collection<Object> collection, Collection<Object> collection2);

    public abstract PythonTableFunctionOperator getTestOperator(Configuration configuration, PythonFunctionInfo pythonFunctionInfo, RowType rowType, RowType rowType2, int[] iArr, FlinkJoinType flinkJoinType);
}
