/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.table;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.runtime.operators.python.table.AbstractPythonTableFunctionOperator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;

public abstract class PythonTableFunctionOperatorTestBase<IN, OUT, UDTFIN> {
    @Test
    public void testRetractionFieldKept() throws Exception {
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = this.getTestHarness(new Configuration(), JoinRelType.INNER);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord(this.newRow(true, "c1", "c2", 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c3", "c4", 1L), initialTime + 2L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c5", "c6", 2L), initialTime + 3L));
        testHarness.close();
        expectedOutput.add(new StreamRecord(this.newRow(true, "c1", "c2", 0L, 0L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c3", "c4", 1L, 1L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c5", "c6", 2L, 2L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = this.getTestHarness(conf, JoinRelType.INNER);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord(this.newRow(true, "c1", "c2", 0L), initialTime + 1L));
        testHarness.prepareSnapshotPreBarrier(0L);
        expectedOutput.add(new StreamRecord(this.newRow(true, "c1", "c2", 0L, 0L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testFinishBundleTriggeredByCount() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 2);
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = this.getTestHarness(conf, JoinRelType.INNER);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord(this.newRow(true, "c1", "c2", 0L), initialTime + 1L));
        this.assertOutputEquals("FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new StreamRecord(this.newRow(true, "c1", "c2", 1L), initialTime + 2L));
        expectedOutput.add(new StreamRecord(this.newRow(true, "c1", "c2", 0L, 0L)));
        expectedOutput.add(new StreamRecord(this.newRow(true, "c1", "c2", 1L, 1L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testFinishBundleTriggeredByTime() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = this.getTestHarness(conf, JoinRelType.INNER);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord(this.newRow(true, "c1", "c2", 0L), initialTime + 1L));
        this.assertOutputEquals("FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(new StreamRecord(this.newRow(true, "c1", "c2", 0L, 0L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testLeftJoin() throws Exception {
        OneInputStreamOperatorTestHarness<IN, OUT> testHarness = this.getTestHarness(new Configuration(), JoinRelType.LEFT);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord(this.newRow(true, "c1", "c2", 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c2", "c4", 1L), initialTime + 2L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c3", "c6", 2L), initialTime + 3L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c4", "c6", 2L), initialTime + 4L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c5", "c6", 2L), initialTime + 5L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c6", "c6", 2L), initialTime + 6L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c7", "c6", 2L), initialTime + 7L));
        testHarness.processElement(new StreamRecord(this.newRow(false, "c8", "c6", 2L), initialTime + 8L));
        testHarness.close();
        expectedOutput.add(new StreamRecord(this.newRow(true, "c1", "c2", 0L, 0L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c2", "c4", 1L, 1L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c3", "c6", 2L, 2L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c4", "c6", 2L, 2L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c5", "c6", 2L, 2L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c6", "c6", 2L, null)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c7", "c6", 2L, 2L)));
        expectedOutput.add(new StreamRecord(this.newRow(false, "c8", "c6", 2L, null)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    private OneInputStreamOperatorTestHarness<IN, OUT> getTestHarness(Configuration config, JoinRelType joinRelType) throws Exception {
        RowType inputType = new RowType(Arrays.asList(new RowType.RowField("f1", (LogicalType)new VarCharType()), new RowType.RowField("f2", (LogicalType)new VarCharType()), new RowType.RowField("f3", (LogicalType)new BigIntType())));
        RowType outputType = new RowType(Arrays.asList(new RowType.RowField("f1", (LogicalType)new VarCharType()), new RowType.RowField("f2", (LogicalType)new VarCharType()), new RowType.RowField("f3", (LogicalType)new BigIntType()), new RowType.RowField("f4", (LogicalType)new BigIntType())));
        AbstractPythonTableFunctionOperator<IN, OUT, UDTFIN> operator = this.getTestOperator(config, new PythonFunctionInfo(PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE, (Object[])new Integer[]{0}), inputType, outputType, new int[]{2}, joinRelType);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.getStreamConfig().setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.PYTHON, 0.5);
        return testHarness;
    }

    public abstract IN newRow(boolean var1, Object ... var2);

    public abstract void assertOutputEquals(String var1, Collection<Object> var2, Collection<Object> var3);

    public abstract AbstractPythonTableFunctionOperator<IN, OUT, UDTFIN> getTestOperator(Configuration var1, PythonFunctionInfo var2, RowType var3, RowType var4, int[] var5, JoinRelType var6);
}

