package org.apache.flink.table.runtime.operators.python;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Base class for one-input Python function operators that execute their user-defined functions
 * through a {@link BeamTableStatelessPythonFunctionRunner}.
 *
 * <p>For every incoming record the operator calls {@link #bufferInput(Object)} and then
 * {@link #processElementInternal(Object)}; both are supplied by subclasses.
 *
 * @param <IN> type of the input elements
 * @param <OUT> type of the output elements
 * @param <UDFIN> type returned by {@link #getFunctionInput(Object)}
 */
@Internal
public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
        extends AbstractOneInputPythonFunctionOperator<IN, OUT> {

    private static final long serialVersionUID = 1;

    /** Configuration key whose value, when present, is copied into the job options. */
    private static final String TIMEZONE_OPTION_KEY = "table.exec.timezone";

    /** The row type of the operator input. */
    protected final RowType inputType;

    /** The row type of the operator output. */
    protected final RowType outputType;

    /** Indexes into the fields of {@link #inputType} selecting the function input columns. */
    protected final int[] userDefinedFunctionInputOffsets;

    /** Options handed to the Python function runner; built once in the constructor. */
    private final Map<String, String> jobOptions;

    /** Projection of {@link #inputType} onto the input offsets; created in {@link #open()}. */
    protected transient RowType userDefinedFunctionInputType;

    /**
     * Row type of the function result. It is never assigned in this class; presumably subclasses
     * set it before {@link #createPythonFunctionRunner()} is invoked (verify against subclasses).
     */
    protected transient RowType userDefinedFunctionOutputType;

    /** Created empty in {@link #open()}; presumably filled by {@link #bufferInput(Object)}. */
    protected transient LinkedList<IN> forwardedInputQueue;

    /** Byte array input stream created in {@link #open()}. */
    protected transient ByteArrayInputStreamWithPos bais;

    /** Data input view wrapping {@link #bais}. */
    protected transient DataInputViewStreamWrapper baisWrapper;

    /** Byte array output stream created in {@link #open()}. */
    protected transient ByteArrayOutputStreamWithPos baos;

    /** Data output view wrapping {@link #baos}. */
    protected transient DataOutputViewStreamWrapper baosWrapper;

    /**
     * Creates the operator.
     *
     * @param configuration configuration passed to the super class and used to build the job
     *     options
     * @param rowType the input row type, must not be null
     * @param rowType2 the output row type, must not be null
     * @param iArr the offsets of the function input fields within {@code rowType}, must not be
     *     null
     */
    public AbstractStatelessFunctionOperator(Configuration configuration, RowType rowType, RowType rowType2, int[] iArr) {
        super(configuration);
        this.inputType = Preconditions.checkNotNull(rowType);
        this.outputType = Preconditions.checkNotNull(rowType2);
        this.userDefinedFunctionInputOffsets = Preconditions.checkNotNull(iArr);
        this.jobOptions = buildJobOptions(configuration);
    }

    /**
     * Initializes the transient state (input queue, function input type and the byte array
     * streams with their wrappers) and then delegates to the super class.
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.forwardedInputQueue = new LinkedList<>();
        // Project the input row type onto the columns that are handed to the function.
        this.userDefinedFunctionInputType =
                new RowType(
                        Arrays.stream(this.userDefinedFunctionInputOffsets)
                                .mapToObj(i -> this.inputType.getFields().get(i))
                                .collect(Collectors.toList()));
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper(this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.baosWrapper = new DataOutputViewStreamWrapper(this.baos);
        // The streams above are set up before the super class is opened.
        super.open();
    }

    /**
     * Buffers the value of the given record, passes it to {@link #processElementInternal(Object)},
     * increments the inherited element counter and finally invokes the inherited
     * {@code checkInvokeFinishBundleByCount()} and {@code emitResults()}.
     *
     * @param streamRecord the record to process
     * @throws Exception if buffering, processing or emitting fails
     */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        // Typed as IN (not Object) so the calls below type-check.
        IN value = streamRecord.getValue();
        bufferInput(value);
        processElementInternal(value);
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    /**
     * Creates the {@link BeamTableStatelessPythonFunctionRunner} for this operator, using output
     * mode {@code SINGLE}.
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
        return new BeamTableStatelessPythonFunctionRunner(
                getRuntimeContext().getTaskName(),
                createPythonEnvironmentManager(),
                this.userDefinedFunctionInputType,
                this.userDefinedFunctionOutputType,
                getFunctionUrn(),
                getUserDefinedFunctionsProto(),
                getInputOutputCoderUrn(),
                this.jobOptions,
                getFlinkMetricContainer(),
                getContainingTask().getEnvironment().getMemoryManager(),
                getOperatorConfig()
                        .getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.PYTHON,
                                getContainingTask()
                                        .getEnvironment()
                                        .getTaskManagerInfo()
                                        .getConfiguration(),
                                getContainingTask()
                                        .getEnvironment()
                                        .getUserCodeClassLoader()
                                        .asClassLoader()),
                FlinkFnApi.CoderParam.OutputMode.SINGLE);
    }

    /**
     * Called by {@link #processElement(StreamRecord)} with each input value before it is
     * processed.
     */
    public abstract void bufferInput(IN in) throws Exception;

    /** Returns the function input derived from the given input element. */
    public abstract UDFIN getFunctionInput(IN in);

    /** Returns the user-defined functions proto that is passed to the runner. */
    public abstract FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto();

    /** Returns the coder URN that is passed to the runner. */
    public abstract String getInputOutputCoderUrn();

    /** Returns the function URN that is passed to the runner. */
    public abstract String getFunctionUrn();

    /**
     * Called by {@link #processElement(StreamRecord)} with each input value, right after
     * {@link #bufferInput(Object)}.
     */
    public abstract void processElementInternal(IN in) throws Exception;

    /**
     * Builds the job options handed to the Python function runner.
     *
     * @param configuration the configuration to read from
     * @return a mutable map holding the {@code table.exec.timezone} entry when the configuration
     *     contains that key, otherwise an empty map
     */
    private static Map<String, String> buildJobOptions(Configuration configuration) {
        Map<String, String> options = new HashMap<>();
        if (configuration.containsKey(TIMEZONE_OPTION_KEY)) {
            options.put(TIMEZONE_OPTION_KEY, configuration.getString(TIMEZONE_OPTION_KEY, null));
        }
        return options;
    }
}
