package org.apache.flink.streaming.api.operators.python;

import java.util.LinkedList;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.types.Row;

/**
 * Operator that assigns timestamps to elements with the help of a Python function and generates
 * watermarks from a {@link WatermarkStrategy}.
 *
 * <p>Each incoming element is buffered locally and sent to the Python function runner as a
 * two-field {@link Row} of {@code (current timestamp, value)}. Every result coming back from the
 * runner is deserialized as a {@code Long}, which is used as the timestamp of the oldest buffered
 * element when that element is emitted downstream and handed to the {@link WatermarkGenerator}.
 *
 * @param <IN> type of the elements flowing through this operator (input and output are the same)
 */
@Internal
public class PythonTimestampsAndWatermarksOperator<IN> extends OneInputPythonFunctionOperator<IN, IN, Row, Long> implements ProcessingTimeCallback {
    private static final long serialVersionUID = 1;

    public static final String STREAM_TIMESTAMP_AND_WATERMARK_OPERATOR_NAME = "_timestamp_and_watermark_operator";

    private static final String MAP_CODER_URN = "flink:coder:map:v1";

    /** Strategy used in {@link #open()} to create the watermark generator. */
    private final WatermarkStrategy<IN> watermarkStrategy;

    /** Type information of the input elements; used to build the value serializer. */
    private final TypeInformation<IN> inputTypeInfo;

    /** When {@code false}, a {@link NoWatermarksGenerator} is used and no periodic timer is registered. */
    private boolean emitProgressiveWatermarks = true;

    private transient WatermarkGenerator<IN> watermarkGenerator;

    private transient WatermarkOutput watermarkOutput;

    /** Auto-watermark interval read from the execution config in {@link #open()}. */
    private transient long watermarkInterval;

    /** Reused {@code (timestamp, value)} row that is serialized for the Python function runner. */
    private transient Row reusableInput;

    /** Reused output record; its value and timestamp are replaced for every emitted element. */
    private transient StreamRecord<IN> reusableStreamRecord;

    /** Serializer used to copy input values when object reuse is enabled. */
    private transient TypeSerializer<IN> inputValueSerializer;

    /** Elements that were sent to the runner and whose result has not been consumed yet (FIFO). */
    private transient LinkedList<IN> bufferedInputs;

    /**
     * Creates the operator.
     *
     * @param configuration configuration passed through to the parent operator
     * @param typeInformation type information of the input (and output) elements
     * @param dataStreamPythonFunctionInfo description of the Python function, passed to the parent
     * @param watermarkStrategy strategy from which the watermark generator is created
     */
    public PythonTimestampsAndWatermarksOperator(Configuration configuration, TypeInformation<IN> typeInformation, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, WatermarkStrategy<IN> watermarkStrategy) {
        // The runner input is ROW(LONG, <input type>) and the runner output is LONG.
        super(configuration, Types.ROW(new TypeInformation[]{Types.LONG, typeInformation}), Types.LONG, dataStreamPythonFunctionInfo);
        this.watermarkStrategy = watermarkStrategy;
        this.inputTypeInfo = typeInformation;
    }

    /**
     * Initializes the transient state, creates the watermark generator and output, and registers
     * the first periodic timer when the auto-watermark interval is positive and progressive
     * watermarks are enabled.
     *
     * @throws Exception if the parent operator or the timer registration fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        inputValueSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(inputTypeInfo);
        bufferedInputs = new LinkedList<>();
        reusableInput = new Row(2);
        reusableStreamRecord = new StreamRecord<>(null);
        watermarkGenerator = emitProgressiveWatermarks
                ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                : new NoWatermarksGenerator<>();
        watermarkOutput = new TimestampsAndWatermarksOperator.WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        if (watermarkInterval > 0 && emitProgressiveWatermarks) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    /**
     * Buffers the element value, sends {@code (timestamp, value)} to the Python function runner
     * and then emits whatever results are available.
     *
     * @param streamRecord the incoming record
     * @throws Exception if serialization or the Python function runner fails
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        // Copy the value when object reuse is enabled, because it stays in the buffer
        // beyond the lifetime of this call.
        IN value = getExecutionConfig().isObjectReuseEnabled()
                ? inputValueSerializer.copy(streamRecord.getValue())
                : streamRecord.getValue();
        bufferedInputs.offer(value);

        // Records without a timestamp are sent with Long.MIN_VALUE.
        long timestamp = streamRecord.hasTimestamp() ? streamRecord.getTimestamp() : Long.MIN_VALUE;
        reusableInput.setField(0, timestamp);
        reusableInput.setField(1, value);

        runnerInputTypeSerializer.serialize(reusableInput, baosWrapper);
        pythonFunctionRunner.process(baos.toByteArray());
        baos.reset();

        elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    /**
     * Handles one result from the Python function runner: the deserialized long becomes the
     * timestamp of the oldest buffered element, which is emitted downstream and reported to the
     * watermark generator.
     *
     * @param resultTuple serialized result bytes (f0) and the number of valid bytes (f1)
     * @throws Exception if deserialization fails
     */
    @Override
    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
        bais.setBuffer(resultTuple.f0, 0, resultTuple.f1);
        long timestamp = ((Long) runnerOutputTypeSerializer.deserialize(baisWrapper)).longValue();

        // NOTE(review): assumes results arrive in the same order the inputs were buffered,
        // one result per input - confirm against the runner contract.
        IN element = bufferedInputs.poll();
        reusableStreamRecord.replace(element, timestamp);
        output.collect(reusableStreamRecord);
        watermarkGenerator.onEvent(element, timestamp, watermarkOutput);
    }

    /** Returns the coder URN used by this operator, {@code "flink:coder:map:v1"}. */
    @Override
    public String getCoderUrn() {
        return MAP_CODER_URN;
    }

    /**
     * Enables or disables progressive watermark emission. The flag is read in {@link #open()},
     * so it only takes effect when set before the operator is opened.
     *
     * @param emitProgressiveWatermarks {@code true} to use the generator from the strategy and
     *     register periodic timers, {@code false} to use a {@link NoWatermarksGenerator}
     */
    public void configureEmitProgressiveWatermarks(boolean emitProgressiveWatermarks) {
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
    }

    /**
     * Timer callback: triggers a periodic emit on the watermark generator and registers the next
     * timer one watermark interval after the current processing time.
     *
     * @param timestamp the timestamp the timer fired for (unused)
     */
    @Override
    public void onProcessingTime(long timestamp) {
        watermarkGenerator.onPeriodicEmit(watermarkOutput);
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Handles watermarks arriving from upstream. Only a watermark with timestamp
     * {@code Long.MAX_VALUE} has an effect: the current bundle is finished and
     * {@code MAX_WATERMARK} is emitted. Every other incoming watermark is dropped here and not
     * forwarded.
     *
     * @param watermark the incoming watermark
     * @throws Exception if finishing the bundle fails
     */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        if (watermark.getTimestamp() == Long.MAX_VALUE) {
            invokeFinishBundle();
            watermarkOutput.emitWatermark(org.apache.flink.api.common.eventtime.Watermark.MAX_WATERMARK);
        }
    }

    /**
     * Closes the parent operator and then triggers one last periodic emit on the watermark
     * generator.
     *
     * @throws Exception if closing the parent operator fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        watermarkGenerator.onPeriodicEmit(watermarkOutput);
    }
}
