/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input processor for a task whose head operator is a {@link OneInputStreamOperator}.
 *
 * <p>Each call to {@link #processInput()} polls at most one {@link StreamElement} from the
 * underlying {@link StreamTaskInput} and dispatches it by kind: records and latency markers go
 * straight to the operator, while watermarks and stream statuses are routed through a
 * {@link StatusWatermarkValve} whose output is forwarded by {@link ForwardingValveOutputHandler}.
 *
 * <p>Every call into the operator, the operator chain, the watermark gauge and the stream status
 * maintainer is made while holding the {@code lock} object supplied at construction.
 *
 * <p>NOTE(review): this source was recovered by a decompiler, which flagged the methods using
 * {@code synchronized} blocks with "Removed try catching itself - possible behaviour change".
 * Compare against the upstream sources before relying on exact exception behaviour.
 *
 * @param <IN> type of the records consumed by the operator
 */
@Internal
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamOneInputProcessor.class);

    /** Source of stream elements; closed by {@link #close()}. */
    private final StreamTaskInput input;

    /** Monitor held around every call into the operator and operator chain. */
    private final Object lock;

    /** Notified through {@code endInput(1)} once the input reports that it is finished. */
    private final OperatorChain<?, ?> operatorChain;

    /** Receives per-channel watermarks and stream statuses and emits the resulting output. */
    private StatusWatermarkValve statusWatermarkValve;

    /** Receives stream status changes emitted by the valve. */
    private final StreamStatusMaintainer streamStatusMaintainer;

    /** Operator that consumes records, watermarks and latency markers. */
    private final OneInputStreamOperator<IN, ?> streamOperator;

    /** Updated with the timestamp of every watermark emitted by the valve. */
    private final WatermarkGauge watermarkGauge;

    /** Counter of processed records; resolved lazily by {@link #initializeNumRecordsIn()}. */
    private Counter numRecordsIn;

    /**
     * Builds the processor on top of the given input gates.
     *
     * @param inputGates gates that are combined into a single gate via {@link InputGateUtil}
     * @param inputSerializer serializer handed to the network input
     * @param checkpointedTask task passed on when creating the checkpointed input gate
     * @param checkpointMode checkpointing mode passed on when creating the checkpointed input gate
     * @param lock monitor to hold while calling into the operator; must not be {@code null}
     * @param ioManager I/O manager used by the checkpointed gate and the network input
     * @param taskManagerConfig configuration passed on when creating the checkpointed input gate
     * @param streamStatusMaintainer receiver of stream status changes; must not be {@code null}
     * @param streamOperator operator that consumes the input; must not be {@code null}
     * @param metrics metric group on which the {@code checkpointAlignmentTime} gauge is registered
     * @param watermarkGauge gauge updated with each emitted watermark (not null-checked here)
     * @param taskName task name passed on when creating the checkpointed input gate
     * @param operatorChain chain notified when the input ends; must not be {@code null}
     * @throws IOException declared by the signature; presumably raised while setting up the
     *     gates (confirm against the called factory methods)
     */
    public StreamOneInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, StreamTask<?, ?> checkpointedTask, CheckpointingMode checkpointMode, Object lock, IOManager ioManager, Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, OneInputStreamOperator<IN, ?> streamOperator, TaskIOMetricGroup metrics, WatermarkGauge watermarkGauge, String taskName, OperatorChain<?, ?> operatorChain) throws IOException {
        // The statement order below is kept deliberately: it fixes which failure surfaces first.
        InputGate unionGate = InputGateUtil.createInputGate(inputGates);
        CheckpointedInputGate checkpointedGate = InputProcessorUtil.createCheckpointedInputGate(
                checkpointedTask, checkpointMode, ioManager, unionGate, taskManagerConfig, taskName);
        this.input = new StreamTaskNetworkInput(checkpointedGate, inputSerializer, ioManager, 0);

        this.lock = Preconditions.checkNotNull(lock);
        this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer);
        this.streamOperator = Preconditions.checkNotNull(streamOperator);

        // One valve slot per input channel of the combined gate.
        int channelCount = unionGate.getNumberOfInputChannels();
        ForwardingValveOutputHandler valveOutput = new ForwardingValveOutputHandler(streamOperator, lock);
        this.statusWatermarkValve = new StatusWatermarkValve(channelCount, valveOutput);

        this.watermarkGauge = watermarkGauge;
        metrics.gauge("checkpointAlignmentTime", checkpointedGate::getAlignmentDurationNanos);
        this.operatorChain = Preconditions.checkNotNull(operatorChain);
    }

    /**
     * Polls one element from the input and processes it.
     *
     * <p>If no element is currently available, this calls {@code get()} on the object returned by
     * {@code input.isAvailable()} and then checks whether the input is finished.
     *
     * @return {@code true} if an element was processed, or if none was available and the input
     *     is not finished; {@code false} once the input reports that it is finished
     * @throws Exception if polling, waiting, or the operator fails
     */
    @Override
    public boolean processInput() throws Exception {
        this.initializeNumRecordsIn();

        StreamElement element = (StreamElement) this.input.pollNextNullable();
        if (element != null) {
            int sourceChannel = this.input.getLastChannel();
            // -1 is presumably the "unspecified channel" sentinel of StreamTaskInput (confirm).
            Preconditions.checkState(sourceChannel != -1);
            this.processElement(element, sourceChannel);
            return true;
        }

        this.input.isAvailable().get();
        return !this.checkFinished();
    }

    /**
     * Checks whether the input is finished and, if so, signals the end of input to the operator
     * chain while holding the lock.
     *
     * @return {@code true} if the input reports that it is finished
     * @throws Exception if the operator chain fails while handling the end of input
     */
    private boolean checkFinished() throws Exception {
        if (!this.input.isFinished()) {
            return false;
        }
        synchronized (this.lock) {
            // NOTE(review): the argument 1 presumably identifies this processor's only input.
            this.operatorChain.endInput(1);
        }
        return true;
    }

    /**
     * Dispatches a single stream element according to its kind.
     *
     * @param recordOrMark element to process
     * @param channel index of the channel the element was read from; passed on to the valve
     * @throws UnsupportedOperationException if the element is none of the four known kinds
     * @throws Exception if the operator or the valve fails
     */
    private void processElement(StreamElement recordOrMark, int channel) throws Exception {
        if (recordOrMark.isRecord()) {
            StreamRecord<IN> streamRecord = recordOrMark.asRecord();
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement1(streamRecord);
                this.streamOperator.processElement(streamRecord);
            }
            return;
        }
        if (recordOrMark.isWatermark()) {
            // The valve is not locked here; its output handler takes the lock itself.
            this.statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), channel);
            return;
        }
        if (recordOrMark.isStreamStatus()) {
            this.statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
            return;
        }
        if (recordOrMark.isLatencyMarker()) {
            synchronized (this.lock) {
                this.streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
            }
            return;
        }
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }

    /**
     * Resolves the records-in counter from the operator's metric group on first use. If that
     * lookup throws, a warning is logged and a standalone {@link SimpleCounter} is used instead.
     */
    private void initializeNumRecordsIn() {
        if (this.numRecordsIn != null) {
            return;
        }
        try {
            OperatorMetricGroup operatorMetrics = (OperatorMetricGroup) this.streamOperator.getMetricGroup();
            this.numRecordsIn = operatorMetrics.getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            this.numRecordsIn = new SimpleCounter();
        }
    }

    /**
     * Closes the underlying input.
     *
     * @throws IOException if closing the input fails
     */
    @Override
    public void close() throws IOException {
        this.input.close();
    }

    /**
     * Valve output handler that forwards emitted watermarks to the operator and emitted stream
     * statuses to the enclosing processor's stream status maintainer, each under the lock.
     */
    private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
        private final OneInputStreamOperator<IN, ?> operator;
        private final Object lock;

        /**
         * @param operator operator that receives watermarks; must not be {@code null}
         * @param lock monitor to hold while forwarding; must not be {@code null}
         */
        private ForwardingValveOutputHandler(OneInputStreamOperator<IN, ?> operator, Object lock) {
            this.operator = Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull(lock);
        }

        /**
         * Records the watermark's timestamp in the gauge and passes the watermark to the operator.
         *
         * @param watermark watermark emitted by the valve
         * @throws RuntimeException wrapping any exception raised while forwarding
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    StreamOneInputProcessor.this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark(watermark);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        /**
         * Passes the stream status to the enclosing processor's stream status maintainer.
         *
         * @param streamStatus stream status emitted by the valve
         * @throws RuntimeException wrapping any exception raised while forwarding
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamOneInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(streamStatus);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }
}

