package org.apache.flink.streaming.api.operators.python;

import java.util.Collections;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.api.utils.input.KeyedTwoInputWithTimerRowFactory;
import org.apache.flink.streaming.api.utils.output.OutputWithTimerRowHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.class */
/**
 * Keyed two-input operator that hands elements and timer firings to a Python function runner.
 *
 * <p>Each element from either input, and each fired event-time or processing-time timer, is
 * turned into a runner input row, serialized and passed to the Python function runner. Results
 * coming back from the runner are deserialized and given to an {@link OutputWithTimerRowHandler}.
 *
 * <p>For every input sent to the runner one timestamp is queued in {@code bufferedTimestamp}; it
 * is removed again when the runner reports the end of that input's results (see
 * {@link #emitResult(Tuple2)}).
 *
 * @param <OUT> type of the records produced by this operator
 */
public class PythonKeyedCoProcessOperator<OUT> extends TwoInputPythonFunctionOperator<Row, Row, Row, OUT> implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
    /** Function URN handed to the Beam runner. */
    private static final String KEYED_CO_PROCESS_FUNCTION_URN = "flink:transform:keyed_process_function:v1";

    /** Coder URN handed to the parent operator. */
    private static final String FLAT_MAP_CODER_URN = "flink:coder:flat_map:v1";

    /** Single-field row type holding the type of field 0 of the first input. */
    private final TypeInformation<Row> keyTypeInfo;

    // Raw on purpose: the declared return type of the converter is not visible from this file.
    private final TypeSerializer keyTypeSerializer;

    /** Type information returned by {@link #getProducedType()}. */
    private final TypeInformation<OUT> outputTypeInfo;

    /** Timer service registered under the name "user-timers" in {@link #open()}. */
    private transient InternalTimerService<VoidNamespace> internalTimerService;

    /** Builds the rows that are serialized and sent to the Python function runner. */
    private transient KeyedTwoInputWithTimerRowFactory runnerInputFactory;

    /** Receives every deserialized result row together with its buffered timestamp. */
    private transient OutputWithTimerRowHandler runnerOutputHandler;

    /** Last key passed to {@link #setCurrentKey(Object)}; returned by {@link #getCurrentKey()}. */
    private transient Object keyForTimerService;

    /**
     * Creates the operator.
     *
     * @param config configuration forwarded to the parent operator
     * @param inputTypeInfo1 type of the first input; must be a {@link RowTypeInfo}, its field 0
     *     is used as the key type
     * @param inputTypeInfo2 type of the second input
     * @param outputTypeInfo type of the produced records
     * @param pythonFunctionInfo description of the Python function to run
     */
    public PythonKeyedCoProcessOperator(
            Configuration config,
            TypeInformation<Row> inputTypeInfo1,
            TypeInformation<Row> inputTypeInfo2,
            TypeInformation<OUT> outputTypeInfo,
            DataStreamPythonFunctionInfo pythonFunctionInfo) {
        // The key type has to be derived inline here because no statement may precede super(...).
        super(
                config,
                pythonFunctionInfo,
                FLAT_MAP_CODER_URN,
                KeyedTwoInputWithTimerRowFactory.getRunnerInputTypeInfo(
                        inputTypeInfo1, inputTypeInfo2, constructKeyTypeInfo(inputTypeInfo1)),
                OutputWithTimerRowHandler.getRunnerOutputTypeInfo(
                        outputTypeInfo, constructKeyTypeInfo(inputTypeInfo1)));
        this.keyTypeInfo = constructKeyTypeInfo(inputTypeInfo1);
        this.keyTypeSerializer =
                PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.keyTypeInfo);
        this.outputTypeInfo = outputTypeInfo;
    }

    /**
     * Builds the Beam based runner that executes the Python function.
     *
     * @return a new {@link BeamDataStreamPythonFunctionRunner}
     * @throws Exception if any of the collaborators fails while the runner is assembled
     */
    @Override
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new BeamDataStreamPythonFunctionRunner(
                getRuntimeContext().getTaskName(),
                createPythonEnvironmentManager(),
                getRunnerInputTypeInfo(),
                getRunnerOutputTypeInfo(),
                KEYED_CO_PROCESS_FUNCTION_URN,
                PythonOperatorUtils.getUserDefinedDataStreamStatefulFunctionProto(
                        getPythonFunctionInfo(),
                        getRuntimeContext(),
                        // Type-safe replacement for the raw Collections.EMPTY_MAP constant.
                        Collections.emptyMap(),
                        this.keyTypeInfo,
                        PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend())),
                getCoderUrn(),
                getJobOptions(),
                getFlinkMetricContainer(),
                getKeyedStateBackend(),
                this.keyTypeSerializer,
                // NOTE(review): presumably the namespace serializer slot, unused here - confirm.
                null,
                getContainingTask().getEnvironment().getMemoryManager(),
                getOperatorConfig()
                        .getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.PYTHON,
                                getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
                                getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()));
    }

    /**
     * Creates the timer service, the runner input factory and the output handler, then opens the
     * parent operator.
     *
     * @throws Exception if the parent operator fails to open
     */
    @Override
    public void open() throws Exception {
        this.internalTimerService = getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        this.runnerInputFactory = new KeyedTwoInputWithTimerRowFactory();
        // TimestampedCollector stays raw: the handler's constructor signature is not visible here.
        this.runnerOutputHandler =
                new OutputWithTimerRowHandler(
                        getKeyedStateBackend(),
                        this.internalTimerService,
                        new TimestampedCollector(this.output),
                        this,
                        VoidNamespaceSerializer.INSTANCE);
        super.open();
    }

    /**
     * Handles a record of the first input.
     *
     * @param streamRecord the incoming record
     * @throws Exception if serializing or processing the record fails
     */
    @Override
    public void processElement1(StreamRecord<Row> streamRecord) throws Exception {
        processElement(true, streamRecord);
    }

    /**
     * Handles a record of the second input.
     *
     * @param streamRecord the incoming record
     * @throws Exception if serializing or processing the record fails
     */
    @Override
    public void processElement2(StreamRecord<Row> streamRecord) throws Exception {
        processElement(false, streamRecord);
    }

    /**
     * Consumes one raw result received from the Python function runner.
     *
     * <p>If the result marks the end of the results of one input, the timestamp buffered for that
     * input is dropped. Otherwise the bytes are deserialized into a row and passed to the output
     * handler together with the timestamp at the head of the buffer.
     *
     * @param tuple2 the result bytes ({@code f0}) and the number of valid bytes ({@code f1})
     * @throws Exception if deserializing or handling the result fails
     */
    @Override
    public void emitResult(Tuple2<byte[], Integer> tuple2) throws Exception {
        byte[] rawResult = tuple2.f0;
        int length = tuple2.f1;
        if (PythonOperatorUtils.endOfLastFlatMap(length, rawResult)) {
            this.bufferedTimestamp.poll();
            return;
        }
        this.bais.setBuffer(rawResult, 0, length);
        this.runnerOutputHandler.accept(
                (Row) getRunnerOutputTypeSerializer().deserialize(this.baisWrapper),
                this.bufferedTimestamp.peek().longValue());
    }

    /**
     * Returns the type information of the records produced by this operator.
     *
     * @return the output type passed to the constructor
     */
    @Override
    public TypeInformation<OUT> getProducedType() {
        return this.outputTypeInfo;
    }

    /**
     * Forwards a fired event-time timer to the Python function; the timer's timestamp is buffered
     * for the results it produces.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if serializing or processing the timer fails
     */
    @Override
    public void onEventTime(InternalTimer<Row, VoidNamespace> internalTimer) throws Exception {
        this.bufferedTimestamp.offer(internalTimer.getTimestamp());
        processTimer(TimeDomain.EVENT_TIME, internalTimer);
    }

    /**
     * Forwards a fired processing-time timer to the Python function; {@link Long#MIN_VALUE} is
     * buffered as the timestamp for the results it produces.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if serializing or processing the timer fails
     */
    @Override
    public void onProcessingTime(InternalTimer<Row, VoidNamespace> internalTimer) throws Exception {
        this.bufferedTimestamp.offer(Long.MIN_VALUE);
        processTimer(TimeDomain.PROCESSING_TIME, internalTimer);
    }

    /** Serializes a timer firing as runner input and sends it to the Python function runner. */
    private void processTimer(TimeDomain timeDomain, InternalTimer<Row, VoidNamespace> internalTimer) throws Exception {
        getRunnerInputTypeSerializer()
                .serialize(
                        this.runnerInputFactory.fromTimer(
                                timeDomain,
                                internalTimer.getTimestamp(),
                                this.internalTimerService.currentWatermark(),
                                internalTimer.getKey(),
                                // NOTE(review): presumably the encoded namespace, unused here - confirm.
                                null),
                        this.baosWrapper);
        flushSerializedInput();
    }

    /** Buffers the record's timestamp, serializes the record as runner input and sends it. */
    private void processElement(boolean isFirstInput, StreamRecord<Row> streamRecord) throws Exception {
        this.bufferedTimestamp.offer(streamRecord.getTimestamp());
        getRunnerInputTypeSerializer()
                .serialize(
                        this.runnerInputFactory.fromNormalData(
                                isFirstInput,
                                streamRecord.getTimestamp(),
                                this.internalTimerService.currentWatermark(),
                                streamRecord.getValue()),
                        this.baosWrapper);
        flushSerializedInput();
    }

    /**
     * Hands the bytes currently held in {@code baos} to the Python function runner, resets the
     * buffer, counts the input towards the current bundle and emits the available results.
     */
    private void flushSerializedInput() throws Exception {
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    /** Builds a single-field row type from field 0 of the given {@link RowTypeInfo}. */
    private static TypeInformation<Row> constructKeyTypeInfo(TypeInformation<Row> typeInformation) {
        return new RowTypeInfo(new TypeInformation[]{((RowTypeInfo) typeInformation).getTypeAt(0)});
    }

    /**
     * Records the key for {@link #getCurrentKey()} and, in batch execution mode only, also
     * forwards it to the parent operator.
     *
     * <p>NOTE(review): outside batch mode the parent's key is deliberately left untouched;
     * presumably it is managed elsewhere in that mode - confirm before changing.
     *
     * @param obj the new current key
     */
    @Override
    public void setCurrentKey(Object obj) {
        if (PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend())) {
            super.setCurrentKey(obj);
        }
        this.keyForTimerService = obj;
    }

    /**
     * Returns the key most recently passed to {@link #setCurrentKey(Object)}.
     *
     * @return the recorded key, or {@code null} if none has been set yet
     */
    @Override
    public Object getCurrentKey() {
        return this.keyForTimerService;
    }
}
