package org.apache.flink.table.runtime.runners.python.table;

import java.util.Map;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.runners.python.AbstractPythonStatelessFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.class */
public abstract class AbstractPythonTableFunctionRunner<IN> extends AbstractPythonStatelessFunctionRunner<IN> {
    private static final String TABLE_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:table_function:v1";
    private static final String TABLE_FUNCTION_URN = "flink:transform:table_function:v1";
    private final PythonFunctionInfo tableFunction;
    private transient TypeSerializer<IN> inputTypeSerializer;

    public AbstractPythonTableFunctionRunner(String str, FnDataReceiver<byte[]> fnDataReceiver, PythonFunctionInfo pythonFunctionInfo, PythonEnvironmentManager pythonEnvironmentManager, RowType rowType, RowType rowType2, Map<String, String> map, FlinkMetricContainer flinkMetricContainer) {
        super(str, fnDataReceiver, pythonEnvironmentManager, rowType, rowType2, TABLE_FUNCTION_URN, map, flinkMetricContainer);
        this.tableFunction = (PythonFunctionInfo) Preconditions.checkNotNull(pythonFunctionInfo);
    }

    @Override // org.apache.flink.python.AbstractPythonFunctionRunner, org.apache.flink.python.PythonFunctionRunner
    public void open() throws Exception {
        super.open();
        this.inputTypeSerializer = getInputTypeSerializer();
    }

    @Override // org.apache.flink.table.runtime.runners.python.AbstractPythonStatelessFunctionRunner
    @VisibleForTesting
    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedFunctions.Builder newBuilder = FlinkFnApi.UserDefinedFunctions.newBuilder();
        newBuilder.addUdfs(getUserDefinedFunctionProto(this.tableFunction));
        newBuilder.setMetricEnabled(this.flinkMetricContainer != null);
        return newBuilder.build();
    }

    @Override // org.apache.flink.python.PythonFunctionRunner
    public void processElement(IN in) {
        try {
            this.baos.reset();
            this.inputTypeSerializer.serialize(in, this.baosWrapper);
            this.mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(this.baos.toByteArray()));
        } catch (Throwable th) {
            throw new RuntimeException("Failed to process element when sending data to Python SDK harness.", th);
        }
    }

    @Override // org.apache.flink.python.AbstractPythonFunctionRunner
    public OutputReceiverFactory createOutputReceiverFactory() {
        return new OutputReceiverFactory() { // from class: org.apache.flink.table.runtime.runners.python.table.AbstractPythonTableFunctionRunner.1
            @Override // org.apache.beam.runners.fnexecution.control.OutputReceiverFactory
            public FnDataReceiver<WindowedValue<byte[]>> create(String str) {
                return windowedValue -> {
                    AbstractPythonTableFunctionRunner.this.resultReceiver.accept(windowedValue.getValue());
                };
            }
        };
    }

    public abstract TypeSerializer<IN> getInputTypeSerializer();

    @Override // org.apache.flink.table.runtime.runners.python.AbstractPythonStatelessFunctionRunner
    public String getInputOutputCoderUrn() {
        return TABLE_FUNCTION_SCHEMA_CODER_URN;
    }
}
