/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.apache.flink.streaming.runtime.io.BarrierTracker;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

/**
 * Input reader for a one-input streaming task.
 *
 * <p>The processor pulls buffers and events from the (unioned) input gate through a
 * {@link CheckpointBarrierHandler}, deserializes the buffers into {@link StreamElement}s and
 * hands them to a {@link OneInputStreamOperator}. Records are forwarded one at a time; watermarks
 * are tracked per input channel and only forwarded when the minimum across all channels advances.
 *
 * @param <IN> The type of the record values read by this processor.
 */
@Internal
public class StreamInputProcessor<IN> {
    /** One deserializer per input channel, indexed by channel index. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    /** Deserializer of the channel whose buffer is currently being consumed, or null if none. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
    private final CheckpointBarrierHandler barrierHandler;
    /** Index of the channel the current buffer came from; -1 until the first buffer arrives. */
    private int currentChannel = -1;
    private boolean isFinished;
    /** Highest watermark seen so far on each input channel. */
    private final long[] watermarks;
    /** Last watermark forwarded to the operator (the minimum over {@link #watermarks}). */
    private long lastEmittedWatermark;
    private final DeserializationDelegate<StreamElement> deserializationDelegate;
    /** Lazily created on the first call to {@link #processInput}. */
    private Counter numRecordsIn;

    /**
     * Creates the processor, its barrier handler and one record deserializer per input channel.
     *
     * @param inputGates the input gates to read from; they are combined via {@link InputGateUtil}
     * @param inputSerializer serializer for the record values
     * @param checkpointListener receiver of checkpoint events; not registered when null
     * @param checkpointMode selects {@link BarrierBuffer} (EXACTLY_ONCE) or
     *     {@link BarrierTracker} (AT_LEAST_ONCE)
     * @param ioManager supplies the spilling directories and is passed to the barrier buffer
     * @param enableWatermarkMultiplexing whether the stream carries watermarks next to records
     * @param taskManagerConfig read for {@code task.checkpoint.alignment.max-size}
     * @throws IOException declared by the constructor signature
     * @throws IllegalConfigurationException if the alignment max size is neither positive nor -1
     * @throws IllegalArgumentException if the checkpointing mode is not recognized
     */
    // Raw types are confined to this constructor: the two serializer flavours do not share an
    // element type, so the delegate and the deserializer array are created unchecked.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, StatefulTask<?> checkpointListener, CheckpointingMode checkpointMode, IOManager ioManager, boolean enableWatermarkMultiplexing, Configuration taskManagerConfig) throws IOException {
        InputGate inputGate = InputGateUtil.createInputGate(inputGates);

        if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
            long maxAlign = taskManagerConfig.getLong("task.checkpoint.alignment.max-size", -1L);
            // -1 means "no limit"; any other non-positive value is a configuration error.
            if (maxAlign != -1L && maxAlign <= 0L) {
                throw new IllegalConfigurationException("task.checkpoint.alignment.max-size must be positive or -1 (infinite)");
            }
            this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
        } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
            this.barrierHandler = new BarrierTracker(inputGate);
        } else {
            throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
        }

        if (checkpointListener != null) {
            this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
        }

        TypeSerializer ser;
        if (enableWatermarkMultiplexing) {
            ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
        } else {
            ser = new StreamRecordSerializer<IN>(inputSerializer);
        }
        this.deserializationDelegate = new NonReusingDeserializationDelegate(ser);

        int numChannels = inputGate.getNumberOfInputChannels();
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[numChannels];
        this.watermarks = new long[numChannels];
        for (int i = 0; i < numChannels; ++i) {
            this.recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer(ioManager.getSpillingDirectoriesPaths());
            // No watermark has been seen on any channel yet.
            this.watermarks[i] = Long.MIN_VALUE;
        }
        this.lastEmittedWatermark = Long.MIN_VALUE;
    }

    /**
     * Reads input until one record has been passed to the operator or the input is exhausted.
     *
     * <p>Watermarks encountered on the way are handled and never count as a processed record:
     * after a watermark the loop keeps reading instead of treating the element as a record.
     *
     * @param streamOperator operator that receives records and watermarks
     * @param lock object that is synchronized on around every call into the operator
     * @return true if a record was processed, false once the input is finished
     * @throws IOException if an event other than {@link EndOfPartitionEvent} is received
     * @throws IllegalStateException if the barrier handler still holds data at end of input
     * @throws Exception anything thrown by the operator or the underlying readers
     */
    public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, Object lock) throws Exception {
        if (this.isFinished) {
            return false;
        }
        if (this.numRecordsIn == null) {
            this.numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn");
        }

        while (true) {
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);

                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycle();
                    this.currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrWatermark = this.deserializationDelegate.getInstance();

                    if (recordOrWatermark.isWatermark()) {
                        this.handleWatermark(recordOrWatermark.asWatermark().getTimestamp(), streamOperator, lock);
                        // A watermark is not a record: never fall through to asRecord().
                        continue;
                    }

                    StreamRecord<IN> record = recordOrWatermark.asRecord();
                    synchronized (lock) {
                        this.numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }

            BufferOrEvent bufferOrEvent = this.barrierHandler.getNextNonBlocked();
            if (bufferOrEvent == null) {
                // No more input: mark finished and verify nothing is left behind.
                this.isFinished = true;
                if (!this.barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }

            if (bufferOrEvent.isBuffer()) {
                this.currentChannel = bufferOrEvent.getChannelIndex();
                this.currentRecordDeserializer = this.recordDeserializers[this.currentChannel];
                this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
            } else {
                AbstractEvent event = bufferOrEvent.getEvent();
                // EndOfPartitionEvent is the only event tolerated here; it is simply skipped.
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
            }
        }
    }

    /**
     * Records a watermark for the current channel and forwards the new minimum over all channels
     * to the operator if that minimum is greater than the last emitted watermark.
     */
    private void handleWatermark(long watermarkMillis, OneInputStreamOperator<IN, ?> streamOperator, Object lock) throws Exception {
        if (watermarkMillis <= this.watermarks[this.currentChannel]) {
            // Not an advance for this channel; nothing can change.
            return;
        }
        this.watermarks[this.currentChannel] = watermarkMillis;

        long newMinWatermark = Long.MAX_VALUE;
        for (long watermark : this.watermarks) {
            newMinWatermark = Math.min(watermark, newMinWatermark);
        }

        if (newMinWatermark > this.lastEmittedWatermark) {
            this.lastEmittedWatermark = newMinWatermark;
            synchronized (lock) {
                streamOperator.processWatermark(new Watermark(this.lastEmittedWatermark));
            }
        }
    }

    /**
     * Passes the given accumulator reporter to every per-channel record deserializer.
     *
     * @param reporter the reporter to set on each deserializer
     */
    public void setReporter(AccumulatorRegistry.Reporter reporter) {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            deserializer.setReporter(reporter);
        }
    }

    /**
     * Registers a {@code currentLowWatermark} gauge that reports the last emitted watermark.
     *
     * @param metrics the metric group to register the gauge on
     */
    public void setMetricGroup(IOMetricGroup metrics) {
        metrics.gauge("currentLowWatermark", (Gauge)new Gauge<Long>(){

            @Override
            public Long getValue() {
                return StreamInputProcessor.this.lastEmittedWatermark;
            }
        });
    }

    /**
     * Recycles any buffer still held by a record deserializer and cleans up the barrier handler.
     *
     * @throws IOException if the barrier handler cleanup fails
     */
    public void cleanup() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            Buffer buffer = deserializer.getCurrentBuffer();
            // Skip channels without a buffer or whose buffer was already returned.
            if (buffer == null || buffer.isRecycled()) continue;
            buffer.recycle();
        }
        this.barrierHandler.cleanup();
    }
}

