package org.apache.flink.streaming.api.streamvertex;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/streamvertex/StreamIterationHead.class */
/**
 * Stream vertex that reads {@link StreamRecord}s from a blocking queue and
 * emits each of them to every output of its {@link OutputHandler}.
 *
 * <p>The queue is handed in to the {@link BlockingQueueBroker} under the
 * configured iteration id. Whoever obtains the queue from the broker feeds
 * this vertex (presumably the matching iteration tail; the producer is not
 * visible in this file).
 *
 * @param <OUT> tuple type of the records forwarded by this vertex
 */
public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);

    /** Holds the outputs and the serialization delegate used for emitting. */
    private OutputHandler<OUT> outputHandler;

    /** Last value returned by {@code newVertex()}; overwritten by every constructor call. */
    private static int numSources;

    /** Iteration id read from the configuration; its string form is the broker key. */
    private Integer iterationId;

    /** Queue the records to forward are read from; bounded to a single element. */
    @SuppressWarnings("rawtypes")
    private BlockingQueue<StreamRecord> dataChannel;

    /** Maximum time in milliseconds to wait for the next record. */
    private long iterationWaitTime;

    /** {@code true} when {@link #iterationWaitTime} is positive, i.e. reads use a timeout. */
    private boolean shouldWait;

    /**
     * Assigns this vertex the id returned by {@code newVertex()} and creates
     * the single-slot data channel.
     */
    public StreamIterationHead() {
        numSources = newVertex();
        this.instanceID = numSources;
        this.dataChannel = new ArrayBlockingQueue<>(1);
    }

    /**
     * Creates the output handler, reads the iteration id and wait time from
     * the configuration, and hands the data channel in to the
     * {@link BlockingQueueBroker} under the iteration id.
     *
     * <p>A failure to hand in the channel does not propagate, but it is logged:
     * without a registered channel no record can reach this vertex, so
     * {@link #invoke()} would block indefinitely or time out without any hint
     * of the cause.
     */
    @Override
    public void setInputsOutputs() {
        this.outputHandler = new OutputHandler<>(this);
        this.iterationId = this.configuration.getIterationId();
        this.iterationWaitTime = this.configuration.getIterationWaitTime();
        this.shouldWait = this.iterationWaitTime > 0;
        try {
            BlockingQueueBroker.instance().handIn(this.iterationId.toString(), this.dataChannel);
        } catch (Exception e) {
            // Previously swallowed silently. Keep the non-throwing contract,
            // but make the failure visible together with its cause.
            LOG.error("Could not hand in the data channel for iteration {}", this.iterationId, e);
        }
    }

    /**
     * Forwards records from the data channel to all outputs.
     *
     * <p>With a positive wait time each read waits at most that many
     * milliseconds; when no record arrives in time the outputs are flushed and
     * the method returns. With a non-positive wait time the read blocks until
     * a record is available, so the loop only ends through an exception.
     *
     * @throws Exception if waiting on the channel is interrupted, or if
     *         initializing, emitting to or flushing the outputs fails
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void invoke() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
        }
        this.outputHandler.initializeOutputSerializers();
        while (true) {
            StreamRecord nextRecord;
            if (this.shouldWait) {
                // Timed read: yields null once the wait time elapses without a record.
                nextRecord = this.dataChannel.poll(this.iterationWaitTime, TimeUnit.MILLISECONDS);
            } else {
                nextRecord = this.dataChannel.take();
            }
            if (nextRecord == null) {
                this.outputHandler.flushOutputs();
                return;
            }
            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : this.outputHandler.getOutputs()) {
                this.outputHandler.outSerializationDelegate.setInstance(nextRecord);
                output.emit(this.outputHandler.outSerializationDelegate);
            }
        }
    }

    /** Intentionally empty: this vertex sets no invokable. */
    @Override
    protected void setInvokable() {
    }
}
