package org.apache.flink.streaming.runtime.tasks;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.HelpFormatter;
import org.apache.flink.streaming.api.collector.StreamOutput;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamIterationHead.class */
public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
    private BlockingQueue<StreamRecord> dataChannel = new ArrayBlockingQueue(1);
    private long iterationWaitTime;
    private boolean shouldWait;

    @Override // org.apache.flink.streaming.runtime.tasks.OneInputStreamTask, org.apache.flink.streaming.runtime.tasks.StreamTask, org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public void registerInputOutput() {
        super.registerInputOutput();
        this.outputHandler = new OutputHandler<>(this);
        Integer iterationId = this.configuration.getIterationId();
        this.iterationWaitTime = this.configuration.getIterationWaitTime();
        this.shouldWait = this.iterationWaitTime > 0;
        try {
            BlockingQueueBroker.instance().handIn(iterationId.toString() + HelpFormatter.DEFAULT_OPT_PREFIX + getEnvironment().getIndexInSubtaskGroup(), this.dataChannel);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.OneInputStreamTask, org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public void invoke() throws Exception {
        this.isRunning = true;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Iteration source {} invoked", getName());
        }
        Collection<StreamOutput<?>> outputs = this.outputHandler.getOutputs();
        while (true) {
            try {
                try {
                    StreamRecord poll = this.shouldWait ? this.dataChannel.poll(this.iterationWaitTime, TimeUnit.MILLISECONDS) : this.dataChannel.take();
                    if (poll == null) {
                        return;
                    }
                    Iterator<StreamOutput<?>> it = outputs.iterator();
                    while (it.hasNext()) {
                        it.next().collect(poll.getObject());
                    }
                } catch (Exception e) {
                    LOG.error("Iteration Head " + getEnvironment().getTaskNameWithSubtasks() + " failed", (Throwable) e);
                    throw e;
                }
            } finally {
                this.isRunning = false;
                this.outputHandler.flushOutputs();
                clearBuffers();
            }
        }
    }
}
