package org.apache.flink.streaming.api.streamvertex;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
import org.apache.flink.streaming.api.collector.StreamCollector;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.StreamRecordWriter;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/streamvertex/OutputHandler.class */
public class OutputHandler<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
    private StreamVertex<?, OUT> streamVertex;
    private StreamConfig configuration;
    private StreamCollector<OUT> collector;
    private long bufferTimeout;
    long startTime;
    TypeInformation<OUT> outTypeInfo = null;
    StreamRecordSerializer<OUT> outSerializer = null;
    SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
    private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs = new LinkedList();

    public OutputHandler(StreamVertex<?, OUT> streamVertex) {
        this.streamVertex = streamVertex;
        this.configuration = new StreamConfig(streamVertex.getTaskConfiguration());
        try {
            setConfigOutputs();
        } catch (StreamVertexException e) {
            throw new StreamVertexException("Cannot register outputs for " + streamVertex.getClass().getSimpleName(), e);
        }
    }

    public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
        return this.outputs;
    }

    private void setConfigOutputs() {
        setSerializers();
        setCollector();
        int numberOfOutputs = this.configuration.getNumberOfOutputs();
        this.bufferTimeout = this.configuration.getBufferTimeout();
        for (int i = 0; i < numberOfOutputs; i++) {
            setPartitioner(i, this.outputs);
        }
    }

    private StreamCollector<OUT> setCollector() {
        if (this.streamVertex.configuration.getDirectedEmit()) {
            this.collector = new DirectedStreamCollector(this.streamVertex.getInstanceID(), this.outSerializationDelegate, this.streamVertex.configuration.getOutputSelector(this.streamVertex.userClassLoader));
        } else {
            this.collector = new StreamCollector<>(this.streamVertex.getInstanceID(), this.outSerializationDelegate);
        }
        return this.collector;
    }

    public StreamCollector<OUT> getCollector() {
        return this.collector;
    }

    void setSerializers() {
        this.outSerializer = this.configuration.getTypeSerializerOut1(this.streamVertex.userClassLoader);
        if (this.outSerializer != null) {
            this.outSerializationDelegate = new SerializationDelegate<>(this.outSerializer);
            this.outSerializationDelegate.setInstance(this.outSerializer.m13createInstance());
        }
    }

    void setPartitioner(int i, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> list) {
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
        try {
            StreamPartitioner partitioner = this.configuration.getPartitioner(this.streamVertex.userClassLoader, i);
            if (this.bufferTimeout >= 0) {
                recordWriter = new StreamRecordWriter(this.streamVertex, partitioner, this.bufferTimeout);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}", Long.valueOf(this.bufferTimeout), this.streamVertex.getClass().getSimpleName());
                }
            } else {
                recordWriter = new RecordWriter<>(this.streamVertex, partitioner);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("RecordWriter initiated for {}", this.streamVertex.getClass().getSimpleName());
                }
            }
            list.add(recordWriter);
            List<String> outputName = this.configuration.getOutputName(i);
            boolean selectAll = this.configuration.getSelectAll(i);
            if (this.collector != null) {
                this.collector.addOutput(recordWriter, outputName, selectAll);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Partitioner set: {} with {} outputs for {}", new Object[]{partitioner.getClass().getSimpleName(), Integer.valueOf(i), this.streamVertex.getClass().getSimpleName()});
            }
        } catch (Exception e) {
            throw new StreamVertexException("Cannot deserialize partitioner for " + this.streamVertex.getName() + " with " + i + " outputs", e);
        }
    }

    public void flushOutputs() throws IOException, InterruptedException {
        for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : this.outputs) {
            if (recordWriter instanceof StreamRecordWriter) {
                ((StreamRecordWriter) recordWriter).close();
            } else {
                recordWriter.flush();
            }
        }
    }

    public void initializeOutputSerializers() {
        Iterator<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> it = this.outputs.iterator();
        while (it.hasNext()) {
            it.next().initializeSerializers();
        }
    }

    public void invokeUserFunction(String str, StreamInvokable<?, OUT> streamInvokable) throws IOException, InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} {} invoked with instance id {}", new Object[]{str, this.streamVertex.getName(), Integer.valueOf(this.streamVertex.getInstanceID())});
        }
        initializeOutputSerializers();
        try {
            this.streamVertex.invokeUserFunction(streamInvokable);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} {} invoke finished instance id {}", new Object[]{str, this.streamVertex.getName(), Integer.valueOf(this.streamVertex.getInstanceID())});
            }
            flushOutputs();
        } catch (Exception e) {
            flushOutputs();
            throw new RuntimeException(e);
        }
    }
}
