package com.datatorrent.stram.plan.physical;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.math.IntMath;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/plan/physical/StreamMapping.class */
/**
 * Maintains the physical wiring of one logical stream: the upstream physical outputs that feed
 * it, and the unifier operators (sliding, cascading, per-sink "MxN" and a shared final unifier)
 * inserted between those outputs and the physical operators of the stream's sinks.
 *
 * <p>The mapping is rebuilt from scratch by {@link #setSources(Collection)}; unifiers that are no
 * longer needed are removed from the {@link PhysicalPlan}.
 */
public class StreamMapping implements Serializable {
    private static final long serialVersionUID = 8572852828117485193L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamMapping.class);

    private final LogicalPlan.StreamMeta streamMeta;
    private final PhysicalPlan plan;

    /** Unifier shared by all sinks that do not get their own MxN unifier; {@code null} when unused. */
    PTOperator finalUnifier;
    /** Intermediate unifiers created when the number of sources exceeds the unifier limit. */
    final Set<PTOperator> cascadingUnifiers = Sets.newHashSet();
    /** One unifier per upstream output when the source operator declares a slide-by window count. */
    final Set<PTOperator> slidingUnifiers = Sets.newHashSet();
    /** Physical outputs currently feeding this stream (replaced by sliding unifier outputs if any). */
    private final List<PTOperator.PTOutput> upstream = Lists.newArrayList();

    /**
     * @param streamMeta   logical stream this mapping belongs to
     * @param physicalPlan plan in which unifier operators are created and removed
     */
    public StreamMapping(LogicalPlan.StreamMeta streamMeta, PhysicalPlan physicalPlan) {
        this.streamMeta = streamMeta;
        this.plan = physicalPlan;
    }

    /**
     * Adds every unifier currently held by this mapping (final, cascading and sliding) to the
     * given collection.
     *
     * <p>The decompiler reports this method was originally package-private; it is kept public
     * here so existing callers keep compiling.
     *
     * @param collection receives the unifier operators
     */
    public void addTo(Collection<PTOperator> collection) {
        if (this.finalUnifier != null) {
            collection.add(this.finalUnifier);
        }
        collection.addAll(this.cascadingUnifiers);
        collection.addAll(this.slidingUnifiers);
    }

    /**
     * Replaces the upstream outputs with those outputs of the given operators that belong to this
     * logical stream, then rebuilds the whole mapping.
     *
     * @param collection physical operators whose outputs are scanned
     */
    public void setSources(Collection<PTOperator> collection) {
        this.upstream.clear();
        for (PTOperator partition : collection) {
            for (PTOperator.PTOutput output : partition.outputs) {
                // identity comparison: only outputs of exactly this logical stream qualify
                if (output.logicalStream == this.streamMeta) {
                    this.upstream.add(output);
                }
            }
        }
        redoMapping();
    }

    /**
     * Creates a sliding unifier operator for the stream's source port and registers it in
     * {@code physicalPlan.newOpers}.
     *
     * @param streamMeta   logical stream the unifier output belongs to
     * @param physicalPlan plan that creates the operator
     * @param i            first window count (callers in this class pass APPLICATION_WINDOW_COUNT)
     * @param i2           second window count (callers in this class pass SLIDE_BY_WINDOW_COUNT)
     * @return the new unifier operator with a single output attached to {@code streamMeta}
     * @throws AssertionError if the unifier does not expose exactly one output port
     */
    public static PTOperator createSlidingUnifier(LogicalPlan.StreamMeta streamMeta, PhysicalPlan physicalPlan, int i, int i2) {
        // both counts are reduced by their greatest common divisor before being handed on
        int gcd = IntMath.gcd(i, i2);
        LogicalPlan.OperatorMeta unifierMeta = streamMeta.getSource().getSlidingUnifier(i / gcd, gcd, i2 / gcd);
        return newUnifierOperator(streamMeta, physicalPlan, unifierMeta);
    }

    /**
     * Creates a regular unifier operator for the stream's source port and registers it in
     * {@code physicalPlan.newOpers}.
     *
     * @param streamMeta   logical stream the unifier output belongs to
     * @param physicalPlan plan that creates the operator
     * @return the new unifier operator with a single output attached to {@code streamMeta}
     * @throws AssertionError if the unifier does not expose exactly one output port
     */
    public static PTOperator createUnifier(LogicalPlan.StreamMeta streamMeta, PhysicalPlan physicalPlan) {
        // NOTE(review): "m93getUnifierMeta" looks like a decompiler-generated accessor name - confirm.
        LogicalPlan.OperatorMeta unifierMeta = streamMeta.getSource().m93getUnifierMeta();
        return newUnifierOperator(streamMeta, physicalPlan, unifierMeta);
    }

    /** Shared tail of the two unifier factories: instantiate, validate the port count, wire the output. */
    private static PTOperator newUnifierOperator(LogicalPlan.StreamMeta streamMeta, PhysicalPlan physicalPlan, LogicalPlan.OperatorMeta unifierMeta) {
        PTOperator unifier = physicalPlan.newOperator(unifierMeta, unifierMeta.getName());
        Operator operator = unifierMeta.getOperator();
        Operators.PortMappingDescriptor ports = new Operators.PortMappingDescriptor();
        Operators.describe(operator, ports);
        if (ports.outputPorts.size() != 1) {
            throw new AssertionError("Unifier must have a single output port, instead found : " + ports.outputPorts);
        }
        unifier.unifiedOperatorMeta = streamMeta.getSource().getOperatorMeta();
        unifier.outputs.add(new PTOperator.PTOutput(ports.outputPorts.keySet().iterator().next(), streamMeta, unifier));
        physicalPlan.newOpers.put(unifier, operator);
        return unifier;
    }

    /**
     * If the source operator sets SLIDE_BY_WINDOW_COUNT, puts one sliding unifier behind every
     * upstream output and makes the unifier outputs the new upstream. Existing sliding unifiers
     * are scheduled for undeploy first. Does nothing (apart from a warning) when the slide count
     * is not smaller than the application window count.
     */
    private void addSlidingUnifiers() {
        LogicalPlan.OperatorMeta sourceOperator = this.streamMeta.getSource().getOperatorMeta();
        if (!sourceOperator.getAttributes().contains(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT)) {
            return;
        }
        int slideBy = ((Integer) sourceOperator.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT)).intValue();
        int appWindowCount = ((Integer) sourceOperator.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)).intValue();
        if (slideBy >= appWindowCount) {
            LOG.warn("Sliding Window Count {} should be less than APPLICATION WINDOW COUNT {}", slideBy, appWindowCount);
            return;
        }
        this.plan.undeployOpers.addAll(this.slidingUnifiers);
        this.slidingUnifiers.clear();
        List<PTOperator.PTOutput> slidingOutputs = Lists.newArrayList();
        for (PTOperator.PTOutput source : this.upstream) {
            PTOperator slidingUnifier = createSlidingUnifier(this.streamMeta, this.plan, appWindowCount, slideBy);
            addInput(slidingUnifier, source, null);
            this.slidingUnifiers.add(slidingUnifier);
            slidingOutputs.add(slidingUnifier.outputs.get(0));
        }
        this.upstream.clear();
        this.upstream.addAll(slidingOutputs);
    }

    /**
     * Groups {@code sources} into chunks of {@code limit} and places one unifier in front of each
     * full chunk; a trailing chunk smaller than {@code limit} is passed through to the next level
     * unchanged. Recurses while the resulting level still has more than {@code limit} outputs.
     *
     * @param sources        outputs to be unified at this level
     * @param pooledUnifiers previously used cascading unifiers; consumed from the front before new
     *                       unifiers are created
     * @param limit          maximum number of inputs per unifier (callers ensure it is &gt; 1)
     * @param level          not used by this method; forwarded unchanged on recursion
     * @return the outputs of the last level (at most {@code limit} entries)
     */
    private List<PTOperator.PTOutput> setupCascadingUnifiers(List<PTOperator.PTOutput> sources, List<PTOperator> pooledUnifiers, int limit, int level) {
        List<PTOperator.PTOutput> nextLevel = Lists.newArrayList();
        PTOperator unifier = null;
        for (int idx = 0; idx < sources.size(); idx++) {
            if (idx % limit == 0) {
                if (sources.size() - idx < limit) {
                    // Remainder smaller than a full group: hand it to the next level as is and
                    // stop. (Previously control fell through to sources.get(idx) with idx already
                    // past the end of the list, raising IndexOutOfBoundsException.)
                    nextLevel.addAll(sources.subList(idx, sources.size()));
                    break;
                }
                unifier = !pooledUnifiers.isEmpty() ? pooledUnifiers.remove(0) : createUnifier(this.streamMeta, this.plan);
                assert unifier.outputs.size() == 1 : "unifier has single output";
                nextLevel.addAll(unifier.outputs);
                this.cascadingUnifiers.add(unifier);
            }
            addInput(unifier, sources.get(idx), null);
        }
        return nextLevel.size() > limit ? setupCascadingUnifiers(nextLevel, pooledUnifiers, limit, level) : nextLevel;
    }

    /**
     * Rebuilds all connections between the upstream outputs and the physical operators of the
     * stream's sinks, creating, reusing or removing unifiers as required.
     *
     * <p>Sinks marked PARTITION_PARALLEL, or whose operator has no mapping in the plan, are
     * skipped; if nothing remains the method returns without touching existing wiring.
     */
    private void redoMapping() {
        // collect (physical operator, logical input port) pairs that have to be connected
        Set<Pair<PTOperator, LogicalPlan.InputPortMeta>> downstream = Sets.newHashSet();
        for (LogicalPlan.InputPortMeta sink : this.streamMeta.getSinks()) {
            boolean parallel = ((Boolean) sink.getValue(Context.PortContext.PARTITION_PARALLEL)).booleanValue();
            if (!parallel && this.plan.hasMapping(sink.getOperatorWrapper())) {
                for (PTOperator partition : this.plan.getOperators(sink.getOperatorWrapper())) {
                    downstream.add(new Pair<PTOperator, LogicalPlan.InputPortMeta>(partition, sink));
                }
            }
        }
        if (downstream.isEmpty()) {
            return;
        }

        // disconnect the existing unifiers; former cascading unifiers go into a pool for reuse
        for (PTOperator cascading : this.cascadingUnifiers) {
            detachUnifier(cascading);
        }
        if (this.finalUnifier != null) {
            detachUnifier(this.finalUnifier);
        }
        List<PTOperator> pooledUnifiers = Lists.newArrayList(this.cascadingUnifiers);
        this.cascadingUnifiers.clear();
        this.plan.undeployOpers.addAll(pooledUnifiers);

        addSlidingUnifiers();

        int limit = ((Integer) this.streamMeta.getSource().getValue(Context.PortContext.UNIFIER_LIMIT)).intValue();

        // sinks with differing stream codec identifiers each need their own unifier
        boolean separateUnifiers = false;
        Integer firstCodecId = null;
        for (LogicalPlan.InputPortMeta sink : this.streamMeta.getSinks()) {
            Integer codecId = this.plan.getStreamCodecIdentifier(StreamingContainerAgent.getStreamCodec(sink));
            if (firstCodecId == null) {
                firstCodecId = codecId;
            } else if (!codecId.equals(firstCodecId)) {
                separateUnifiers = true;
                break;
            }
        }

        List<PTOperator.PTOutput> unifierSources = this.upstream;
        Map<StreamCodec<?>, List<PTOperator.PTOutput>> cascadeSourcesByCodec = Maps.newHashMap();
        if (limit > 1 && this.upstream.size() > limit) {
            if (separateUnifiers) {
                // one cascade per distinct stream codec
                for (LogicalPlan.InputPortMeta sink : this.streamMeta.getSinks()) {
                    StreamCodec<?> codec = StreamingContainerAgent.getStreamCodec(sink);
                    if (!cascadeSourcesByCodec.containsKey(codec)) {
                        unifierSources = setupCascadingUnifiers(this.upstream, pooledUnifiers, limit, 0);
                        cascadeSourcesByCodec.put(codec, unifierSources);
                    }
                }
            } else {
                unifierSources = setupCascadingUnifiers(this.upstream, pooledUnifiers, limit, 0);
            }
        }

        // pooled unifiers that were not reused are dropped from the plan
        for (PTOperator unused : pooledUnifiers) {
            this.plan.removePTOperator(unused);
        }

        Boolean sourceSingleFinal = (Boolean) this.streamMeta.getSource().getAttributes().get(Context.PortContext.UNIFIER_SINGLE_FINAL);
        for (Pair<PTOperator, LogicalPlan.InputPortMeta> entry : downstream) {
            PTOperator target = entry.first;
            LogicalPlan.InputPortMeta sink = entry.second;

            Map<LogicalPlan.InputPortMeta, Partitioner.PartitionKeys> keysByPort = target.partitionKeys;
            Partitioner.PartitionKeys keys = keysByPort != null ? keysByPort.get(sink) : null;

            // UNIFIER_SINGLE_FINAL: the sink port setting wins, then the source port, then the default
            Boolean sinkSingleFinal = (Boolean) sink.getAttributes().get(Context.PortContext.UNIFIER_SINGLE_FINAL);
            boolean singleFinal;
            if (sinkSingleFinal != null) {
                singleFinal = sinkSingleFinal.booleanValue();
            } else if (sourceSingleFinal != null) {
                singleFinal = sourceSingleFinal.booleanValue();
            } else {
                singleFinal = ((Boolean) Context.PortContext.UNIFIER_SINGLE_FINAL.defaultValue).booleanValue();
            }

            if (this.upstream.size() <= 1) {
                // single source: connect directly and drop any per-sink unifier left over
                PTOperator staleUnifier = target.upstreamMerge.remove(sink);
                if (staleUnifier != null) {
                    this.plan.removePTOperator(staleUnifier);
                }
                // NOTE(review): throws IndexOutOfBoundsException if upstream is empty - confirm callers never allow that.
                setInput(target, sink, this.upstream.get(0).source, keys);
            } else if (separateUnifiers || !(keys == null || keys.mask == 0 || singleFinal)) {
                LOG.debug("MxN unifier for {} {} {}", new Object[]{target, sink.getPortName(), keys});
                PTOperator mergeUnifier = target.upstreamMerge.get(sink);
                if (mergeUnifier == null) {
                    mergeUnifier = createUnifier(this.streamMeta, this.plan);
                    target.upstreamMerge.put(sink, mergeUnifier);
                    setInput(target, sink, mergeUnifier, null);
                }
                // rebuild the unifier's inputs from scratch
                for (PTOperator.PTInput input : mergeUnifier.inputs) {
                    input.source.sinks.remove(input);
                }
                mergeUnifier.inputs.clear();
                List<PTOperator.PTOutput> sources = unifierSources;
                if (separateUnifiers) {
                    List<PTOperator.PTOutput> perCodec = cascadeSourcesByCodec.get(StreamingContainerAgent.getStreamCodec(sink));
                    if (perCodec != null) {
                        sources = perCodec;
                    }
                }
                for (PTOperator.PTOutput source : sources) {
                    addInput(mergeUnifier, source, keys);
                }
            } else {
                // all remaining sinks share one final unifier, created lazily
                if (this.finalUnifier == null) {
                    this.finalUnifier = createUnifier(this.streamMeta, this.plan);
                }
                Partitioner.PartitionKeys effectiveKeys = (keys == null || keys.mask == 0) ? null : keys;
                setInput(target, sink, this.finalUnifier, effectiveKeys);
                if (this.finalUnifier.inputs.isEmpty()) {
                    for (PTOperator.PTOutput source : unifierSources) {
                        addInput(this.finalUnifier, source, null);
                    }
                }
            }
        }

        // a final unifier that ended up without inputs is not needed any more
        if (this.finalUnifier != null && this.finalUnifier.inputs.isEmpty()) {
            this.plan.removePTOperator(this.finalUnifier);
            this.finalUnifier = null;
        }
    }

    /**
     * Connects {@code target}'s input port to every output of {@code source} that belongs to this
     * stream, unless {@code target} already has an input for this stream and port coming from
     * {@code source}.
     */
    private void setInput(PTOperator target, LogicalPlan.InputPortMeta port, PTOperator source, Partitioner.PartitionKeys keys) {
        for (PTOperator.PTInput existing : target.inputs) {
            boolean sameSource = existing.source.source == source;
            boolean sameStream = existing.logicalStream == this.streamMeta;
            if (sameSource && sameStream && port.getPortName().equals(existing.portName)) {
                return; // already connected
            }
        }
        for (PTOperator.PTOutput output : source.outputs) {
            if (output.logicalStream == this.streamMeta) {
                target.inputs.add(new PTOperator.PTInput(port.getPortName(), this.streamMeta, target, keys, output));
            }
        }
    }

    /**
     * Adds a merge input to a unifier. The input's port name is
     * {@code "<merge#" + sourcePortName + ">"}, built from the source port of the output's
     * logical stream.
     *
     * @param pTOperator    unifier receiving the input
     * @param pTOutput      upstream output to read from
     * @param partitionKeys partition keys for the input, may be {@code null}
     */
    public static void addInput(PTOperator pTOperator, PTOperator.PTOutput pTOutput, Partitioner.PartitionKeys partitionKeys) {
        LogicalPlan.StreamMeta stream = pTOutput.logicalStream;
        String mergePortName = "<merge#" + stream.getSource().getPortName() + ">";
        pTOperator.inputs.add(new PTOperator.PTInput(mergePortName, stream, pTOperator, partitionKeys, pTOutput));
    }

    /**
     * Removes every connection of the given unifier: its outputs are unhooked from their targets
     * and its inputs from their sources.
     */
    private void detachUnifier(PTOperator unifier) {
        for (PTOperator.PTOutput output : unifier.outputs) {
            for (PTOperator.PTInput sinkInput : output.sinks) {
                sinkInput.target.inputs.remove(sinkInput);
            }
            output.sinks.clear();
        }
        for (PTOperator.PTInput input : unifier.inputs) {
            input.source.sinks.remove(input);
        }
        unifier.inputs.clear();
    }
}
