package org.apache.flink.streaming.api.streamvertex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.StreamEdge;
import org.apache.flink.streaming.api.collector.CollectorWrapper;
import org.apache.flink.streaming.api.collector.StreamOutput;
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.RecordWriterFactory;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/streamvertex/OutputHandler.class */
public class OutputHandler<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
    private StreamVertex<?, OUT> vertex;
    private StreamConfig configuration;
    private ClassLoader cl;
    private Collector<OUT> outerCollector;
    public List<ChainableInvokable<?, ?>> chainedInvokables = new ArrayList();
    private Map<StreamEdge, StreamOutput<?>> outputMap = new HashMap();
    private Map<Integer, StreamConfig> chainedConfigs;
    private List<StreamEdge> outEdgesInOrder;

    /* JADX WARN: Multi-variable type inference failed */
    public OutputHandler(StreamVertex<?, OUT> streamVertex) {
        this.vertex = streamVertex;
        this.configuration = new StreamConfig(streamVertex.getTaskConfiguration());
        this.cl = streamVertex.getUserCodeClassLoader();
        this.chainedConfigs = this.configuration.getTransitiveChainedTaskConfigs(this.cl);
        this.chainedConfigs.put(this.configuration.getVertexID(), this.configuration);
        this.outEdgesInOrder = this.configuration.getOutEdgesInOrder(this.cl);
        for (StreamEdge streamEdge : this.outEdgesInOrder) {
            this.outputMap.put(streamEdge, createStreamOutput(streamEdge, Integer.valueOf(streamEdge.getTargetVertex()), this.chainedConfigs.get(Integer.valueOf(streamEdge.getSourceVertex())), this.outEdgesInOrder.indexOf(streamEdge)));
        }
        this.outerCollector = createChainedCollector(this.configuration);
    }

    public void broadcastBarrier(long j) throws IOException, InterruptedException {
        StreamingSuperstep streamingSuperstep = new StreamingSuperstep(j);
        Iterator<StreamOutput<?>> it = this.outputMap.values().iterator();
        while (it.hasNext()) {
            it.next().broadcastEvent(streamingSuperstep);
        }
    }

    public Collection<StreamOutput<?>> getOutputs() {
        return this.outputMap.values();
    }

    private Collector<OUT> createChainedCollector(StreamConfig streamConfig) {
        CollectorWrapper collectorWrapper = new CollectorWrapper(streamConfig.getOutputSelectorWrapper(this.cl));
        for (StreamEdge streamEdge : streamConfig.getNonChainedOutputs(this.cl)) {
            collectorWrapper.addCollector(this.outputMap.get(streamEdge), streamEdge);
        }
        for (StreamEdge streamEdge2 : streamConfig.getChainedOutputs(this.cl)) {
            collectorWrapper.addCollector(createChainedCollector(this.chainedConfigs.get(Integer.valueOf(streamEdge2.getTargetVertex()))), streamEdge2);
        }
        if (streamConfig.isChainStart()) {
            return collectorWrapper;
        }
        ChainableInvokable<?, ?> chainableInvokable = (ChainableInvokable) streamConfig.getUserInvokable(this.vertex.getUserCodeClassLoader());
        chainableInvokable.setup(collectorWrapper, streamConfig.getTypeSerializerIn1(this.vertex.getUserCodeClassLoader()));
        this.chainedInvokables.add(chainableInvokable);
        return chainableInvokable;
    }

    public Collector<OUT> getCollector() {
        return this.outerCollector;
    }

    private <T> StreamOutput<T> createStreamOutput(StreamEdge streamEdge, Integer num, StreamConfig streamConfig, int i) {
        StreamRecordSerializer<T> typeSerializerOut1 = streamConfig.getTypeSerializerOut1(this.vertex.userClassLoader);
        SerializationDelegate serializationDelegate = null;
        if (typeSerializerOut1 != null) {
            serializationDelegate = new SerializationDelegate(typeSerializerOut1);
            serializationDelegate.setInstance(typeSerializerOut1.m21createInstance());
        }
        StreamPartitioner<?> partitioner = streamEdge.getPartitioner();
        StreamOutput<T> streamOutput = new StreamOutput<>(RecordWriterFactory.createRecordWriter(this.vertex.getEnvironment().getWriter(i), partitioner, streamConfig.getBufferTimeout()), this.vertex.instanceID, serializationDelegate);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Partitioner set: {} with {} outputs for {}", new Object[]{partitioner.getClass().getSimpleName(), Integer.valueOf(i), this.vertex.getClass().getSimpleName()});
        }
        return streamOutput;
    }

    public void flushOutputs() throws IOException, InterruptedException {
        Iterator<StreamOutput<?>> it = getOutputs().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    public void clearWriters() {
        Iterator<StreamOutput<?>> it = this.outputMap.values().iterator();
        while (it.hasNext()) {
            it.next().clearBuffers();
        }
    }
}
