package org.apache.flink.streaming.api.operators.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.Constants;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.python.timer.TimerHandler;
import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import org.apache.flink.streaming.api.operators.python.timer.TimerUtils;
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.class */
public class PythonKeyedCoProcessOperator<OUT> extends AbstractTwoInputPythonFunctionOperator<Row, Row, OUT> implements Triggerable<Row, VoidNamespace> {
    private static final long serialVersionUID = 2;
    private transient InternalTimerService<VoidNamespace> internalTimerService;
    private transient TypeInformation<Row> keyTypeInfo;
    private transient TypeSerializer<Row> keyTypeSerializer;
    private transient TypeInformation<Row> timerDataTypeInfo;
    private transient TypeSerializer<Row> timerDataSerializer;
    private transient TimerHandler timerHandler;
    private transient Object keyForTimerService;

    public PythonKeyedCoProcessOperator(Configuration configuration, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<Row> typeInformation, TypeInformation<Row> typeInformation2, TypeInformation<OUT> typeInformation3) {
        super(configuration, dataStreamPythonFunctionInfo, typeInformation, typeInformation2, typeInformation3);
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.internalTimerService = getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        this.keyTypeInfo = new RowTypeInfo(new TypeInformation[]{getLeftInputType().getTypeAt(0)});
        this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.keyTypeInfo);
        this.timerDataTypeInfo = TimerUtils.createTimerDataTypeInfo(this.keyTypeInfo);
        this.timerDataSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.timerDataTypeInfo);
        this.timerHandler = new TimerHandler();
        super.open();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new BeamDataStreamPythonFunctionRunner(getRuntimeContext().getTaskName(), createPythonEnvironmentManager(), Constants.STATEFUL_FUNCTION_URN, ProtoUtils.createUserDefinedDataStreamStatefulFunctionProtos(getPythonFunctionInfo(), getRuntimeContext(), getInternalParameters(), this.keyTypeInfo, PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend())), this.jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), this.keyTypeSerializer, null, new TimerRegistration(getKeyedStateBackend(), this.internalTimerService, this, VoidNamespaceSerializer.INSTANCE, PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.timerDataTypeInfo)), getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()), createInputCoderInfoDescriptor(), createOutputCoderInfoDescriptor(), TimerUtils.createTimerDataCoderInfoDescriptorProto(this.timerDataTypeInfo));
    }

    public void processElement1(StreamRecord<Row> streamRecord) throws Exception {
        processElement(true, streamRecord.getTimestamp(), this.internalTimerService.currentWatermark(), streamRecord.getValue());
    }

    public void processElement2(StreamRecord<Row> streamRecord) throws Exception {
        processElement(false, streamRecord.getTimestamp(), this.internalTimerService.currentWatermark(), streamRecord.getValue());
    }

    public void onEventTime(InternalTimer<Row, VoidNamespace> internalTimer) throws Exception {
        processTimer(TimeDomain.EVENT_TIME, internalTimer);
    }

    public void onProcessingTime(InternalTimer<Row, VoidNamespace> internalTimer) throws Exception {
        processTimer(TimeDomain.PROCESSING_TIME, internalTimer);
    }

    private void processTimer(TimeDomain timeDomain, InternalTimer<Row, VoidNamespace> internalTimer) throws Exception {
        this.timerDataSerializer.serialize(this.timerHandler.buildTimerData(timeDomain, this.internalTimerService.currentWatermark(), internalTimer.getTimestamp(), (Row) internalTimer.getKey(), null), this.baosWrapper);
        this.pythonFunctionRunner.processTimer(this.baos.toByteArray());
        this.baos.reset();
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void setCurrentKey(Object obj) {
        if (PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend())) {
            super.setCurrentKey(obj);
        }
        this.keyForTimerService = obj;
    }

    public Object getCurrentKey() {
        return this.keyForTimerService;
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractDataStreamPythonFunctionOperator
    public <T> AbstractDataStreamPythonFunctionOperator<T> copy(DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<T> typeInformation) {
        return new PythonKeyedCoProcessOperator(this.config, dataStreamPythonFunctionInfo, getLeftInputType(), getRightInputType(), typeInformation);
    }
}
