package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.collector.CollectorWrapper;
import org.apache.flink.streaming.api.collector.StreamOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.io.RecordWriterFactory;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OutputHandler.class */
public class OutputHandler<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
    private StreamTask<OUT, ?> vertex;
    private StreamConfig configuration;
    private ClassLoader cl;
    private Output<OUT> outerOutput;
    public List<OneInputStreamOperator<?, ?>> chainedOperators = new ArrayList();
    private Map<StreamEdge, StreamOutput<?>> outputMap = new HashMap();
    private Map<Integer, StreamConfig> chainedConfigs;
    private List<StreamEdge> outEdgesInOrder;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OutputHandler$CopyingOperatorCollector.class */
    public static class CopyingOperatorCollector<T> extends OperatorCollector<T> {
        private final TypeSerializer<T> serializer;

        public CopyingOperatorCollector(OneInputStreamOperator<?, T> oneInputStreamOperator, TypeSerializer<T> typeSerializer) {
            super(oneInputStreamOperator);
            this.serializer = typeSerializer;
        }

        @Override // org.apache.flink.streaming.runtime.tasks.OutputHandler.OperatorCollector
        public void collect(T t) {
            try {
                this.operator.processElement(this.serializer.copy(t));
            } catch (Exception e) {
                if (OutputHandler.LOG.isErrorEnabled()) {
                    OutputHandler.LOG.error("Could not forward element to operator.", e);
                }
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OutputHandler$OperatorCollector.class */
    public static class OperatorCollector<T> implements Output<T> {
        protected OneInputStreamOperator operator;

        public OperatorCollector(OneInputStreamOperator<?, T> oneInputStreamOperator) {
            this.operator = oneInputStreamOperator;
        }

        public void collect(T t) {
            try {
                this.operator.processElement(t);
            } catch (Exception e) {
                if (OutputHandler.LOG.isErrorEnabled()) {
                    OutputHandler.LOG.error("Could not forward element to operator.", e);
                }
                throw new RuntimeException(e);
            }
        }

        public final void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                if (OutputHandler.LOG.isErrorEnabled()) {
                    OutputHandler.LOG.error("Could not forward close call to operator.", e);
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public OutputHandler(StreamTask<OUT, ?> streamTask) {
        this.vertex = streamTask;
        this.configuration = new StreamConfig(streamTask.getTaskConfiguration());
        this.cl = streamTask.getUserCodeClassLoader();
        this.chainedConfigs = this.configuration.getTransitiveChainedTaskConfigs(this.cl);
        this.chainedConfigs.put(this.configuration.getVertexID(), this.configuration);
        this.outEdgesInOrder = this.configuration.getOutEdgesInOrder(this.cl);
        for (StreamEdge streamEdge : this.outEdgesInOrder) {
            this.outputMap.put(streamEdge, createStreamOutput(streamEdge, Integer.valueOf(streamEdge.getTargetId()), this.chainedConfigs.get(Integer.valueOf(streamEdge.getSourceId())), this.outEdgesInOrder.indexOf(streamEdge)));
        }
        this.outerOutput = (Output<OUT>) createChainedCollector(this.configuration);
    }

    public void broadcastBarrier(long j, long j2) throws IOException, InterruptedException {
        StreamingSuperstep streamingSuperstep = new StreamingSuperstep(j, j2);
        Iterator<StreamOutput<?>> it = this.outputMap.values().iterator();
        while (it.hasNext()) {
            it.next().broadcastEvent(streamingSuperstep);
        }
    }

    public Collection<StreamOutput<?>> getOutputs() {
        return this.outputMap.values();
    }

    public List<OneInputStreamOperator<?, ?>> getChainedOperators() {
        return this.chainedOperators;
    }

    private <X> Output<X> createChainedCollector(StreamConfig streamConfig) {
        CollectorWrapper collectorWrapper = new CollectorWrapper(streamConfig.getOutputSelectorWrapper(this.cl));
        for (StreamEdge streamEdge : streamConfig.getNonChainedOutputs(this.cl)) {
            collectorWrapper.addCollector(this.outputMap.get(streamEdge), streamEdge);
        }
        for (StreamEdge streamEdge2 : streamConfig.getChainedOutputs(this.cl)) {
            collectorWrapper.addCollector(createChainedCollector(this.chainedConfigs.get(Integer.valueOf(streamEdge2.getTargetId()))), streamEdge2);
        }
        if (streamConfig.isChainStart()) {
            return collectorWrapper;
        }
        OneInputStreamOperator<?, ?> oneInputStreamOperator = (OneInputStreamOperator) streamConfig.getStreamOperator(this.vertex.getUserCodeClassLoader());
        StreamingRuntimeContext createRuntimeContext = this.vertex.createRuntimeContext(streamConfig);
        this.vertex.contexts.add(createRuntimeContext);
        oneInputStreamOperator.setup(collectorWrapper, createRuntimeContext);
        this.chainedOperators.add(oneInputStreamOperator);
        return (this.vertex.getExecutionConfig().isObjectReuseEnabled() || oneInputStreamOperator.isInputCopyingDisabled()) ? new OperatorCollector(oneInputStreamOperator) : new CopyingOperatorCollector(oneInputStreamOperator, streamConfig.getTypeSerializerIn1(this.vertex.getUserCodeClassLoader()).getObjectSerializer());
    }

    public Output<OUT> getOutput() {
        return this.outerOutput;
    }

    private <T> StreamOutput<T> createStreamOutput(StreamEdge streamEdge, Integer num, StreamConfig streamConfig, int i) {
        StreamRecordSerializer<T> typeSerializerOut1 = streamConfig.getTypeSerializerOut1(this.vertex.userClassLoader);
        SerializationDelegate serializationDelegate = null;
        if (typeSerializerOut1 != null) {
            serializationDelegate = new SerializationDelegate(typeSerializerOut1);
            serializationDelegate.setInstance(typeSerializerOut1.m278createInstance());
        }
        StreamPartitioner<?> partitioner = streamEdge.getPartitioner();
        StreamOutput<T> streamOutput = new StreamOutput<>(RecordWriterFactory.createRecordWriter(this.vertex.getEnvironment().getWriter(i), partitioner, streamConfig.getBufferTimeout()), serializationDelegate);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Partitioner set: {} with {} outputs for {}", new Object[]{partitioner.getClass().getSimpleName(), Integer.valueOf(i), this.vertex.getClass().getSimpleName()});
        }
        return streamOutput;
    }

    public void flushOutputs() throws IOException, InterruptedException {
        Iterator<StreamOutput<?>> it = getOutputs().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    public void clearWriters() {
        Iterator<StreamOutput<?>> it = this.outputMap.values().iterator();
        while (it.hasNext()) {
            it.next().clearBuffers();
        }
    }
}
