package org.apache.flink.table.runtime.operators.python;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.runners.python.BaseRowPythonScalarFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/BaseRowPythonScalarFunctionOperator.class */
public class BaseRowPythonScalarFunctionOperator extends AbstractPythonScalarFunctionOperator<BaseRow, BaseRow, BaseRow, BaseRow> {
    private static final long serialVersionUID = 1;
    private transient StreamRecordBaseRowWrappingCollector baseRowWrapper;
    private transient JoinedRow reuseJoinedRow;
    private transient Projection<BaseRow, BinaryRow> forwardedFieldProjection;
    private transient Projection<BaseRow, BinaryRow> udfInputProjection;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/BaseRowPythonScalarFunctionOperator$StreamRecordBaseRowWrappingCollector.class */
    private static class StreamRecordBaseRowWrappingCollector implements Collector<BaseRow> {
        private final Collector<StreamRecord<BaseRow>> out;
        private final StreamRecord<BaseRow> reuseStreamRecord = new StreamRecord<>((Object) null);

        StreamRecordBaseRowWrappingCollector(Collector<StreamRecord<BaseRow>> collector) {
            this.out = collector;
        }

        public void collect(BaseRow baseRow) {
            this.out.collect(this.reuseStreamRecord.replace(baseRow));
        }

        public void close() {
            this.out.close();
        }
    }

    public BaseRowPythonScalarFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, int[] iArr, int[] iArr2) {
        super(configuration, pythonFunctionInfoArr, rowType, rowType2, iArr, iArr2);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.baseRowWrapper = new StreamRecordBaseRowWrappingCollector(this.output);
        this.reuseJoinedRow = new JoinedRow();
        this.udfInputProjection = createUdfInputProjection();
        this.forwardedFieldProjection = createForwardedFieldProjection();
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator
    public void bufferInput(BaseRow baseRow) {
        BinaryRow copy = this.forwardedFieldProjection.apply(baseRow).copy();
        copy.setHeader(baseRow.getHeader());
        this.forwardedInputQueue.add(copy);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator
    public BaseRow getUdfInput(BaseRow baseRow) {
        return this.udfInputProjection.apply(baseRow);
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void emitResults() {
        while (true) {
            BaseRow baseRow = (BaseRow) this.udfResultQueue.poll();
            if (baseRow == null) {
                return;
            }
            BaseRow baseRow2 = (BaseRow) this.forwardedInputQueue.poll();
            this.reuseJoinedRow.setHeader(baseRow2.getHeader());
            this.baseRowWrapper.collect((BaseRow) this.reuseJoinedRow.replace(baseRow2, baseRow));
        }
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator
    public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(FnDataReceiver<BaseRow> fnDataReceiver, PythonEnvironmentManager pythonEnvironmentManager) {
        return new BaseRowPythonScalarFunctionRunner(getRuntimeContext().getTaskName(), fnDataReceiver, this.scalarFunctions, pythonEnvironmentManager, this.udfInputType, this.udfOutputType);
    }

    private Projection<BaseRow, BinaryRow> createUdfInputProjection() {
        return (Projection) ProjectionCodeGenerator.generateProjection(CodeGeneratorContext.apply(new TableConfig()), "UdfInputProjection", this.inputType, this.udfInputType, this.udfInputOffsets).newInstance(Thread.currentThread().getContextClassLoader());
    }

    private Projection<BaseRow, BinaryRow> createForwardedFieldProjection() {
        return (Projection) ProjectionCodeGenerator.generateProjection(CodeGeneratorContext.apply(new TableConfig()), "ForwardedFieldProjection", this.inputType, new RowType((List) Arrays.stream(this.forwardedFields).mapToObj(i -> {
            return (RowType.RowField) this.inputType.getFields().get(i);
        }).collect(Collectors.toList())), this.forwardedFields).newInstance(Thread.currentThread().getContextClassLoader());
    }
}
