/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.scalar.arrow;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.runtime.operators.python.scalar.arrow.RowDataArrowPythonScalarFunctionOperator;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

public class RowDataArrowPythonScalarFunctionOperatorTest
extends PythonScalarFunctionOperatorTestBase<RowData, RowData, RowData> {
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()});

    @Override
    public AbstractPythonScalarFunctionOperator<RowData, RowData, RowData> getTestOperator(Configuration config, PythonFunctionInfo[] scalarFunctions, RowType inputType, RowType outputType, int[] udfInputOffsets, int[] forwardedFields) {
        return new PassThroughRowDataArrowPythonScalarFunctionOperator(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
    }

    @Override
    public RowData newRow(boolean accumulateMsg, Object ... fields) {
        if (accumulateMsg) {
            return StreamRecordUtils.row((Object[])fields);
        }
        RowData row = StreamRecordUtils.row((Object[])fields);
        row.setRowKind(RowKind.DELETE);
        return row;
    }

    @Override
    public void assertOutputEquals(String message, Collection<Object> expected, Collection<Object> actual) {
        this.assertor.assertOutputEquals(message, expected, actual);
    }

    @Override
    public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
        return StreamTableEnvironment.create((StreamExecutionEnvironment)env, (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
    }

    @Override
    public TypeSerializer<RowData> getOutputTypeSerializer(RowType rowType) {
        return new RowDataSerializer(rowType);
    }

    private static class PassThroughRowDataArrowPythonScalarFunctionOperator
    extends RowDataArrowPythonScalarFunctionOperator {
        PassThroughRowDataArrowPythonScalarFunctionOperator(Configuration config, PythonFunctionInfo[] scalarFunctions, RowType inputType, RowType outputType, int[] udfInputOffsets, int[] forwardedFields) {
            super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
        }

        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
            return new PassThroughPythonScalarFunctionRunner(this.getRuntimeContext().getTaskName(), PythonTestUtils.createTestEnvironmentManager(), this.userDefinedFunctionInputType, this.userDefinedFunctionOutputType, this.getFunctionUrn(), this.getUserDefinedFunctionsProto(), this.getInputOutputCoderUrn(), new HashMap<String, String>(), PythonTestUtils.createMockFlinkMetricContainer());
        }
    }
}

