/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamIterationHead<OUT>
extends OneInputStreamTask<OUT, OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
    private volatile boolean running = true;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void run() throws Exception {
        String iterationId = this.getConfiguration().getIterationId();
        if (iterationId == null || iterationId.length() == 0) {
            throw new Exception("Missing iteration ID in the task configuration");
        }
        String brokerID = StreamIterationHead.createBrokerIdString(this.getEnvironment().getJobID(), iterationId, this.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
        long iterationWaitTime = this.getConfiguration().getIterationWaitTime();
        boolean shouldWait = iterationWaitTime > 0L;
        ArrayBlockingQueue dataChannel = new ArrayBlockingQueue(1);
        BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
        LOG.info("Iteration head {} added feedback queue under {}", (Object)this.getName(), (Object)brokerID);
        try {
            RecordWriterOutput<?>[] outputs = this.getStreamOutputs();
            if (this.isSerializingTimestamps()) {
                for (RecordWriterOutput<?> output : outputs) {
                    output.emitWatermark(new Watermark(Long.MAX_VALUE));
                }
            }
            while (this.running) {
                StreamRecord nextRecord;
                StreamRecord streamRecord = nextRecord = shouldWait ? (StreamRecord)dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : (StreamRecord)dataChannel.take();
                if (nextRecord != null) {
                    for (RecordWriterOutput<?> output : outputs) {
                        output.collect(nextRecord);
                    }
                    continue;
                }
                break;
            }
        }
        finally {
            BlockingQueueBroker.INSTANCE.remove(brokerID);
            LOG.info("Iteration head {} removed feedback queue under {}", (Object)this.getName(), (Object)brokerID);
        }
    }

    @Override
    protected void cancelTask() {
        this.running = false;
    }

    @Override
    public void init() {
    }

    @Override
    protected void cleanup() throws Exception {
    }

    public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
        return jid + "-" + iterationID + "-" + subtaskIndex;
    }
}

