package org.apache.flink.streaming.api.operators.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

/**
 * One-input operator that runs the function registered under the
 * {@code flink:transform:process_function:v1} URN.
 *
 * <p>Each incoming element is wrapped into a three-field {@link Row} before being handed to the
 * parent operator: field 0 is the element timestamp, field 1 is the most recently observed
 * watermark timestamp, and field 2 is the element value. Results coming back are decoded with the
 * runner output serializer and emitted with the timestamp at the head of the buffered-timestamp
 * queue.
 *
 * @param <IN> type of the elements received by this operator
 * @param <OUT> type of the elements emitted by this operator
 */
@Internal
public class PythonProcessOperator<IN, OUT> extends OneInputPythonFunctionOperator<IN, OUT, Row, OUT> {

    private static final long serialVersionUID = 1L;

    /** URN returned by {@link #getFunctionUrn()}. */
    private static final String PROCESS_FUNCTION_URN = "flink:transform:process_function:v1";

    /** URN returned by {@link #getCoderUrn()}. */
    private static final String FLAT_MAP_CODER_URN = "flink:coder:flat_map:v1";

    /** Row instance reused for every element: (timestamp, watermark, value). */
    private transient Row reusableInput;

    /** Timestamp of the last watermark seen; {@code Long.MIN_VALUE} until one arrives. */
    private transient long currentWatermark;

    /**
     * Creates the operator.
     *
     * @param configuration configuration forwarded to the parent operator
     * @param typeInformation type of the input value; it becomes the third field of the row type
     *     {@code ROW(LONG, LONG, <input>)} passed to the parent as the runner input type
     * @param typeInformation2 output type forwarded to the parent operator
     * @param dataStreamPythonFunctionInfo function description forwarded to the parent operator
     */
    public PythonProcessOperator(Configuration configuration, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo) {
        super(
                configuration,
                Types.ROW(Types.LONG, Types.LONG, typeInformation),
                typeInformation2,
                dataStreamPythonFunctionInfo);
    }

    /**
     * Opens the parent operator, then initialises the transient state: a fresh three-field row
     * and a watermark of {@code Long.MIN_VALUE}.
     */
    @Override
    public void open() throws Exception {
        super.open();
        currentWatermark = Long.MIN_VALUE;
        reusableInput = new Row(3);
    }

    /**
     * Handles one result buffer.
     *
     * <p>When {@link PythonOperatorUtils#endOfLastFlatMap} reports true for the buffer, the head
     * of the buffered-timestamp queue is removed and nothing is emitted. Otherwise the buffer is
     * deserialized and emitted using the timestamp at the head of the queue, which stays in place.
     *
     * @param tuple2 the raw result bytes ({@code f0}) and the number of valid bytes ({@code f1})
     */
    @Override
    public void emitResult(Tuple2<byte[], Integer> tuple2) throws Exception {
        final byte[] rawResult = tuple2.f0;
        final int length = tuple2.f1;

        if (!PythonOperatorUtils.endOfLastFlatMap(length, rawResult)) {
            bais.setBuffer(rawResult, 0, length);
            Object result = runnerOutputTypeSerializer.deserialize(baisWrapper);
            // Only peek: the entry is removed once the end marker for this input arrives.
            long timestamp = bufferedTimestamp.peek();
            collector.setAbsoluteTimestamp(timestamp);
            collector.collect(result);
        } else {
            bufferedTimestamp.poll();
        }
    }

    /**
     * Replaces the record's value with the reusable (timestamp, watermark, value) row and passes
     * the record on to the parent operator.
     *
     * @param streamRecord the incoming record; its value is replaced in place
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        final Row input = reusableInput;
        input.setField(0, streamRecord.getTimestamp());
        input.setField(1, currentWatermark);
        input.setField(2, streamRecord.getValue());
        streamRecord.replace(input);
        super.processElement(streamRecord);
    }

    /**
     * Lets the parent operator process the watermark first, then records its timestamp so that
     * elements processed afterwards carry it in row field 1.
     *
     * @param watermark the watermark to process
     */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        currentWatermark = watermark.getTimestamp();
    }

    /** Returns the function URN, {@code flink:transform:process_function:v1}. */
    @Override
    public String getFunctionUrn() {
        return PROCESS_FUNCTION_URN;
    }

    /** Returns the coder URN, {@code flink:coder:flat_map:v1}. */
    @Override
    public String getCoderUrn() {
        return FLAT_MAP_CODER_URN;
    }
}
