package org.apache.flink.streaming.api.operators.python;

import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.api.utils.input.KeyedInputWithTimerRowFactory;
import org.apache.flink.streaming.api.utils.output.OutputWithTimerRowHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.types.Row;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.class */
public class PythonKeyedProcessOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT> implements ResultTypeQueryable<OUT>, Triggerable<Row, Object> {
    private static final long serialVersionUID = 1;
    private static final String KEYED_PROCESS_FUNCTION_URN = "flink:transform:keyed_process_function:v1";
    private static final String FLAT_MAP_CODER_URN = "flink:coder:flat_map:v1";
    private final Map<String, String> jobOptions;
    private final TypeInformation<Row> inputTypeInfo;
    private final TypeInformation<OUT> outputTypeInfo;
    private final DataStreamPythonFunctionInfo pythonFunctionInfo;
    private final TypeSerializer namespaceSerializer;
    private transient TypeInformation<Row> keyTypeInfo;
    private transient TypeInformation<Row> runnerInputTypeInfo;
    private transient TypeInformation<Row> runnerOutputTypeInfo;
    private transient TypeSerializer runnerInputSerializer;
    private transient TypeSerializer runnerOutputSerializer;
    private transient TypeSerializer keyTypeSerializer;
    private transient InternalTimerService internalTimerService;
    private transient LinkedList<Long> bufferedTimestamp;
    private transient ByteArrayInputStreamWithPos bais;
    private transient DataInputViewStreamWrapper baisWrapper;
    private transient ByteArrayOutputStreamWithPos baos;
    private transient DataOutputViewStreamWrapper baosWrapper;
    private transient KeyedInputWithTimerRowFactory runnerInputFactory;
    private transient OutputWithTimerRowHandler runnerOutputHandler;
    private transient Object keyForTimerService;

    public PythonKeyedProcessOperator(Configuration configuration, RowTypeInfo rowTypeInfo, TypeInformation<OUT> typeInformation, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo) {
        this(configuration, rowTypeInfo, typeInformation, dataStreamPythonFunctionInfo, VoidNamespaceSerializer.INSTANCE);
    }

    public PythonKeyedProcessOperator(Configuration configuration, RowTypeInfo rowTypeInfo, TypeInformation<OUT> typeInformation, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeSerializer typeSerializer) {
        super(configuration);
        this.jobOptions = configuration.toMap();
        this.inputTypeInfo = rowTypeInfo;
        this.outputTypeInfo = typeInformation;
        this.pythonFunctionInfo = dataStreamPythonFunctionInfo;
        this.namespaceSerializer = typeSerializer;
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.keyTypeInfo = new RowTypeInfo(new TypeInformation[]{this.inputTypeInfo.getTypeAt(0)});
        this.runnerInputTypeInfo = KeyedInputWithTimerRowFactory.getRunnerInputTypeInfo(this.inputTypeInfo, this.keyTypeInfo);
        this.runnerOutputTypeInfo = OutputWithTimerRowHandler.getRunnerOutputTypeInfo(this.outputTypeInfo, this.keyTypeInfo);
        this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.keyTypeInfo);
        this.runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.runnerInputTypeInfo);
        this.runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(this.runnerOutputTypeInfo);
        this.internalTimerService = getInternalTimerService("user-timers", this.namespaceSerializer, this);
        this.bufferedTimestamp = new LinkedList<>();
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper(this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.baosWrapper = new DataOutputViewStreamWrapper(this.baos);
        this.runnerInputFactory = new KeyedInputWithTimerRowFactory();
        this.runnerOutputHandler = new OutputWithTimerRowHandler(getKeyedStateBackend(), this.internalTimerService, new TimestampedCollector(this.output), this, this.namespaceSerializer);
        super.open();
    }

    public TypeInformation<OUT> getProducedType() {
        return this.outputTypeInfo;
    }

    public void onEventTime(InternalTimer<Row, Object> internalTimer) throws Exception {
        this.bufferedTimestamp.offer(Long.valueOf(internalTimer.getTimestamp()));
        processTimer(TimeDomain.EVENT_TIME, internalTimer);
    }

    public void onProcessingTime(InternalTimer<Row, Object> internalTimer) throws Exception {
        this.bufferedTimestamp.offer(Long.MIN_VALUE);
        processTimer(TimeDomain.PROCESSING_TIME, internalTimer);
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new BeamDataStreamPythonFunctionRunner(getRuntimeContext().getTaskName(), createPythonEnvironmentManager(), this.runnerInputTypeInfo, this.runnerOutputTypeInfo, KEYED_PROCESS_FUNCTION_URN, PythonOperatorUtils.getUserDefinedDataStreamStatefulFunctionProto(this.pythonFunctionInfo, getRuntimeContext(), Collections.EMPTY_MAP, this.keyTypeInfo, PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend())), FLAT_MAP_CODER_URN, this.jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), this.keyTypeSerializer, this.namespaceSerializer, getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()));
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return this.pythonFunctionInfo.getPythonFunction().getPythonEnv();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void emitResult(Tuple2<byte[], Integer> tuple2) throws Exception {
        byte[] bArr = (byte[]) tuple2.f0;
        int intValue = ((Integer) tuple2.f1).intValue();
        if (PythonOperatorUtils.endOfLastFlatMap(intValue, bArr)) {
            this.bufferedTimestamp.poll();
            return;
        }
        this.bais.setBuffer(bArr, 0, intValue);
        this.runnerOutputHandler.accept((Row) this.runnerOutputSerializer.deserialize(this.baisWrapper), this.bufferedTimestamp.peek().longValue());
    }

    public void processElement(StreamRecord<Row> streamRecord) throws Exception {
        this.bufferedTimestamp.offer(Long.valueOf(streamRecord.getTimestamp()));
        this.runnerInputSerializer.serialize(this.runnerInputFactory.fromNormalData(streamRecord.getTimestamp(), this.internalTimerService.currentWatermark(), (Row) streamRecord.getValue()), this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    private void processTimer(TimeDomain timeDomain, InternalTimer<Row, Object> internalTimer) throws Exception {
        byte[] byteArray;
        long timestamp = internalTimer.getTimestamp();
        Row row = (Row) internalTimer.getKey();
        Object namespace = internalTimer.getNamespace();
        if (VoidNamespace.INSTANCE.equals(namespace)) {
            byteArray = null;
        } else {
            this.namespaceSerializer.serialize(namespace, this.baosWrapper);
            byteArray = this.baos.toByteArray();
            this.baos.reset();
        }
        this.runnerInputSerializer.serialize(this.runnerInputFactory.fromTimer(timeDomain, timestamp, this.internalTimerService.currentWatermark(), row, byteArray), this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void setCurrentKey(Object obj) {
        if (PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend())) {
            super.setCurrentKey(obj);
        }
        this.keyForTimerService = obj;
    }

    public Object getCurrentKey() {
        return this.keyForTimerService;
    }
}
