package org.apache.flink.table.runtime.operators.python.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.class */
public class RowDataPythonTableFunctionOperator extends AbstractPythonTableFunctionOperator<RowData, RowData, RowData> {
    private static final long serialVersionUID = 1;
    private transient StreamRecordRowDataWrappingCollector rowDataWrapper;
    private transient JoinedRowData reuseJoinedRow;
    private transient Projection<RowData, BinaryRowData> udtfInputProjection;
    private transient TypeSerializer<RowData> udtfOutputTypeSerializer;
    private transient TypeSerializer<RowData> udtfInputTypeSerializer;
    private transient RowDataSerializer forwardedInputSerializer;
    private transient RowData input;
    private transient boolean hasJoined;
    private transient boolean isFinishResult;

    public RowDataPythonTableFunctionOperator(Configuration configuration, PythonFunctionInfo pythonFunctionInfo, RowType rowType, RowType rowType2, int[] iArr, FlinkJoinType flinkJoinType) {
        super(configuration, pythonFunctionInfo, rowType, rowType2, iArr, flinkJoinType);
    }

    @Override // org.apache.flink.table.runtime.operators.python.table.AbstractPythonTableFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.rowDataWrapper = new StreamRecordRowDataWrappingCollector(this.output);
        this.reuseJoinedRow = new JoinedRowData();
        this.udtfInputProjection = createUdtfInputProjection();
        this.forwardedInputSerializer = new RowDataSerializer(this.inputType);
        this.udtfInputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(this.userDefinedFunctionInputType);
        this.udtfOutputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(this.userDefinedFunctionOutputType);
        this.input = null;
        this.hasJoined = false;
        this.isFinishResult = true;
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void bufferInput(RowData rowData) {
        RowData copy = this.forwardedInputSerializer.copy(rowData);
        copy.setRowKind(rowData.getRowKind());
        this.forwardedInputQueue.add(copy);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public RowData getFunctionInput(RowData rowData) {
        return this.udtfInputProjection.apply(rowData);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void processElementInternal(RowData rowData) throws Exception {
        this.udtfInputTypeSerializer.serialize(getFunctionInput(rowData), this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
    }

    private Projection<RowData, BinaryRowData> createUdtfInputProjection() {
        return (Projection) ProjectionCodeGenerator.generateProjection(CodeGeneratorContext.apply(new TableConfig()), "UdtfInputProjection", this.inputType, this.userDefinedFunctionInputType, this.userDefinedFunctionInputOffsets).newInstance(Thread.currentThread().getContextClassLoader());
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void emitResult(Tuple2<byte[], Integer> tuple2) throws Exception {
        if (this.isFinishResult) {
            this.input = (RowData) this.forwardedInputQueue.poll();
            this.hasJoined = false;
        }
        do {
            byte[] bArr = (byte[]) tuple2.f0;
            this.isFinishResult = isFinishResult(bArr, ((Integer) tuple2.f1).intValue());
            if (!this.isFinishResult) {
                this.reuseJoinedRow.setRowKind(this.input.getRowKind());
                this.bais.setBuffer(bArr, 0, bArr.length);
                this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(this.input, (RowData) this.udtfOutputTypeSerializer.deserialize(this.baisWrapper)));
                tuple2 = this.pythonFunctionRunner.pollResult();
                this.hasJoined = true;
            } else if (this.joinType == FlinkJoinType.LEFT && !this.hasJoined) {
                GenericRowData genericRowData = new GenericRowData(this.userDefinedFunctionOutputType.getFieldCount());
                for (int i = 0; i < genericRowData.getArity(); i++) {
                    genericRowData.setField(i, (Object) null);
                }
                this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(this.input, genericRowData));
            }
            if (this.isFinishResult) {
                return;
            }
        } while (tuple2 != null);
    }
}
