package org.apache.flink.streaming.api.operators.python;

import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.types.Row;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.class */
public abstract class TwoInputPythonFunctionOperator<IN1, IN2, OUT> extends AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1;
    private static final String DATASTREAM_STATELESS_FUNCTION_URN = "flink:transform:datastream_stateless_function:v1";
    private final Map<String, String> jobOptions;
    private final TypeInformation<IN1> inputTypeInfo1;
    private final TypeInformation<IN2> inputTypeInfo2;
    private final TypeInformation<OUT> outputTypeInfo;
    private final DataStreamPythonFunctionInfo pythonFunctionInfo;
    private final boolean isKeyedStream;
    private transient TypeInformation<Row> runnerInputTypeInfo;
    private transient TypeInformation<Row> runnerOutputTypeInfo;
    private transient TypeSerializer<Row> runnerInputTypeSerializer;
    transient TypeSerializer<Row> runnerOutputTypeSerializer;
    protected transient ByteArrayInputStreamWithPos bais;
    protected transient DataInputViewStreamWrapper baisWrapper;
    private transient ByteArrayOutputStreamWithPos baos;
    private transient DataOutputViewStreamWrapper baosWrapper;
    protected transient TimestampedCollector collector;
    private transient Row reuseRow;
    transient LinkedList<Long> bufferedTimestamp1;
    transient LinkedList<Long> bufferedTimestamp2;

    public TwoInputPythonFunctionOperator(Configuration configuration, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, boolean z) {
        super(configuration);
        this.jobOptions = configuration.toMap();
        this.inputTypeInfo1 = typeInformation;
        this.inputTypeInfo2 = typeInformation2;
        this.outputTypeInfo = typeInformation3;
        this.pythonFunctionInfo = dataStreamPythonFunctionInfo;
        this.isKeyedStream = z;
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper(this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.baosWrapper = new DataOutputViewStreamWrapper(this.baos);
        this.bufferedTimestamp1 = new LinkedList<>();
        this.bufferedTimestamp2 = new LinkedList<>();
        this.runnerInputTypeInfo = getRunnerInputTypeInfo();
        this.runnerInputTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.runnerInputTypeInfo);
        this.runnerOutputTypeInfo = Types.ROW(new TypeInformation[]{Types.BYTE, this.outputTypeInfo});
        this.runnerOutputTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.runnerOutputTypeInfo);
        this.collector = new TimestampedCollector(this.output);
        this.reuseRow = new Row(3);
        super.open();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new BeamDataStreamPythonFunctionRunner(getRuntimeContext().getTaskName(), createPythonEnvironmentManager(), this.runnerInputTypeInfo, this.runnerOutputTypeInfo, DATASTREAM_STATELESS_FUNCTION_URN, PythonOperatorUtils.getUserDefinedDataStreamFunctionProto(this.pythonFunctionInfo, getRuntimeContext(), Collections.EMPTY_MAP), getFunctionUrn(), this.jobOptions, getFlinkMetricContainer(), null, null, getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()));
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return this.pythonFunctionInfo.getPythonFunction().getPythonEnv();
    }

    public void processElement1(StreamRecord<IN1> streamRecord) throws Exception {
        this.bufferedTimestamp1.offer(Long.valueOf(streamRecord.getTimestamp()));
        this.reuseRow.setField(0, true);
        this.reuseRow.setField(1, getValue(streamRecord));
        this.reuseRow.setField(2, (Object) null);
        processElementInternal();
    }

    public void processElement2(StreamRecord<IN2> streamRecord) throws Exception {
        this.bufferedTimestamp2.offer(Long.valueOf(streamRecord.getTimestamp()));
        this.reuseRow.setField(0, false);
        this.reuseRow.setField(1, (Object) null);
        this.reuseRow.setField(2, getValue(streamRecord));
        processElementInternal();
    }

    public abstract String getFunctionUrn();

    private TypeInformation<Row> getRunnerInputTypeInfo() {
        return this.isKeyedStream ? new RowTypeInfo(new TypeInformation[]{Types.BOOLEAN, this.inputTypeInfo1.getTypeAt(1), this.inputTypeInfo2.getTypeAt(1)}) : new RowTypeInfo(new TypeInformation[]{Types.BOOLEAN, this.inputTypeInfo1, this.inputTypeInfo2});
    }

    private Object getValue(StreamRecord<?> streamRecord) {
        return this.isKeyedStream ? ((Row) streamRecord.getValue()).getField(1) : streamRecord.getValue();
    }

    private void processElementInternal() throws Exception {
        this.runnerInputTypeSerializer.serialize(this.reuseRow, this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }
}
