package org.apache.nemo.runtime.executor.datatransfer;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.class */
/**
 * {@link OutputCollector} that fans the output of a single vertex out to every configured sink.
 *
 * <p>There are two kinds of sinks, each split into "main" outputs and "additional" outputs that
 * are looked up by a string key:
 * <ul>
 *   <li><b>internal</b> sinks are {@link NextIntraTaskOperatorInfo} entries; data is handed
 *       directly to the transform of the operator they reference, and watermarks are handed to
 *       their watermark manager;</li>
 *   <li><b>external</b> sinks are {@link OutputWriter}s.</li>
 * </ul>
 *
 * <p>The collections passed to the constructor are stored by reference, not copied.
 *
 * @param <O> type of the elements emitted to the main outputs.
 */
public final class OperatorVertexOutputCollector<O> implements OutputCollector<O> {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorVertexOutputCollector.class.getName());
    private final IRVertex irVertex;
    private final List<NextIntraTaskOperatorInfo> internalMainOutputs;
    private final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputs;
    private final List<OutputWriter> externalMainOutputs;
    private final Map<String, List<OutputWriter>> externalAdditionalOutputs;

    /**
     * Creates a collector for the given vertex.
     *
     * @param irVertex                  vertex whose output is collected; only its id is read here,
     *                                  for debug logging of watermarks.
     * @param internalMainOutputs       internal sinks that receive every main-output element.
     * @param internalAdditionalOutputs internal sinks for additional outputs, keyed by the string
     *                                  passed to {@link #emit(String, Object)}.
     * @param externalMainOutputs       writers that receive every main-output element.
     * @param externalAdditionalOutputs writers for additional outputs, keyed by the string passed
     *                                  to {@link #emit(String, Object)}.
     */
    public OperatorVertexOutputCollector(final IRVertex irVertex,
                                         final List<NextIntraTaskOperatorInfo> internalMainOutputs,
                                         final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputs,
                                         final List<OutputWriter> externalMainOutputs,
                                         final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
        this.irVertex = irVertex;
        this.internalMainOutputs = internalMainOutputs;
        this.internalAdditionalOutputs = internalAdditionalOutputs;
        this.externalMainOutputs = externalMainOutputs;
        this.externalAdditionalOutputs = externalAdditionalOutputs;
    }

    /** Hands {@code output} to the transform of an operator. */
    private void emit(final OperatorVertex vertex, final O output) {
        vertex.getTransform().onData(output);
    }

    /** Writes {@code output} through a writer. */
    private void emit(final OutputWriter writer, final O output) {
        writer.write(output);
    }

    /**
     * Emits an element to every main output: first all internal sinks, then all external writers.
     *
     * @param o the element to emit; it is forwarded as-is.
     */
    public void emit(final O o) {
        // The element is the payload, not the sink: it must be forwarded unchanged rather than
        // cast to the sink's type.
        for (final NextIntraTaskOperatorInfo internalSink : this.internalMainOutputs) {
            emit(internalSink.getNextOperator(), o);
        }
        for (final OutputWriter externalWriter : this.externalMainOutputs) {
            emit(externalWriter, o);
        }
    }

    /**
     * Emits an element to the additional outputs registered under {@code str}: first the internal
     * sinks, then the external writers. A key with no registered sinks is silently ignored.
     *
     * @param str key of the additional output.
     * @param t   the element to emit.
     * @param <T> type of the element.
     */
    public <T> void emit(final String str, final T t) {
        // The private emit helpers are typed on O, so the element is narrowed once here. The cast
        // is unchecked (erased at runtime) and therefore cannot fail in this method.
        @SuppressWarnings("unchecked")
        final O element = (O) t;

        // A single get() replaces containsKey() + get(); a missing (or null) entry is skipped.
        final List<NextIntraTaskOperatorInfo> internalSinks = this.internalAdditionalOutputs.get(str);
        if (internalSinks != null) {
            for (final NextIntraTaskOperatorInfo internalSink : internalSinks) {
                emit(internalSink.getNextOperator(), element);
            }
        }

        final List<OutputWriter> externalWriters = this.externalAdditionalOutputs.get(str);
        if (externalWriters != null) {
            for (final OutputWriter externalWriter : externalWriters) {
                emit(externalWriter, element);
            }
        }
    }

    /**
     * Forwards a watermark to every sink, main and additional alike, in this order: internal main,
     * internal additional, external main, external additional.
     *
     * @param watermark the watermark to forward.
     */
    public void emitWatermark(final Watermark watermark) {
        // Guarded so that getId() is not evaluated when debug logging is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} emits watermark {}", this.irVertex.getId(), watermark);
        }

        // Internal sinks: the sink's watermark manager receives the watermark together with the
        // sink's edge index.
        for (final NextIntraTaskOperatorInfo internalSink : this.internalMainOutputs) {
            internalSink.getWatermarkManager().trackAndEmitWatermarks(internalSink.getEdgeIndex(), watermark);
        }
        for (final List<NextIntraTaskOperatorInfo> internalSinks : this.internalAdditionalOutputs.values()) {
            for (final NextIntraTaskOperatorInfo internalSink : internalSinks) {
                internalSink.getWatermarkManager().trackAndEmitWatermarks(internalSink.getEdgeIndex(), watermark);
            }
        }

        // External sinks: the watermark is written through each writer.
        for (final OutputWriter externalWriter : this.externalMainOutputs) {
            externalWriter.writeWatermark(watermark);
        }
        for (final List<OutputWriter> externalWriters : this.externalAdditionalOutputs.values()) {
            for (final OutputWriter externalWriter : externalWriters) {
                externalWriter.writeWatermark(watermark);
            }
        }
    }
}
