package org.apache.flink.table.runtime.operators.python.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.class */
public class PythonTableFunctionOperator extends AbstractStatelessFunctionOperator<RowData, RowData, RowData> {
    private static final long serialVersionUID = 1;
    private static final String TABLE_FUNCTION_URN = "flink:transform:table_function:v1";
    private final PythonFunctionInfo tableFunction;
    private final FlinkJoinType joinType;
    private final GeneratedProjection udtfInputGeneratedProjection;
    private transient StreamRecordRowDataWrappingCollector rowDataWrapper;
    private transient JoinedRowData reuseJoinedRow;
    private transient Projection<RowData, BinaryRowData> udtfInputProjection;
    private transient TypeSerializer<RowData> udtfOutputTypeSerializer;
    private transient TypeSerializer<RowData> udtfInputTypeSerializer;
    private transient RowDataSerializer forwardedInputSerializer;
    private transient RowData input;
    private transient boolean hasJoined;
    private transient boolean isFinishResult;

    public PythonTableFunctionOperator(Configuration configuration, PythonFunctionInfo pythonFunctionInfo, RowType rowType, RowType rowType2, RowType rowType3, FlinkJoinType flinkJoinType, GeneratedProjection generatedProjection) {
        super(configuration, rowType, rowType2, rowType3);
        this.tableFunction = (PythonFunctionInfo) Preconditions.checkNotNull(pythonFunctionInfo);
        Preconditions.checkArgument(flinkJoinType == FlinkJoinType.INNER || flinkJoinType == FlinkJoinType.LEFT, "The join type should be inner join or left join");
        this.joinType = flinkJoinType;
        this.udtfInputGeneratedProjection = (GeneratedProjection) Preconditions.checkNotNull(generatedProjection);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.rowDataWrapper = new StreamRecordRowDataWrappingCollector(this.output);
        this.reuseJoinedRow = new JoinedRowData();
        this.udtfInputProjection = (Projection) this.udtfInputGeneratedProjection.newInstance(Thread.currentThread().getContextClassLoader());
        this.forwardedInputSerializer = new RowDataSerializer(this.inputType);
        this.udtfInputTypeSerializer = PythonTypeUtils.toInternalSerializer(this.udfInputType);
        this.udtfOutputTypeSerializer = PythonTypeUtils.toInternalSerializer(this.udfOutputType);
        this.input = null;
        this.hasJoined = false;
        this.isFinishResult = true;
    }

    @Override // org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return this.tableFunction.getPythonFunction().getPythonEnv();
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public String getFunctionUrn() {
        return TABLE_FUNCTION_URN;
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType rowType) {
        return this.tableFunction.getPythonFunction().takesRowAsInput() ? ProtoUtils.createRowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, true) : ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, true);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, true);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
        return ProtoUtils.createUserDefinedFunctionsProto(getRuntimeContext(), new PythonFunctionInfo[]{this.tableFunction}, ((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue(), ((Boolean) this.config.get(PythonOptions.PYTHON_PROFILE_ENABLED)).booleanValue());
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void bufferInput(RowData rowData) {
        RowData copy = this.forwardedInputSerializer.copy(rowData);
        copy.setRowKind(rowData.getRowKind());
        this.forwardedInputQueue.add(copy);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public RowData getFunctionInput(RowData rowData) {
        return this.udtfInputProjection.apply(rowData);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void processElementInternal(RowData rowData) throws Exception {
        this.udtfInputTypeSerializer.serialize(getFunctionInput(rowData), this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
    }

    @Override // org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator
    public void emitResult(Tuple3<String, byte[], Integer> tuple3) throws Exception {
        if (this.isFinishResult) {
            this.input = (RowData) this.forwardedInputQueue.poll();
            this.hasJoined = false;
        }
        do {
            byte[] bArr = (byte[]) tuple3.f1;
            this.isFinishResult = isFinishResult(bArr, ((Integer) tuple3.f2).intValue());
            if (!this.isFinishResult) {
                this.reuseJoinedRow.setRowKind(this.input.getRowKind());
                this.bais.setBuffer(bArr, 0, bArr.length);
                this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(this.input, (RowData) this.udtfOutputTypeSerializer.deserialize(this.baisWrapper)));
                tuple3 = this.pythonFunctionRunner.pollResult();
                this.hasJoined = true;
            } else if (this.joinType == FlinkJoinType.LEFT && !this.hasJoined) {
                GenericRowData genericRowData = new GenericRowData(this.udfOutputType.getFieldCount());
                for (int i = 0; i < genericRowData.getArity(); i++) {
                    genericRowData.setField(i, (Object) null);
                }
                this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(this.input, genericRowData));
            }
            if (this.isFinishResult) {
                return;
            }
        } while (tuple3 != null);
    }

    private boolean isFinishResult(byte[] bArr, int i) {
        return i == 1 && bArr[0] == 0;
    }
}
