/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.scalar.arrow;

import java.io.IOException;
import java.util.Collection;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.runtime.operators.python.scalar.arrow.ArrowPythonScalarFunctionOperator;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

public class ArrowPythonScalarFunctionOperatorTest
extends PythonScalarFunctionOperatorTestBase<RowData, RowData, RowData> {
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()});

    public ArrowPythonScalarFunctionOperator getTestOperator(Configuration config, PythonFunctionInfo[] scalarFunctions, RowType inputType, RowType outputType, int[] udfInputOffsets, int[] forwardedFields) {
        RowType udfInputType = (RowType)Projection.of((int[])udfInputOffsets).project((LogicalType)inputType);
        RowType forwardedFieldType = (RowType)Projection.of((int[])forwardedFields).project((LogicalType)inputType);
        RowType udfOutputType = (RowType)Projection.range((int)forwardedFields.length, (int)outputType.getFieldCount()).project((LogicalType)outputType);
        return new PassThroughRowDataArrowPythonScalarFunctionOperator(config, scalarFunctions, inputType, udfInputType, udfOutputType, ProjectionCodeGenerator.generateProjection((CodeGeneratorContext)new CodeGeneratorContext((ReadableConfig)new Configuration(), Thread.currentThread().getContextClassLoader()), (String)"UdfInputProjection", (RowType)inputType, (RowType)udfInputType, (int[])udfInputOffsets), ProjectionCodeGenerator.generateProjection((CodeGeneratorContext)new CodeGeneratorContext((ReadableConfig)new Configuration(), Thread.currentThread().getContextClassLoader()), (String)"ForwardedFieldProjection", (RowType)inputType, (RowType)forwardedFieldType, (int[])forwardedFields));
    }

    @Override
    public RowData newRow(boolean accumulateMsg, Object ... fields) {
        if (accumulateMsg) {
            return StreamRecordUtils.row((Object[])fields);
        }
        RowData row = StreamRecordUtils.row((Object[])fields);
        row.setRowKind(RowKind.DELETE);
        return row;
    }

    @Override
    public void assertOutputEquals(String message, Collection<Object> expected, Collection<Object> actual) {
        this.assertor.assertOutputEquals(message, expected, actual);
    }

    @Override
    public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
        return StreamTableEnvironment.create((StreamExecutionEnvironment)env);
    }

    @Override
    public TypeSerializer<RowData> getOutputTypeSerializer(RowType rowType) {
        return new RowDataSerializer(rowType);
    }

    private static class PassThroughRowDataArrowPythonScalarFunctionOperator
    extends ArrowPythonScalarFunctionOperator {
        PassThroughRowDataArrowPythonScalarFunctionOperator(Configuration config, PythonFunctionInfo[] scalarFunctions, RowType inputType, RowType udfInputType, RowType udfOutputType, GeneratedProjection udfInputGeneratedProjection, GeneratedProjection forwardedFieldGeneratedProjection) {
            super(config, scalarFunctions, inputType, udfInputType, udfOutputType, udfInputGeneratedProjection, forwardedFieldGeneratedProjection);
        }

        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
            return new PassThroughPythonScalarFunctionRunner(this.getContainingTask().getEnvironment(), this.getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), this.udfInputType, this.udfOutputType, this.getFunctionUrn(), this.createUserDefinedFunctionsProto(), PythonTestUtils.createMockFlinkMetricContainer());
        }
    }
}

