package com.datatorrent.stram.plan.physical;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.PartitioningTest;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.Min;
import org.apache.commons.lang3.mutable.MutableInt;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/plan/physical/PhysicalPlanTest.class */
public class PhysicalPlanTest {

    /**
     * Stats listener for the repartitioning tests. It compares the throughput reported for an
     * operator against a configured {@code [tpsMin, tpsMax]} band and requests repartitioning
     * when the value lies outside of that band.
     */
    public static class PartitionLoadWatch implements StatsListener, Serializable {
        private static final Logger logger = LoggerFactory.getLogger(PartitionLoadWatch.class);
        private static final long serialVersionUID = 201312231633L;

        /** Minimum time that has to pass between two repartition requests. */
        public long evalIntervalMillis = StramTestSupport.DEFAULT_TIMEOUT_MILLIS;
        private final long tpsMin;
        private final long tpsMax;
        private long lastEvalMillis;
        private long lastTps = 0L;

        /**
         * @param tpsMin lowest throughput that is still considered in range
         * @param tpsMax highest throughput that is still considered in range
         */
        private PartitionLoadWatch(long tpsMin, long tpsMax) {
            this.tpsMin = tpsMin;
            this.tpsMax = tpsMax;
        }

        /**
         * Classifies a throughput reading and remembers it as the most recent one.
         *
         * @param i operator id (not used by this implementation)
         * @param j observed tuples per second
         * @return indicator {@code 0} when in range, {@code -1} when below the minimum and
         *         {@code 1} otherwise; the out-of-range indicators carry an explanatory note
         */
        protected PhysicalPlan.LoadIndicator getLoadIndicator(int i, long j) {
            final long previousTps = this.lastTps;
            this.lastTps = j;

            // A reading below the minimum is tolerated as long as the previously recorded
            // throughput is zero, which is also the initial state of the listener.
            final boolean floorSatisfied = j >= this.tpsMin || previousTps == 0;
            if (floorSatisfied && j <= this.tpsMax) {
                return new PhysicalPlan.LoadIndicator(0, (String) null);
            }
            if (j < this.tpsMin) {
                String note = String.format("Tuples per second %d is less than the minimum %d", j, this.tpsMin);
                return new PhysicalPlan.LoadIndicator(-1, note);
            }
            String note = String.format("Tuples per second %d is greater than the maximum %d", j, this.tpsMax);
            return new PhysicalPlan.LoadIndicator(1, note);
        }

        /**
         * Translates a stats batch into a response. The processed-tuples moving average is used
         * as throughput, falling back to the emitted-tuples average when the former is zero.
         *
         * @param batchedOperatorStats stats reported for one operator
         * @return response carrying the load indicator and, when the load is out of range and
         *         the evaluation interval has elapsed, a repartition request with its note
         */
        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            long tps = batchedOperatorStats.getTuplesProcessedPSMA();
            if (tps == 0) {
                tps = batchedOperatorStats.getTuplesEmittedPSMA();
            }

            StatsListener.Response response = new StatsListener.Response();
            PhysicalPlan.LoadIndicator load = getLoadIndicator(batchedOperatorStats.getOperatorId(), tps);
            response.loadIndicator = load.indicator;

            if (response.loadIndicator == 0) {
                return response;
            }
            if (this.lastEvalMillis >= System.currentTimeMillis() - this.evalIntervalMillis) {
                // Still inside the evaluation interval: report the indicator only.
                return response;
            }

            this.lastEvalMillis = System.currentTimeMillis();
            logger.debug("Requesting repartitioning {} {}", response.loadIndicator, tps);
            response.repartitionRequired = true;
            response.repartitionNote = load.note;
            return response;
        }
    }

    /**
     * Test operator that acts as its own {@link Partitioner}. It defines one partition per entry
     * of {@link #partitionKeys}; every partition is keyed with that single value (mask {@code 2})
     * on both {@code inport1} and {@link #inportWithCodec}.
     */
    public static class PartitioningTestOperator extends GenericTestOperator implements Partitioner<PartitioningTestOperator> {
        static final String INPORT_WITH_CODEC = "inportWithCodec";

        /** String form of the partition keys assigned to this instance by {@link #definePartitions}. */
        public String pks;
        /** Partition map handed to {@link #partitioned(Map)}. */
        public transient Map<Integer, Partitioner.Partition<PartitioningTestOperator>> partitions;
        public Integer[] partitionKeys = {0, 1, 2};
        /** When {@code false}, {@link #partitionKeys} is regenerated from the required partition count. */
        public boolean fixedCapacity = true;

        @Min(1)
        private int partitionCount = 1;

        /** Optional input port that supplies its own stream codec and discards every tuple. */
        @InputPortFieldAnnotation(optional = true)
        public final transient Operator.InputPort<Object> inportWithCodec = new DefaultInputPort<Object>() {
            public StreamCodec<Object> getStreamCodec() {
                return new PartitioningTestStreamCodec();
            }

            public final void process(Object obj) {
                // tuples are intentionally ignored
            }
        };

        public void setPartitionCount(int i) {
            this.partitionCount = i;
        }

        public int getPartitionCount() {
            return this.partitionCount;
        }

        /**
         * Creates a fresh operator instance per partition key and registers the key on both
         * partitioned input ports.
         *
         * @param collection current partitions (not consulted)
         * @param partitioningContext context used to resolve the required partition count
         * @return the newly defined partitions, one per entry of {@link #partitionKeys}
         */
        public Collection<Partitioner.Partition<PartitioningTestOperator>> definePartitions(Collection<Partitioner.Partition<PartitioningTestOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
            final int requiredCount = DefaultPartition.getRequiredPartitionCount(partitioningContext, this.partitionCount);
            if (!this.fixedCapacity) {
                // Variable capacity: use the keys 0..requiredCount-1.
                Integer[] generatedKeys = new Integer[requiredCount];
                for (int k = 0; k < generatedKeys.length; k++) {
                    generatedKeys[k] = k;
                }
                this.partitionKeys = generatedKeys;
            }

            List<Partitioner.Partition<PartitioningTestOperator>> defined =
                new ArrayList<Partitioner.Partition<PartitioningTestOperator>>(this.partitionKeys.length);
            for (Integer key : this.partitionKeys) {
                PartitioningTestOperator instance = new PartitioningTestOperator();
                instance.setPartitionCount(requiredCount);

                DefaultPartition<PartitioningTestOperator> partition = new DefaultPartition<PartitioningTestOperator>(instance);
                Partitioner.PartitionKeys portKeys = new Partitioner.PartitionKeys(2, Sets.newHashSet(new Integer[]{key}));
                partition.getPartitionKeys().put(this.inport1, portKeys);
                partition.getPartitionKeys().put(this.inportWithCodec, portKeys);
                partition.getPartitionedInstance().pks = partition.getPartitionKeys().values().toString();
                defined.add(partition);
            }
            return defined;
        }

        /** Stores the partition map so that tests can inspect it. */
        public void partitioned(Map<Integer, Partitioner.Partition<PartitioningTestOperator>> map) {
            this.partitions = map;
        }
    }

    /**
     * Stream codec returned by {@code PartitioningTestOperator.inportWithCodec}; it reports
     * partition {@code 0} for every tuple.
     */
    private static class PartitioningTestStreamCodec extends DefaultStatefulStreamCodec<Object> implements Serializable {
        private static final long serialVersionUID = 201410301656L;

        /**
         * @param obj tuple to route (its content is not inspected)
         * @return always {@code 0}
         */
        public int getPartition(Object obj) {
            final int onlyPartition = 0;
            return onlyPartition;
        }
    }

    /**
     * Partitioner that only ever adds partitions: it keeps all existing partitions and appends
     * new ones that share the partitioned instance of the first existing partition.
     */
    private class TestAugmentingPartitioner<T> implements Partitioner<T> {
        /** Partition count to reach when the first partition has no stats yet. */
        int initalPartitionCount;

        private TestAugmentingPartitioner(int initialCount) {
            this.initalPartitionCount = initialCount;
        }

        /**
         * @param collection existing partitions; the first element is used as template
         * @param partitioningContext not consulted
         * @return the existing partitions followed by any newly added ones
         */
        public Collection<Partitioner.Partition<T>> definePartitions(Collection<Partitioner.Partition<T>> collection, Partitioner.PartitioningContext partitioningContext) {
            List<Partitioner.Partition<T>> augmented = Lists.newArrayList(collection);
            final int existingCount = collection.size();
            Partitioner.Partition<T> first = collection.iterator().next();

            int targetCount;
            if (first.getStats() == null) {
                // No stats on the first partition: go for the configured count.
                targetCount = this.initalPartitionCount;
            } else {
                // Otherwise grow by the sum of the loads reported by all partitions.
                targetCount = existingCount;
                for (Partitioner.Partition<T> partition : collection) {
                    targetCount += partition.getLoad();
                }
            }

            T template = first.getPartitionedInstance();
            for (int count = collection.size(); count < targetCount; count++) {
                augmented.add(new DefaultPartition<T>(template));
            }
            return augmented;
        }

        /** No bookkeeping is needed once partitioning has been applied. */
        public void partitioned(Map<Integer, Partitioner.Partition<T>> map) {
        }
    }

    /**
     * Stateless partitioner that pads the result of the superclass with additional partitions
     * until the parallel partition count of the context is reached.
     */
    private class TestPartitioner<T extends Operator> extends StatelessPartitioner<T> {
        private static final long serialVersionUID = 1L;

        /**
         * @param collection existing partitions; the first one provides the instance for padding
         * @param partitioningContext supplies the parallel partition count
         * @return the partitions defined by the superclass, padded up to the parallel count
         */
        public Collection<Partitioner.Partition<T>> definePartitions(Collection<Partitioner.Partition<T>> collection, Partitioner.PartitioningContext partitioningContext) {
            Collection<Partitioner.Partition<T>> defined = super.definePartitions(collection, partitioningContext);

            // Nothing to pad when no parallel count is set or it is already satisfied.
            if (partitioningContext.getParallelPartitionCount() <= 0
                    || defined.size() >= partitioningContext.getParallelPartitionCount()) {
                return defined;
            }

            int count = defined.size();
            while (count < partitioningContext.getParallelPartitionCount()) {
                T template = collection.iterator().next().getPartitionedInstance();
                defined.add(new DefaultPartition<T>(template));
                count++;
            }
            return defined;
        }
    }

    /**
     * Checks static partitioning of {@link PartitioningTestOperator}: the plan must contain one
     * physical instance per partition key, each instance must receive exactly its own key on both
     * input ports, and a single unifier must merge the output of all partitions.
     */
    @Test
    public void testStaticPartitioning() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        TestGeneratorInputOperator node0 = dag.addOperator("node0", TestGeneratorInputOperator.class);
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        PartitioningTestOperator partitioned = dag.addOperator("partitioned", PartitioningTestOperator.class);
        partitioned.setPartitionCount(partitioned.partitionKeys.length);
        GenericTestOperator singleton1 = dag.addOperator("singleton1", GenericTestOperator.class);
        GenericTestOperator singleton2 = dag.addOperator("singleton2", GenericTestOperator.class);

        dag.addStream("n0.inport1", node0.outport, node1.inport1);
        dag.addStream("n1.outport1", node1.outport1, partitioned.inport1, partitioned.inportWithCodec);
        dag.addStream("mergeStream", partitioned.outport1, singleton1.inport1, singleton2.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);

        LogicalPlan.OperatorMeta partitionedMeta = dag.getMeta(partitioned);
        dag.validate();
        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());

        Assert.assertEquals("number of containers", 2L, plan.getContainers().size());
        Assert.assertNotNull("partition map", partitioned.partitions);
        Assert.assertEquals("partition map " + partitioned.partitions, 3L, partitioned.partitions.size());

        List<PTOperator> instances = plan.getOperators(partitionedMeta);
        Assert.assertEquals("partition instances " + instances, partitioned.partitionKeys.length, instances.size());

        // The n-th physical instance is expected to own the n-th partition key on every input.
        int partitionIndex = 0;
        for (PTOperator instance : instances) {
            Map<String, PTOperator.PTInput> inputsByPort = new HashMap<String, PTOperator.PTInput>();
            for (PTOperator.PTInput input : instance.getInputs()) {
                inputsByPort.put(input.portName, input);
                Set<Integer> expectedKeys = Sets.newHashSet(new Integer[]{partitioned.partitionKeys[partitionIndex]});
                Assert.assertEquals("partitions " + input, expectedKeys, input.partitions.partitions);
            }
            Set<String> expectedPorts = Sets.newHashSet(new String[]{GenericTestOperator.IPORT1, "inportWithCodec"});
            Assert.assertEquals("number inputs " + inputsByPort, expectedPorts, inputsByPort.keySet());
            partitionIndex++;
        }

        List<PTOperator> unifiers = plan.getMergeOperators(partitionedMeta);
        Assert.assertEquals("number unifiers " + partitionedMeta, 1L, unifiers.size());
        PTOperator unifier = unifiers.iterator().next();
        Assert.assertNotNull("unifier container " + unifier, unifier.getContainer());
        Assert.assertEquals("unifier inputs " + unifier, partitioned.partitionKeys.length, unifier.inputs.size());
    }

    /**
     * Checks the partition keys produced by a {@link StatelessPartitioner} with five partitions:
     * every instance is keyed on {@code inport2} only, uses the mask {@code 111} and owns one or
     * two keys, and together the instances cover each key from 0 to 7 exactly once.
     */
    @Test
    public void testDefaultPartitioning() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.addStream("node1.outport1", node1.outport1, node2.inport2, node2.inport1);

        LogicalPlan.OperatorMeta node2Meta = dag.getMeta(node2);
        node2Meta.getAttributes().put(Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5));

        List<PTOperator> instances = new PhysicalPlan(dag, new TestPlanContext()).getOperators(node2Meta);
        Assert.assertEquals("partition instances " + instances, 5, instances.size());

        List<Integer> assignedKeys = Lists.newArrayList();
        for (PTOperator instance : instances) {
            Assert.assertNotNull("partition keys null: " + instance, instance.getPartitionKeys());
            Map<?, ?> keysByPort = instance.getPartitionKeys();
            Assert.assertEquals("partition keys size: " + keysByPort, 1L, keysByPort.size());

            Operator.InputPort<Object> expectedPort = node2.inport2;
            Assert.assertEquals("partition port: " + keysByPort, expectedPort, keysByPort.keySet().iterator().next());

            Partitioner.PartitionKeys portKeys = (Partitioner.PartitionKeys) keysByPort.get(expectedPort);
            Assert.assertEquals("partition mask: " + keysByPort, "111", Integer.toBinaryString(portKeys.mask));

            Set<Integer> ownedKeys = portKeys.partitions;
            boolean oneOrTwoKeys = ownedKeys.size() == 1 || ownedKeys.size() == 2;
            Assert.assertTrue("number partition keys: " + keysByPort, oneOrTwoKeys);
            assignedKeys.addAll(ownedKeys);
        }

        // With mask 111 there are eight possible keys; each must be assigned exactly once.
        final int highestKey = Integer.parseInt("111", 2);
        Assert.assertEquals("assigned partitions ", highestKey + 1, assignedKeys.size());
        for (int key = 0; key <= highestKey; key++) {
            Assert.assertTrue("" + assignedKeys, assignedKeys.contains(key));
        }
    }

    /**
     * With five partitions of {@code node1} and a unifier limit of three on its output port the
     * physical plan is expected to hold eight operators in total, two of them unifiers.
     */
    @Test
    public void testNumberOfUnifiers() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.addStream("node1.outport1", node1.outport1, node2.inport1);
        dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5));
        dag.setOutputPortAttribute(node1.outport1, Context.PortContext.UNIFIER_LIMIT, 3);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());

        // Walk every container and count all deployed operators and the unifiers among them.
        int unifierCount = 0;
        int operatorCount = 0;
        for (PTContainer container : plan.getContainers()) {
            for (PTOperator operator : container.getOperators()) {
                operatorCount++;
                if (operator.isUnifier()) {
                    unifierCount++;
                }
            }
        }

        Assert.assertEquals("Number of operators", 8L, operatorCount);
        Assert.assertEquals("Number of unifiers", 2L, unifierCount);
    }

    /**
     * With eight partitions of {@code node1} and a unifier limit of four on its output port the
     * physical plan is expected to hold twelve operators in total, three of them unifiers.
     */
    @Test
    public void testNumberOfUnifiersWithEvenPartitions() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.addStream("node1.outport1", node1.outport1, node2.inport1);
        dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8));
        dag.setOutputPortAttribute(node1.outport1, Context.PortContext.UNIFIER_LIMIT, 4);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());

        // Walk every container and count all deployed operators and the unifiers among them.
        int unifierCount = 0;
        int operatorCount = 0;
        for (PTContainer container : plan.getContainers()) {
            for (PTOperator operator : container.getOperators()) {
                operatorCount++;
                if (operator.isUnifier()) {
                    unifierCount++;
                }
            }
        }

        Assert.assertEquals("Number of operators", 12L, operatorCount);
        Assert.assertEquals("Number of unifiers", 3L, unifierCount);
    }

    /**
     * Drives load-based scale-up of {@code o2}: a throughput above the maximum of the attached
     * {@link PartitionLoadWatch} grows the operator from one to two and then to three partitions,
     * the expected undeploy/deploy sets are verified, and a further request with zero available
     * resources leaves the partition count at three.
     */
    @Test
    public void testRepartitioningScaleUp() {
        LogicalPlan dag = new LogicalPlan();
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator mergeNode = dag.addOperator("mergeNode", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2.inport1, o2.inport2);
        dag.addStream("mergeStream", o2.outport1, mergeNode.inport1);
        dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);

        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);
        o2Meta.getAttributes().put(Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(new StatsListener[]{new PartitionLoadWatch(0L, 5L)}));
        o2Meta.getAttributes().put(Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(1));

        TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);

        Assert.assertEquals("number of containers", 2L, plan.getContainers().size());
        Assert.assertEquals("number of operators", 3L, plan.getAllOperators().size());
        Assert.assertEquals("number of save requests", 3L, ctx.backupRequests);

        List<PTOperator> initialInstances = plan.getOperators(o2Meta);
        Assert.assertEquals("number operators " + o2Meta, 1L, initialInstances.size());
        PTOperator initialPartition = initialInstances.get(0);
        Assert.assertEquals("stats handlers " + initialPartition, 1L, initialPartition.statsListeners.size());
        StatsListener listener = (StatsListener) initialPartition.statsListeners.get(0);
        Assert.assertTrue("stats handlers " + initialPartition.statsListeners, listener instanceof PartitionLoadWatch);
        // A negative interval lets every out-of-range reading trigger a request immediately.
        ((PartitionLoadWatch) listener).evalIntervalMillis = -1L;

        // First scale-up: 1 -> 2 partitions.
        setThroughput(initialPartition, 10L);
        plan.onStatusUpdate(initialPartition);
        Assert.assertEquals("load exceeds max", 1L, ctx.events.size());
        ctx.backupRequests = 0;
        ctx.events.remove(0).run();

        List<PTOperator> twoInstances = plan.getOperators(o2Meta);
        Assert.assertEquals("partition instances " + twoInstances, 2L, twoInstances.size());
        PTOperator loadedPartition = twoInstances.get(0);
        PTOperator untouchedPartition = twoInstances.get(1);

        // Captured before the second scale-up, because the plan changes afterwards.
        Set<PTOperator> expectedUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
        expectedUndeploy.add(loadedPartition);
        expectedUndeploy.addAll(plan.getMergeOperators(o2Meta));

        setThroughput(loadedPartition, 0L);
        plan.onStatusUpdate(loadedPartition);
        Assert.assertEquals("load min", 0L, ctx.events.size());

        setThroughput(loadedPartition, 3L);
        plan.onStatusUpdate(loadedPartition);
        Assert.assertEquals("load within range", 0L, ctx.events.size());

        // Second scale-up: 2 -> 3 partitions.
        setThroughput(loadedPartition, 10L);
        plan.onStatusUpdate(loadedPartition);
        Assert.assertEquals("load exceeds max", 1L, ctx.events.size());
        ctx.backupRequests = 0;
        ctx.events.remove(0).run();

        Assert.assertEquals("new partitions", 3L, plan.getOperators(o2Meta).size());
        Assert.assertTrue("", plan.getOperators(o2Meta).contains(untouchedPartition));
        for (PTOperator partition : plan.getOperators(o2Meta)) {
            Assert.assertNotNull("container null " + partition, partition.getContainer());
            Assert.assertEquals("outputs " + partition, 1L, partition.getOutputs().size());
            PTOperator.PTOutput firstOutput = (PTOperator.PTOutput) partition.getOutputs().get(0);
            Assert.assertEquals("downstream operators " + firstOutput.sinks, 1L, firstOutput.sinks.size());
        }
        Assert.assertEquals("" + ctx.undeploy, expectedUndeploy, ctx.undeploy);

        Set<PTOperator> expectedDeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
        expectedDeploy.addAll(plan.getOperators(o2Meta));
        expectedDeploy.remove(untouchedPartition);
        expectedDeploy.addAll(plan.getMergeOperators(o2Meta));
        Assert.assertEquals("" + ctx.deploy, expectedDeploy, ctx.deploy);
        Assert.assertEquals("Count of storage requests", 2L, ctx.backupRequests);

        // With no resources available the event still fires but must not add partitions.
        PTOperator firstPartition = plan.getOperators(o2Meta).get(0);
        plan.setAvailableResources(0);
        setThroughput(firstPartition, 10L);
        plan.onStatusUpdate(firstPartition);
        Assert.assertEquals("not repartitioned", 1L, ctx.events.size());
        ctx.events.remove(0).run();
        Assert.assertEquals("partition count unchanged", 3L, plan.getOperators(o2Meta).size());
    }

    /**
     * Repartitions an input operator that starts with two partitions: a load indicator of
     * {@code 1} on one partition scales it up to three, every resulting partition must start from
     * the initial checkpoint without recorded checkpoints, and a load indicator of {@code -1} on
     * all partitions scales it back down to two.
     */
    @Test
    public void testInputOperatorPartitioning() {
        LogicalPlan dag = new LogicalPlan();
        PartitioningTest.TestInputOperator input = dag.addOperator("o1", new PartitioningTest.TestInputOperator());
        LogicalPlan.OperatorMeta inputMeta = dag.getMeta(input);
        dag.setAttribute(input, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        dag.setAttribute(input, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));

        TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);
        Assert.assertEquals("number of containers", 2L, plan.getContainers().size());

        List<PTOperator> partitions = plan.getOperators(inputMeta);
        Assert.assertEquals("partition instances " + partitions, 2L, partitions.size());
        PTOperator firstPartition = partitions.get(0);
        Assert.assertEquals("stats handlers " + firstPartition, 1L, firstPartition.statsListeners.size());
        StatsListener listener = (StatsListener) firstPartition.statsListeners.get(0);
        Assert.assertTrue("stats handlers " + firstPartition.statsListeners, listener instanceof PartitioningTest.PartitionLoadWatch);

        // Scale up: 2 -> 3 partitions.
        PartitioningTest.PartitionLoadWatch.put(firstPartition, 1);
        plan.onStatusUpdate(firstPartition);
        Assert.assertEquals("scale up triggered", 1L, ctx.events.size());
        ctx.events.remove(0).run();
        Assert.assertEquals("operators after scale up", 3L, plan.getOperators(inputMeta).size());

        // Verify the state of every partition and mark all of them as under-loaded.
        for (PTOperator partition : plan.getOperators(inputMeta)) {
            Assert.assertEquals("activation window id " + partition, Checkpoint.INITIAL_CHECKPOINT, partition.recoveryCheckpoint);
            Assert.assertEquals("checkpoints " + partition + " " + partition.checkpoints, Lists.newArrayList(), partition.checkpoints);
            PartitioningTest.PartitionLoadWatch.put(partition, -1);
            plan.onStatusUpdate(partition);
        }

        // Scale down: 3 -> 2 partitions.
        ctx.events.remove(0).run();
        Assert.assertEquals("operators after scale down", 2L, plan.getOperators(inputMeta).size());
    }

    /**
     * Drives load-based scale-down of {@code o2}, which starts with eight partitions and feeds
     * the parallel-partitioned operator {@code o3parallel}.
     *
     * <p>A single under-loaded partition must leave the partitioning unchanged; once all eight
     * partitions report a throughput below the minimum they are merged into four. The test then
     * verifies the unifier inputs, the outputs of the parallel operator, the partition masks and
     * inputs of the merged partitions, the undeploy/deploy sets and the number of storage
     * requests.
     */
    @Test
    public void testRepartitioningScaleDown() {
        LogicalPlan dag = new LogicalPlan();
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3parallel = dag.addOperator("o3parallel", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o3Meta = dag.getMeta(o3parallel);
        GenericTestOperator mergeNode = dag.addOperator("mergeNode", GenericTestOperator.class);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1, o2.inport2);
        dag.addStream("o2.outport1", o2.outport1, o3parallel.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        dag.setInputPortAttribute(o3parallel.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.addStream("o3parallel_outport1", o3parallel.outport1, mergeNode.inport1);
        dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);

        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);
        o2Meta.getAttributes().put(Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(new StatsListener[]{new PartitionLoadWatch(3L, 5L)}));
        o2Meta.getAttributes().put(Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8));

        TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);

        Assert.assertEquals("number of containers", 2L, plan.getContainers().size());
        Assert.assertEquals("Count of storage requests", plan.getAllOperators().size(), ctx.backupRequests);

        List<PTOperator> o2Instances = plan.getOperators(o2Meta);
        Assert.assertEquals("partition instances " + o2Instances, 8L, o2Instances.size());
        PTOperator firstPartition = o2Instances.get(0);

        // o2 feeds o3parallel partition by partition, so only o3parallel gets a unifier.
        Assert.assertEquals("unifiers " + o2Meta, 0L, plan.getMergeOperators(o2Meta).size());
        List<PTOperator> o3Unifiers = plan.getMergeOperators(o3Meta);
        Assert.assertEquals("unifiers " + o3Meta, 1L, o3Unifiers.size());
        PTOperator o3Unifier = o3Unifiers.iterator().next();
        Assert.assertEquals("unifier inputs " + o3Unifier, 8L, o3Unifier.getInputs().size());

        // Captured before repartitioning, because the plan changes afterwards.
        Set<PTOperator> expectedUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
        expectedUndeploy.addAll(o2Instances);
        expectedUndeploy.addAll(plan.getOperators(o3Meta));
        expectedUndeploy.addAll(plan.getMergeOperators(o3Meta));

        Assert.assertEquals("stats handlers " + firstPartition, 1L, firstPartition.statsListeners.size());
        StatsListener listener = (StatsListener) firstPartition.statsListeners.get(0);
        Assert.assertTrue("stats handlers " + firstPartition.statsListeners, listener instanceof PartitionLoadWatch);
        // A negative interval lets every out-of-range reading trigger a request immediately.
        ((PartitionLoadWatch) listener).evalIntervalMillis = -1L;

        setThroughput(firstPartition, 5L);
        plan.onStatusUpdate(firstPartition);
        Assert.assertEquals("load upper bound", 0L, ctx.events.size());

        setThroughput(firstPartition, 3L);
        plan.onStatusUpdate(firstPartition);
        Assert.assertEquals("load lower bound", 0L, ctx.events.size());

        // Only one partition is below the minimum: an event fires but nothing is merged.
        setThroughput(firstPartition, 2L);
        plan.onStatusUpdate(firstPartition);
        Assert.assertEquals("load below min", 1L, ctx.events.size());
        ctx.backupRequests = 0;
        ctx.events.remove(0).run();
        Assert.assertEquals("partitions unchanged", Sets.newHashSet(o2Instances), Sets.newHashSet(plan.getOperators(o2Meta)));

        // All partitions below the minimum: 8 -> 4 partitions.
        for (PTOperator partition : o2Instances) {
            setThroughput(partition, 2L);
            plan.onStatusUpdate(partition);
        }
        Assert.assertEquals("load below min", 1L, ctx.events.size());
        ctx.events.remove(0).run();
        Assert.assertEquals("partitions merged", 4L, plan.getOperators(o2Meta).size());
        Assert.assertEquals("unifier inputs after scale down " + o3Unifier, 4L, o3Unifier.getInputs().size());

        // Every instance of the parallel operator must expose exactly one output.
        for (PTOperator o3Instance : plan.getOperators(o3Meta)) {
            Assert.assertEquals("outputs " + o3Instance.getOutputs(), 1L, o3Instance.getOutputs().size());
        }

        for (PTOperator mergedPartition : plan.getOperators(o2Meta)) {
            Partitioner.PartitionKeys mergedKeys = (Partitioner.PartitionKeys) mergedPartition.getPartitionKeys().values().iterator().next();
            Assert.assertEquals("partition mask " + mergedPartition, 3L, mergedKeys.mask);
            Assert.assertEquals("inputs " + mergedPartition, 2L, mergedPartition.getInputs().size());

            boolean inport1Connected = false;
            for (PTOperator.PTInput input : mergedPartition.getInputs()) {
                if (GenericTestOperator.IPORT1.equals(input.portName)) {
                    inport1Connected = true;
                    Assert.assertEquals("partition mask " + input, mergedKeys, input.partitions);
                }
            }
            Assert.assertTrue("connected inport1", inport1Connected);
        }

        Assert.assertEquals("" + ctx.undeploy, expectedUndeploy, ctx.undeploy);

        Set<PTOperator> expectedDeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
        expectedDeploy.addAll(plan.getOperators(o2Meta));
        expectedDeploy.addAll(plan.getOperators(o3Meta));
        expectedDeploy.addAll(plan.getMergeOperators(o3Meta));
        Assert.assertEquals("" + ctx.deploy, expectedDeploy, ctx.deploy);
        for (PTOperator deployed : ctx.deploy) {
            Assert.assertNotNull("container " + deployed, deployed.getContainer());
        }
        Assert.assertEquals("Count of storage requests", 8L, ctx.backupRequests);
    }

    /**
     * Repartitions input operator "o1" from two partitions down to one and then triggers a
     * further repartitioning, checking how the unifier between "o1" and "o2" disappears and
     * reappears and that a re-created unifier carries the activation checkpoint.
     */
    @Test
    public void testRepartitioningScaleDownSinglePartition() {
        LogicalPlan logicalPlan = new LogicalPlan();
        PartitioningTest.TestInputOperator addOperator = logicalPlan.addOperator("o1", PartitioningTest.TestInputOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("o2", GenericTestOperator.class);
        logicalPlan.addStream("o1.outport1", addOperator.output, addOperator2.inport1);
        LogicalPlan.OperatorMeta meta = logicalPlan.getMeta(addOperator);
        // o1 starts with StatelessPartitioner(2) and a PartitionLoadWatch stats listener.
        logicalPlan.setAttribute(addOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        logicalPlan.setAttribute(addOperator, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        // The same TestPlanContext instance is used as storage agent and as plan context.
        TestPlanContext testPlanContext = new TestPlanContext();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, testPlanContext);
        PhysicalPlan physicalPlan = new PhysicalPlan(logicalPlan, testPlanContext);
        List operators = physicalPlan.getOperators(meta);
        Assert.assertEquals("partitions " + operators, 2L, operators.size());
        PTOperator pTOperator = (PTOperator) operators.get(0);
        // Follow the first output of partition 0 to its first sink; with two partitions this is
        // expected to be the unifier of o1's output port.
        PTOperator pTOperator2 = ((PTOperator.PTInput) ((PTOperator.PTOutput) pTOperator.getOutputs().get(0)).sinks.get(0)).target;
        Assert.assertSame("", pTOperator2.getOperatorMeta(), meta.getMeta(addOperator.output).getUnifierMeta());
        Assert.assertTrue("unifier ", pTOperator2.isUnifier());
        List mergeOperators = physicalPlan.getMergeOperators(meta);
        Assert.assertEquals("unifiers " + meta, 1L, mergeOperators.size());
        Assert.assertTrue("stats handlers " + pTOperator.statsListeners, ((StatsListener) pTOperator.statsListeners.get(0)) instanceof PartitioningTest.PartitionLoadWatch);
        // Register -1 for both partitions (presumably "under-loaded" - confirm against
        // PartitionLoadWatch) and deliver a status update for each of them.
        PartitioningTest.PartitionLoadWatch.put(pTOperator, -1);
        PartitioningTest.PartitionLoadWatch.put((PTOperator) operators.get(1), -1);
        physicalPlan.onStatusUpdate(pTOperator);
        physicalPlan.onStatusUpdate((PTOperator) operators.get(1));
        // Exactly one event is expected to be queued; running it applies the repartitioning.
        Assert.assertEquals("partition scaling triggered", 1L, testPlanContext.events.size());
        testPlanContext.events.remove(0).run();
        List operators2 = physicalPlan.getOperators(meta);
        Assert.assertEquals("partitions " + operators2, 1L, operators2.size());
        Assert.assertEquals("unifiers " + meta, 0L, physicalPlan.getMergeOperators(meta).size());
        // With a single partition left, the output of the original partition-0 object is
        // expected to feed o2 directly instead of a unifier.
        PTOperator pTOperator3 = ((PTOperator.PTInput) ((PTOperator.PTOutput) pTOperator.getOutputs().get(0)).sinks.get(0)).target;
        Assert.assertTrue("", pTOperator3.getOperatorMeta() == logicalPlan.getMeta(addOperator2));
        Assert.assertFalse("unifier ", pTOperator3.isUnifier());
        // The previously existing unifier(s) must show up in the undeploy set and not in deploy.
        Assert.assertTrue("removed unifier from deployment " + testPlanContext.undeploy, testPlanContext.undeploy.containsAll(mergeOperators));
        Assert.assertFalse("removed unifier from deployment " + testPlanContext.deploy, testPlanContext.deploy.containsAll(mergeOperators));
        // Give the remaining partition an activation checkpoint, then register 1 (presumably
        // "over-loaded" - confirm against PartitionLoadWatch) to trigger another repartitioning.
        setActivationCheckpoint((PTOperator) operators2.get(0), 3L);
        PartitioningTest.PartitionLoadWatch.put((PTOperator) operators2.get(0), 1);
        physicalPlan.onStatusUpdate((PTOperator) operators2.get(0));
        Assert.assertEquals("partition scaling triggered", 1L, testPlanContext.events.size());
        testPlanContext.events.remove(0).run();
        // A unifier is expected again, with its recovery checkpoint at window id 3.
        List mergeOperators2 = physicalPlan.getMergeOperators(meta);
        Assert.assertEquals("unifiers " + meta, 1L, mergeOperators2.size());
        Assert.assertEquals("unifier activation checkpoint " + meta, 3L, ((PTOperator) mergeOperators2.get(0)).recoveryCheckpoint.windowId);
    }

    /**
     * Saves the operator through the storage agent configured on its operator meta for the
     * given window and sets the operator's recovery checkpoint to that same window.
     * Any exception raised while doing so fails the running test.
     *
     * @param pTOperator physical operator to checkpoint
     * @param j          window id used both for the stored state and the recovery checkpoint
     */
    private void setActivationCheckpoint(PTOperator pTOperator, long j) {
        try {
            StorageAgent storageAgent = (StorageAgent) pTOperator.operatorMeta.getValue(Context.OperatorContext.STORAGE_AGENT);
            storageAgent.save(pTOperator.operatorMeta.getOperator(), pTOperator.id, j);
            // Use the window id that was just saved; a fixed literal here would leave the
            // recovery checkpoint out of sync with the stored state for any j other than 3.
            pTOperator.setRecoveryCheckpoint(new Checkpoint(j, 0, 0));
        } catch (Exception e) {
            Assert.fail(e.toString());
        }
    }

    /**
     * Drives {@code StatelessPartitioner.repartition} directly with hand-built partitions on
     * {@code GenericTestOperator.inport1} and checks the number of resulting partitions and
     * their partition keys. The third {@code DefaultPartition} constructor argument used
     * throughout (-1, 0 or 1) is presumably the load indicator - confirm against
     * DefaultPartition.
     */
    @Test
    public void testDefaultRepartitioning() {
        // Keys expected after both initial partitions have been split: mask 11, one value each.
        List<Partitioner.PartitionKeys> asList = Arrays.asList(newPartitionKeys("11", "00"), newPartitionKeys("11", "10"), newPartitionKeys("11", "01"), newPartitionKeys("11", "11"));
        GenericTestOperator genericTestOperator = new GenericTestOperator();
        // Initial state: two partitions with mask 1 and values 0 / 1, each created with load 1.
        HashSet<Partitioner.PartitionKeys> newHashSet = Sets.newHashSet(new Partitioner.PartitionKeys[]{newPartitionKeys("1", "0"), newPartitionKeys("1", "1")});
        ArrayList arrayList = new ArrayList();
        for (Partitioner.PartitionKeys partitionKeys : newHashSet) {
            HashMap hashMap = new HashMap();
            hashMap.put(genericTestOperator.inport1, partitionKeys);
            arrayList.add(new DefaultPartition(genericTestOperator, hashMap, 1, (StatsListener.BatchedOperatorStats) null));
        }
        // Same two partitions, but with -1 as third argument: expected to collapse into one
        // partition whose key mask is 0.
        ArrayList arrayList2 = new ArrayList();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Partitioner.Partition partition = (Partitioner.Partition) it.next();
            arrayList2.add(new DefaultPartition(partition.getPartitionedInstance(), partition.getPartitionKeys(), -1, (StatsListener.BatchedOperatorStats) null));
        }
        ArrayList<Partitioner.Partition> newArrayList = Lists.newArrayList();
        newArrayList.addAll(StatelessPartitioner.repartition(arrayList2));
        Assert.assertEquals("" + newArrayList, 1L, newArrayList.size());
        Assert.assertEquals("" + ((Partitioner.Partition) newArrayList.get(0)).getPartitionKeys(), 0L, ((Partitioner.PartitionKeys) ((Partitioner.Partition) newArrayList.get(0)).getPartitionKeys().values().iterator().next()).mask);
        // Repartitioning the single remaining partition with -1 must still yield one partition.
        Collection repartition = StatelessPartitioner.repartition(Collections.singletonList(new DefaultPartition(genericTestOperator, ((Partitioner.Partition) newArrayList.get(0)).getPartitionKeys(), -1, (StatsListener.BatchedOperatorStats) null)));
        newArrayList.clear();
        newArrayList.addAll(repartition);
        Assert.assertEquals("" + newArrayList, 1L, newArrayList.size());
        // The single partition with 1 as third argument is expected to be split into two.
        Collection repartition2 = StatelessPartitioner.repartition(Collections.singletonList(new DefaultPartition(genericTestOperator, ((Partitioner.Partition) newArrayList.get(0)).getPartitionKeys(), 1, (StatsListener.BatchedOperatorStats) null)));
        newArrayList.clear();
        newArrayList.addAll(repartition2);
        Assert.assertEquals("" + newArrayList, 2L, newArrayList.size());
        // Both initial partitions (created with 1) are expected to split into four partitions
        // whose keys are exactly the entries of asList, each keyed on inport1 only.
        Collection repartition3 = StatelessPartitioner.repartition(arrayList);
        newArrayList.clear();
        newArrayList.addAll(repartition3);
        Assert.assertEquals("" + newArrayList, 4L, newArrayList.size());
        HashSet newHashSet2 = Sets.newHashSet(asList);
        for (Partitioner.Partition partition2 : newArrayList) {
            Assert.assertEquals("" + partition2.getPartitionKeys(), 1L, partition2.getPartitionKeys().size());
            Assert.assertEquals("" + partition2.getPartitionKeys(), genericTestOperator.inport1, partition2.getPartitionKeys().keySet().iterator().next());
            newHashSet2.remove((Partitioner.PartitionKeys) partition2.getPartitionKeys().values().iterator().next());
        }
        Assert.assertTrue("" + newHashSet2, newHashSet2.isEmpty());
        // Partial merge: each set lists the three keys expected after repartitioning the four
        // asList partitions when those contained in the set get 0 and the other two get -1.
        for (Set set : Arrays.asList(Sets.newHashSet(new Partitioner.PartitionKeys[]{newPartitionKeys("11", "00"), newPartitionKeys("11", "10"), newPartitionKeys("1", "1")}), Sets.newHashSet(new Partitioner.PartitionKeys[]{newPartitionKeys("1", "0"), newPartitionKeys("11", "01"), newPartitionKeys("11", "11")}))) {
            ArrayList newArrayList2 = Lists.newArrayList();
            for (Partitioner.PartitionKeys partitionKeys2 : asList) {
                HashMap hashMap2 = new HashMap();
                hashMap2.put(genericTestOperator.inport1, partitionKeys2);
                newArrayList2.add(new DefaultPartition(genericTestOperator, hashMap2, set.contains(partitionKeys2) ? 0 : -1, (StatsListener.BatchedOperatorStats) null));
            }
            Collection repartition4 = StatelessPartitioner.repartition(newArrayList2);
            newArrayList.clear();
            newArrayList.addAll(repartition4);
            Assert.assertEquals("" + newArrayList, 3L, newArrayList.size());
            // Every resulting key is removed from the expected set, which must end up empty.
            for (Partitioner.Partition partition3 : newArrayList) {
                Assert.assertEquals("" + partition3.getPartitionKeys(), 1L, partition3.getPartitionKeys().size());
                Assert.assertEquals("" + partition3.getPartitionKeys(), genericTestOperator.inport1, partition3.getPartitionKeys().keySet().iterator().next());
                set.remove((Partitioner.PartitionKeys) partition3.getPartitionKeys().values().iterator().next());
            }
            Assert.assertTrue("" + set, set.isEmpty());
        }
        // Merging the two initial partitions (both with -1) is expected to give one partition
        // with mask 0 and the single partition value 0.
        ArrayList newArrayList3 = Lists.newArrayList();
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            newArrayList3.add(new DefaultPartition(genericTestOperator, ((Partitioner.Partition) it2.next()).getPartitionKeys(), -1, (StatsListener.BatchedOperatorStats) null));
        }
        Collection repartition5 = StatelessPartitioner.repartition(newArrayList3);
        newArrayList.clear();
        newArrayList.addAll(repartition5);
        Assert.assertEquals("" + newArrayList, 1L, newArrayList.size());
        for (Partitioner.Partition partition4 : newArrayList) {
            Assert.assertEquals("" + partition4.getPartitionKeys(), 1L, partition4.getPartitionKeys().size());
            Partitioner.PartitionKeys partitionKeys3 = (Partitioner.PartitionKeys) partition4.getPartitionKeys().values().iterator().next();
            Assert.assertEquals("" + partitionKeys3, 0L, partitionKeys3.mask);
            Assert.assertEquals("" + partitionKeys3, Sets.newHashSet(new Integer[]{0}), partitionKeys3.partitions);
        }
    }

    /**
     * Creates partition keys from binary string literals.
     *
     * @param str  mask, written as a base-2 number (e.g. {@code "11"})
     * @param str2 the single partition value, written as a base-2 number (e.g. {@code "01"})
     * @return partition keys holding the parsed mask and a set containing the parsed value
     */
    private Partitioner.PartitionKeys newPartitionKeys(String str, String str2) {
        int mask = Integer.parseInt(str, 2);
        Integer partitionValue = Integer.valueOf(str2, 2);
        return new Partitioner.PartitionKeys(mask, Sets.newHashSet(new Integer[]{partitionValue}));
    }

    /**
     * Sets {@code stats.tuplesProcessedPSMA} of the given operator to {@code j}.
     *
     * @param pTOperator physical operator whose stats are modified
     * @param j          value stored into {@code tuplesProcessedPSMA}
     */
    private void setThroughput(PTOperator pTOperator, long j) {
        // The write is bracketed by checkout()/commit() on statsRevs
        // (presumably to publish it as one stats revision - confirm against PTOperator stats).
        pTOperator.stats.statsRevs.checkout();
        pTOperator.stats.tuplesProcessedPSMA.set(j);
        pTOperator.stats.statsRevs.commit();
    }

    /**
     * Checks container assignment for a plan in which the stream o2 -> o3 is CONTAINER_LOCAL
     * and "partNode" is partitioned in two: four containers are expected, container 0 holds a
     * single operator, container 1 holds exactly o2 and o3, and every partition of "partNode"
     * is alone in its container.
     */
    @Test
    public void testInline() {
        LogicalPlan logicalPlan = new LogicalPlan();
        GenericTestOperator o1 = logicalPlan.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = logicalPlan.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = logicalPlan.addOperator("o3", GenericTestOperator.class);
        PartitioningTestOperator partNode = logicalPlan.addOperator("partNode", PartitioningTestOperator.class);
        partNode.partitionKeys = new Integer[]{0, 1};
        logicalPlan.getMeta(partNode).getAttributes().put(Context.OperatorContext.PARTITIONER, new StatelessPartitioner(partNode.partitionKeys.length));
        logicalPlan.addStream("o1_outport1", o1.outport1, new Operator.InputPort[]{o2.inport1, o3.inport1, partNode.inport1}).setLocality((DAG.Locality) null);
        logicalPlan.addStream("o2_outport1", o2.outport1, o3.inport2).setLocality(DAG.Locality.CONTAINER_LOCAL);
        logicalPlan.addStream("o3_outport1", o3.outport1, partNode.inport2);
        logicalPlan.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 4);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new TestPlanContext());
        PhysicalPlan physicalPlan = new PhysicalPlan(logicalPlan, new TestPlanContext());

        Assert.assertEquals("number of containers", 4, physicalPlan.getContainers().size());
        // Asserted once; the verbatim duplicate of this check added no coverage.
        Assert.assertEquals("operators container 0", 1L, ((PTContainer) physicalPlan.getContainers().get(0)).getOperators().size());

        // Container 1 must hold exactly the two operators joined by the CONTAINER_LOCAL stream.
        Set<LogicalPlan.OperatorMeta> expectedMetas = Sets.newHashSet(new LogicalPlan.OperatorMeta[]{logicalPlan.getMeta(o2), logicalPlan.getMeta(o3)});
        Set<LogicalPlan.OperatorMeta> actualMetas = new HashSet<LogicalPlan.OperatorMeta>();
        PTContainer container1 = (PTContainer) physicalPlan.getContainers().get(1);
        for (PTOperator deployed : container1.getOperators()) {
            actualMetas.add(deployed.getOperatorMeta());
        }
        Assert.assertEquals("operators " + container1, expectedMetas, actualMetas);

        // Each partition is bound to a named local so that the message and the checked value
        // refer to the same operator (the decompiled temp "r0" was never declared).
        for (PTOperator partition : physicalPlan.getOperators(logicalPlan.getMeta(partNode))) {
            Assert.assertEquals("operators container" + partition, 1L, partition.getContainer().getOperators().size());
        }
    }

    /**
     * Two upstream operators feed one downstream operator through CONTAINER_LOCAL streams.
     * The plan is expected to use a single container and both upstream outputs must report
     * their downstream as inline.
     */
    @Test
    public void testInlineMultipleInputs() {
        LogicalPlan dag = new LogicalPlan();
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
        dag.addStream("n1Output1", node1.outport1, node3.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        dag.addStream("n2Output1", node2.outport1, node3.inport2).setLocality(DAG.Locality.CONTAINER_LOCAL);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new TestPlanContext());

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        Assert.assertEquals("number of containers", 1L, plan.getContainers().size());

        // First output of node1's only physical operator.
        PTOperator node1Physical = (PTOperator) plan.getOperators(dag.getMeta(node1)).get(0);
        PTOperator.PTOutput node1Output = (PTOperator.PTOutput) node1Physical.getOutputs().get(0);
        Assert.assertTrue("inline " + node1Output, node1Output.isDownStreamInline());

        // First output of node2's only physical operator.
        PTOperator node2Physical = (PTOperator) plan.getOperators(dag.getMeta(node2)).get(0);
        PTOperator.PTOutput node2Output = (PTOperator.PTOutput) node2Physical.getOutputs().get(0);
        Assert.assertTrue("inline " + node2Output, node2Output.isDownStreamInline());
    }

    /**
     * Plan with a two-way partitioned operator whose output is consumed NODE_LOCAL by a
     * parallel-partitioned operator. Expects seven containers: container 0 with "o1",
     * containers 1-2 with one "partitioned" instance each, containers 3-4 with one
     * "partitionedParallel" instance each whose node-local operator set has two members.
     */
    @Test
    public void testNodeLocality() {
        LogicalPlan dag = new LogicalPlan();
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class);
        dag.getMeta(partitioned).getAttributes().put(Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        GenericTestOperator partitionedParallel = dag.addOperator("partitionedParallel", GenericTestOperator.class);
        dag.addStream("o1_outport1", o1.outport1, partitioned.inport1).setLocality((DAG.Locality) null);
        dag.addStream("partitioned_outport1", partitioned.outport1, partitionedParallel.inport2).setLocality(DAG.Locality.NODE_LOCAL);
        dag.setInputPortAttribute(partitionedParallel.inport2, Context.PortContext.PARTITION_PARALLEL, true);
        GenericTestOperator single = dag.addOperator("single", GenericTestOperator.class);
        dag.addStream("partitionedParallel_outport1", partitionedParallel.outport1, single.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 7);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new TestPlanContext());

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        Assert.assertEquals("number of containers", 7, plan.getContainers().size());

        // Container 0: only o1.
        PTContainer firstContainer = (PTContainer) plan.getContainers().get(0);
        Assert.assertEquals("number operators " + firstContainer, 1L, firstContainer.getOperators().size());
        Assert.assertEquals("operators " + firstContainer, dag.getMeta(o1), ((PTOperator) firstContainer.getOperators().get(0)).getOperatorMeta());

        // Containers 1 and 2: one instance of "partitioned" each.
        for (int index = 1; index < 3; index++) {
            PTContainer container = (PTContainer) plan.getContainers().get(index);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            Set<LogicalPlan.OperatorMeta> expected = Sets.newHashSet(new LogicalPlan.OperatorMeta[]{dag.getMeta(partitioned)});
            Set<LogicalPlan.OperatorMeta> actual = Sets.newHashSet();
            for (PTOperator deployed : container.getOperators()) {
                actual.add(deployed.getOperatorMeta());
            }
            Assert.assertEquals("operators " + container, expected, actual);
        }

        // Containers 3 and 4: one instance of "partitionedParallel" each, node-local with one peer.
        for (int index = 3; index < 5; index++) {
            PTContainer container = (PTContainer) plan.getContainers().get(index);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            Set<LogicalPlan.OperatorMeta> expected = Sets.newHashSet(new LogicalPlan.OperatorMeta[]{dag.getMeta(partitionedParallel)});
            Set<LogicalPlan.OperatorMeta> actual = Sets.newHashSet();
            for (PTOperator deployed : container.getOperators()) {
                actual.add(deployed.getOperatorMeta());
                Assert.assertEquals("nodeLocal " + deployed.getNodeLocalOperators(), 2L, deployed.getNodeLocalOperators().getOperatorSet().size());
            }
            Assert.assertEquals("operators " + container, expected, actual);
        }
    }

    /**
     * Builds a plan in which "o2" is partitioned two ways and "o3", "o3_1", "o3_2" and "o4"
     * follow it through PARTITION_PARALLEL input ports over CONTAINER_LOCAL streams, ending in
     * the unpartitioned "o5single".
     *
     * <p>Expected layout: five containers - container 0 with "o1", containers 1-2 each with
     * one full parallel chain (o2, o3, o3_1, o3_2, o4), container 3 with the single unifier of
     * o4's output, container 4 with "o5single" fed by that unifier.
     */
    @Test
    public void testParallelPartitioning() {
        LogicalPlan logicalPlan = new LogicalPlan();
        GenericTestOperator o1 = logicalPlan.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = logicalPlan.addOperator("o2", GenericTestOperator.class);
        logicalPlan.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        GenericTestOperator o3 = logicalPlan.addOperator("o3", GenericTestOperator.class);
        logicalPlan.addStream("o1Output1", o1.outport1, o2.inport1, o3.inport1).setLocality((DAG.Locality) null);
        logicalPlan.addStream("o2Output1", o2.outport1, o3.inport2).setLocality(DAG.Locality.CONTAINER_LOCAL);
        logicalPlan.setInputPortAttribute(o3.inport2, Context.PortContext.PARTITION_PARALLEL, true);
        PartitioningTestOperator o3Part1 = logicalPlan.addOperator("o3_1", PartitioningTestOperator.class);
        o3Part1.fixedCapacity = false;
        logicalPlan.setInputPortAttribute(o3Part1.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        LogicalPlan.OperatorMeta o3Part1Meta = logicalPlan.getMeta(o3Part1);
        GenericTestOperator o3Part2 = logicalPlan.addOperator("o3_2", GenericTestOperator.class);
        logicalPlan.setInputPortAttribute(o3Part2.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        LogicalPlan.OperatorMeta o3Part2Meta = logicalPlan.getMeta(o3Part2);
        logicalPlan.addStream("o3outport1", o3.outport1, o3Part1.inport1, o3Part2.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        GenericTestOperator o4 = logicalPlan.addOperator("o4", GenericTestOperator.class);
        logicalPlan.setInputPortAttribute(o4.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        logicalPlan.setInputPortAttribute(o4.inport2, Context.PortContext.PARTITION_PARALLEL, true);
        LogicalPlan.OperatorMeta o4Meta = logicalPlan.getMeta(o4);
        logicalPlan.addStream("o3_1.outport1", o3Part1.outport1, o4.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        logicalPlan.addStream("o3_2.outport1", o3Part2.outport1, o4.inport2).setLocality(DAG.Locality.CONTAINER_LOCAL);
        GenericTestOperator o5single = logicalPlan.addOperator("o5single", GenericTestOperator.class);
        logicalPlan.addStream("o4outport1", o4.outport1, o5single.inport1);
        logicalPlan.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new TestPlanContext());

        PhysicalPlan physicalPlan = new PhysicalPlan(logicalPlan, new TestPlanContext());
        Assert.assertEquals("number of containers", 5L, physicalPlan.getContainers().size());

        // Container 0: only o1.
        PTContainer o1Container = (PTContainer) physicalPlan.getContainers().get(0);
        Assert.assertEquals("number operators " + o1Container, 1L, o1Container.getOperators().size());
        Assert.assertEquals("operators " + o1Container, "o1", ((PTOperator) o1Container.getOperators().get(0)).getOperatorMeta().getName());

        // Containers 1 and 2: one complete parallel-partitioned chain each.
        for (int i = 1; i < 3; i++) {
            PTContainer chainContainer = (PTContainer) physicalPlan.getContainers().get(i);
            Assert.assertEquals("number operators " + chainContainer, 5L, chainContainer.getOperators().size());
            Set<String> expectedNames = Sets.newHashSet(new String[]{"o2", "o3", o3Part1Meta.getName(), o3Part2Meta.getName(), o4Meta.getName()});
            Set<String> actualNames = Sets.newHashSet();
            for (PTOperator deployed : chainContainer.getOperators()) {
                actualNames.add(deployed.getOperatorMeta().getName());
            }
            Assert.assertEquals("operator names " + chainContainer, expectedNames, actualNames);
        }

        // o2, o3_1 and o3_2 must each have two partitions with a single, inline output.
        for (LogicalPlan.OperatorMeta partitionedMeta : Lists.newArrayList(new LogicalPlan.OperatorMeta[]{logicalPlan.getMeta(o2), o3Part1Meta, o3Part2Meta})) {
            List<PTOperator> partitions = physicalPlan.getOperators(partitionedMeta);
            Assert.assertEquals("" + partitions, 2L, partitions.size());
            for (PTOperator partition : partitions) {
                Assert.assertEquals("outputs " + partition, 1L, partition.getOutputs().size());
                Assert.assertTrue("downstream inline " + partition.getOutputs().get(0), ((PTOperator.PTOutput) partition.getOutputs().get(0)).isDownStreamInline());
            }
        }

        // Bound to a local so the message and the checked size use the same list
        // (the decompiled temp "r0" was never declared).
        List<?> o4Unifiers = physicalPlan.getMergeOperators(o4Meta);
        Assert.assertEquals("unifier " + o4Meta + ": " + o4Unifiers, 1L, o4Unifiers.size());

        // Container 3: the unifier merging both o4 partitions.
        PTContainer unifierContainer = (PTContainer) physicalPlan.getContainers().get(3);
        Assert.assertEquals("number operators " + unifierContainer, 1L, unifierContainer.getOperators().size());
        PTOperator o4Unifier = (PTOperator) unifierContainer.getOperators().get(0);
        Assert.assertEquals("operators " + unifierContainer, o4Meta.getMeta(o4.outport1).getUnifierMeta(), o4Unifier.getOperatorMeta());
        Assert.assertTrue("unifier " + o4, o4Unifier.isUnifier());
        Assert.assertEquals("unifier inputs" + o4Unifier.getInputs(), 2L, o4Unifier.getInputs().size());
        Assert.assertEquals("unifier outputs" + o4Unifier.getOutputs(), 1L, o4Unifier.getOutputs().size());

        // Container 4: o5single, whose only input comes from the unifier.
        LogicalPlan.OperatorMeta o5Meta = logicalPlan.getMeta(o5single);
        PTContainer o5Container = (PTContainer) physicalPlan.getContainers().get(4);
        Assert.assertEquals("number operators " + o5Container, 1L, o5Container.getOperators().size());
        PTOperator o5Physical = (PTOperator) o5Container.getOperators().get(0);
        Assert.assertEquals("operators " + o5Container, o5Meta, o5Physical.getOperatorMeta());
        List<PTOperator> o5Instances = physicalPlan.getOperators(o5Meta);
        Assert.assertEquals("" + o5Instances, 1L, o5Instances.size());
        Assert.assertEquals("inputs" + o5Physical.getInputs(), 1L, o5Physical.getInputs().size());
        Assert.assertEquals("inputs" + o5Physical.getInputs(), o4Unifier, ((PTOperator.PTInput) o5Physical.getInputs().get(0)).source.source);

        // The partitioner of o3_1 must have been invoked, and its parallel-partitioned inputs
        // must not carry partition keys.
        Assert.assertNotNull("partitioner called " + o3Part1, o3Part1.partitions);
        for (PTOperator o3Part1Instance : physicalPlan.getOperators(o3Part1Meta)) {
            Assert.assertEquals("inputs " + o3Part1Instance, 1L, o3Part1Instance.getInputs().size());
            for (PTOperator.PTInput input : o3Part1Instance.getInputs()) {
                Assert.assertNull("partition keys " + input, input.partitions);
            }
        }
    }

    /**
     * Validation of PARTITION_PARALLEL inputs in three steps: two parallel inputs fed by
     * unrelated operators, the same plan after both feeders are attached to a shared upstream
     * operator, and a fresh plan where both parallel inputs come from one operator.
     */
    @Test
    public void testParallelPartitioningValidation() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new TestPlanContext());
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        dag.setInputPortAttribute(o3.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setInputPortAttribute(o3.inport2, Context.PortContext.PARTITION_PARALLEL, true);
        dag.addStream("o1Output1", o1.outport1, o3.inport1);
        dag.addStream("o2Output1", o2.outport1, o3.inport2);

        // Step 1: o3 extends two independent upstream operators.
        // NOTE(review): the message is only checked if an AssertionError is thrown; the test
        // does not fail when plan construction succeeds - confirm whether that is intended.
        try {
            new PhysicalPlan(dag, new TestPlanContext());
        } catch (AssertionError error) {
            Assert.assertThat("Parallel partition needs common ancestor", error.getMessage(), StramTestSupport.RegexMatcher.matches("operator cannot extend multiple partitions.*"));
        }

        // Step 2: hang o1 and o2 off a shared upstream operator; construction must succeed.
        GenericTestOperator commonAncestor = dag.addOperator("commonAncestor", GenericTestOperator.class);
        dag.setInputPortAttribute(o1.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setInputPortAttribute(o2.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.addStream("commonAncestor.outport1", commonAncestor.outport1, o1.inport1);
        dag.addStream("commonAncestor.outport2", commonAncestor.outport2, o2.inport1);
        new PhysicalPlan(dag, new TestPlanContext());

        // Step 3: both parallel inputs of one operator fed by two ports of the same source.
        LogicalPlan singleSourceDag = new LogicalPlan();
        singleSourceDag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new TestPlanContext());
        GenericTestOperator source = singleSourceDag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator sink = singleSourceDag.addOperator("o2", GenericTestOperator.class);
        singleSourceDag.setInputPortAttribute(sink.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        singleSourceDag.setInputPortAttribute(sink.inport2, Context.PortContext.PARTITION_PARALLEL, true);
        singleSourceDag.addStream("o1.outport1", source.outport1, sink.inport1);
        singleSourceDag.addStream("o2.outport2", source.outport2, sink.inport2);
        new PhysicalPlan(singleSourceDag, new TestPlanContext());
    }

    @Test
    public void testMxNPartitioning() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("o1", TestGeneratorInputOperator.class);
        logicalPlan.setAttribute(addOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        logicalPlan.setAttribute(addOperator, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
        LogicalPlan.OperatorMeta meta = logicalPlan.getMeta(addOperator);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("o2", GenericTestOperator.class);
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        LogicalPlan.OperatorMeta meta2 = logicalPlan.getMeta(addOperator2);
        logicalPlan.addStream("o1.outport1", addOperator.outport, addOperator2.inport1);
        logicalPlan.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);
        TestPlanContext testPlanContext = new TestPlanContext();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, testPlanContext);
        PhysicalPlan physicalPlan = new PhysicalPlan(logicalPlan, testPlanContext);
        Assert.assertEquals("number of containers", 5L, physicalPlan.getContainers().size());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            PTContainer pTContainer = (PTContainer) physicalPlan.getContainers().get(i);
            Assert.assertEquals("number operators " + pTContainer, 1L, pTContainer.getOperators().size());
            Assert.assertEquals("operators " + pTContainer, meta.getName(), ((PTOperator) pTContainer.getOperators().get(0)).getOperatorMeta().getName());
            arrayList.add(pTContainer.getOperators().get(0));
        }
        int i2 = 2;
        while (i2 < 5) {
            PTContainer pTContainer2 = (PTContainer) physicalPlan.getContainers().get(i2);
            Assert.assertEquals("number operators " + pTContainer2, 2L, pTContainer2.getOperators().size());
            Assert.assertEquals("operators " + pTContainer2, meta2.getName(), ((PTOperator) pTContainer2.getOperators().get(0)).getOperatorMeta().getName());
            HashSet newHashSet = Sets.newHashSet(new String[]{meta.getMeta(addOperator.outport).getUnifierMeta().getName(), meta2.getName()});
            HashMap hashMap = new HashMap();
            for (PTOperator pTOperator : pTContainer2.getOperators()) {
                hashMap.put(pTOperator.getOperatorMeta().getName(), pTOperator);
            }
            Assert.assertEquals("", newHashSet, hashMap.keySet());
            PTOperator pTOperator2 = (PTOperator) hashMap.get(meta.getMeta(addOperator.outport).getUnifierMeta().getName());
            Assert.assertNotNull("" + pTOperator2, pTOperator2.getContainer());
            Assert.assertTrue("" + pTOperator2, pTOperator2.isUnifier());
            Assert.assertEquals("" + pTOperator2, 2L, pTOperator2.getInputs().size());
            int i3 = i2 == 2 ? 2 : 1;
            for (int i4 = 0; i4 < pTOperator2.getInputs().size(); i4++) {
                PTOperator.PTInput pTInput = (PTOperator.PTInput) pTOperator2.getInputs().get(i4);
                Assert.assertEquals("" + pTOperator2, TestGeneratorInputOperator.OUTPUT_PORT, pTInput.source.portName);
                Assert.assertEquals("" + pTOperator2, arrayList.get(i4), pTInput.source.source);
                Assert.assertEquals("partition keys " + pTInput.partitions, i3, pTInput.partitions.partitions.size());
            }
            Assert.assertEquals("" + pTOperator2, 1L, pTOperator2.getOutputs().size());
            Assert.assertTrue("" + ((PTOperator) hashMap.get(meta2.getName())).getOperatorMeta().getOperator(), ((PTOperator) hashMap.get(meta2.getName())).getOperatorMeta().getOperator() instanceof GenericTestOperator);
            PTOperator pTOperator3 = (PTOperator) hashMap.get(meta2.getName());
            Assert.assertEquals("partition inputs " + pTOperator3.getInputs(), 1L, pTOperator3.getInputs().size());
            Assert.assertEquals("partition inputs " + pTOperator3.getInputs(), pTOperator2, ((PTOperator.PTInput) pTOperator3.getInputs().get(0)).source.source);
            Assert.assertEquals("input partition keys " + pTOperator3.getInputs(), (Object) null, ((PTOperator.PTInput) pTOperator3.getInputs().get(0)).partitions);
            Assert.assertTrue("partitioned unifier container local " + ((PTOperator.PTInput) pTOperator3.getInputs().get(0)).source, ((PTOperator.PTInput) pTOperator3.getInputs().get(0)).source.isDownStreamInline());
            i2++;
        }
        for (int i5 = 0; i5 < 2; i5++) {
            List<PTOperator> operators = physicalPlan.getOperators(meta2);
            HashSet newHashSet2 = Sets.newHashSet(operators);
            for (PTOperator pTOperator4 : operators) {
                newHashSet2.addAll(pTOperator4.upstreamMerge.values());
                PartitioningTest.PartitionLoadWatch.put(pTOperator4, -1);
                physicalPlan.onStatusUpdate(pTOperator4);
            }
            testPlanContext.backupRequests = 0;
            testPlanContext.events.remove(0).run();
            HashSet newHashSet3 = Sets.newHashSet(physicalPlan.getOperators(meta2));
            newHashSet3.addAll(physicalPlan.getMergeOperators(meta));
            Iterator it = physicalPlan.getOperators(meta2).iterator();
            while (it.hasNext()) {
                newHashSet3.addAll(((PTOperator) it.next()).upstreamMerge.values());
            }
            Assert.assertEquals("number of containers", 4L, physicalPlan.getContainers().size());
            Assert.assertEquals("number of operators", 2 - i5, physicalPlan.getOperators(meta2).size());
            Assert.assertEquals("undeployed operators " + testPlanContext.undeploy, newHashSet2, testPlanContext.undeploy);
            Assert.assertEquals("deployed operators " + testPlanContext.deploy, newHashSet3, testPlanContext.deploy);
        }
        for (int i6 = 0; i6 < 2; i6++) {
            LinkedList linkedList = new LinkedList(physicalPlan.getOperators(meta2));
            PTOperator pTOperator5 = (PTOperator) linkedList.remove(0);
            HashSet newHashSet4 = Sets.newHashSet(new PTOperator[]{pTOperator5});
            newHashSet4.addAll(physicalPlan.getMergeOperators(meta));
            newHashSet4.addAll(pTOperator5.upstreamMerge.values());
            LinkedList linkedList2 = new LinkedList();
            Iterator it2 = linkedList.iterator();
            while (it2.hasNext()) {
                linkedList2.addAll(((PTOperator) it2.next()).upstreamMerge.values());
            }
            linkedList.addAll(linkedList2);
            PartitioningTest.PartitionLoadWatch.put(pTOperator5, 1);
            physicalPlan.onStatusUpdate(pTOperator5);
            Assert.assertEquals("repartition event", 1L, testPlanContext.events.size());
            testPlanContext.backupRequests = 0;
            testPlanContext.events.remove(0).run();
            Assert.assertEquals("N partitions after scale up " + meta2, 2 + i6, physicalPlan.getOperators(meta2).size());
            Assert.assertTrue("no unifiers", physicalPlan.getMergeOperators(meta).isEmpty());
            for (PTOperator pTOperator6 : physicalPlan.getOperators(meta2)) {
                Assert.assertNotNull(pTOperator6.container);
                PTOperator pTOperator7 = (PTOperator) pTOperator6.upstreamMerge.values().iterator().next();
                Assert.assertNotNull(pTOperator7.container);
                Assert.assertSame("unifier in same container", pTOperator6.container, pTOperator7.container);
                Assert.assertEquals("container operators " + pTOperator6.container, Sets.newHashSet(pTOperator6.container.getOperators()), Sets.newHashSet(new PTOperator[]{pTOperator6, pTOperator7}));
            }
            HashSet newHashSet5 = Sets.newHashSet(physicalPlan.getOperators(meta2));
            Iterator it3 = physicalPlan.getOperators(meta2).iterator();
            while (it3.hasNext()) {
                newHashSet5.addAll(((PTOperator) it3.next()).upstreamMerge.values());
            }
            newHashSet5.removeAll(linkedList);
            Assert.assertEquals("number of containers", 4 + i6, physicalPlan.getContainers().size());
            Assert.assertEquals("undeployed operators" + testPlanContext.undeploy, newHashSet4, testPlanContext.undeploy);
            Assert.assertEquals("deployed operators" + testPlanContext.deploy, newHashSet5, testPlanContext.deploy);
        }
        HashSet newHashSet6 = Sets.newHashSet();
        HashSet newHashSet7 = Sets.newHashSet();
        for (PTOperator pTOperator8 : physicalPlan.getOperators(meta2)) {
            newHashSet6.addAll(pTOperator8.upstreamMerge.values());
            newHashSet6.add(pTOperator8);
            newHashSet7.add(pTOperator8);
        }
        for (PTOperator pTOperator9 : physicalPlan.getOperators(meta)) {
            newHashSet6.add(pTOperator9);
            PartitioningTest.PartitionLoadWatch.put(pTOperator9, -1);
            physicalPlan.onStatusUpdate(pTOperator9);
        }
        Assert.assertEquals("repartition event", 1L, testPlanContext.events.size());
        testPlanContext.events.remove(0).run();
        Assert.assertEquals("M partitions after scale down " + meta, 1L, physicalPlan.getOperators(meta).size());
        newHashSet6.removeAll(physicalPlan.getOperators(meta));
        for (PTOperator pTOperator10 : physicalPlan.getOperators(meta2)) {
            Assert.assertTrue("merge unifier " + pTOperator10 + " " + pTOperator10.upstreamMerge, pTOperator10.upstreamMerge.isEmpty());
        }
        Assert.assertEquals("undeploy", newHashSet6, testPlanContext.undeploy);
        Assert.assertEquals("deploy", newHashSet7, testPlanContext.deploy);
        Assert.assertEquals("M partitions " + meta, 1L, physicalPlan.getOperators(meta).size());
        HashSet newHashSet8 = Sets.newHashSet();
        HashSet newHashSet9 = Sets.newHashSet();
        for (PTOperator pTOperator11 : physicalPlan.getOperators(meta)) {
            newHashSet8.add(pTOperator11);
            PartitioningTest.PartitionLoadWatch.put(pTOperator11, 1);
            physicalPlan.onStatusUpdate(pTOperator11);
        }
        Assert.assertEquals("repartition event", 1L, testPlanContext.events.size());
        testPlanContext.events.remove(0).run();
        Assert.assertEquals("M partitions after scale up " + meta, 2L, physicalPlan.getOperators(meta).size());
        newHashSet9.addAll(physicalPlan.getOperators(meta));
        for (PTOperator pTOperator12 : physicalPlan.getOperators(meta2)) {
            newHashSet8.add(pTOperator12);
            newHashSet9.add(pTOperator12);
            Assert.assertEquals("merge unifier " + pTOperator12 + " " + pTOperator12.upstreamMerge, 1L, pTOperator12.upstreamMerge.size());
            newHashSet9.addAll(pTOperator12.upstreamMerge.values());
        }
        Assert.assertEquals("undeploy", newHashSet8, testPlanContext.undeploy);
        Assert.assertEquals("deploy", newHashSet9, testPlanContext.deploy);
    }

    /**
     * Exercises M x N partitioning when the upstream output port requests a single final unifier.
     *
     * <p>The logical plan has "o1" (2 partitions, {@code UNIFIER_SINGLE_FINAL} set on its output port)
     * streaming into "o2" (3 partitions). The test first checks the initial physical layout
     * (6 containers: two o1 partitions, one unifier, three o2 partitions) and then drives four
     * repartitioning rounds through {@link PartitioningTest.PartitionLoadWatch}: o2 scale down, o2 scale up,
     * o1 scale down and o1 scale up. After each round it compares the plan's container/operator counts
     * and the deploy/undeploy sets recorded by the {@link TestPlanContext}.
     */
    @Test
    public void testSingleFinalMxNPartitioning() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
        dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        dag.setAttribute(o1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
        dag.setOutputPortAttribute(o1.outport, Context.PortContext.UNIFIER_SINGLE_FINAL, true);
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);

        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        dag.setAttribute(o2, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);

        dag.addStream("o1.outport1", o1.outport, o2.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);
        TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);
        Assert.assertEquals("number of containers", 6L, plan.getContainers().size());

        // Containers 0..1: one o1 partition each.
        List<PTOperator> inputOperators = new ArrayList<PTOperator>();
        for (int i = 0; i < 2; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            PTOperator partition = container.getOperators().get(0);
            Assert.assertEquals("operators " + container, o1Meta.getName(), partition.getOperatorMeta().getName());
            inputOperators.add(partition);
        }

        // Container 2: the single unifier, fed by both o1 partitions without partition keys.
        PTContainer unifierContainer = plan.getContainers().get(2);
        Assert.assertEquals("number operators " + unifierContainer, 1L, unifierContainer.getOperators().size());
        PTOperator unifier = unifierContainer.getOperators().get(0);
        Assert.assertEquals("operators " + unifierContainer, o1Meta.getMeta(o1.outport).getUnifierMeta().getName(), unifier.getOperatorMeta().getName());
        Assert.assertTrue("single unifier " + unifier, unifier.isUnifier());
        Assert.assertEquals("" + unifier, 2L, unifier.getInputs().size());
        for (int i = 0; i < unifier.getInputs().size(); i++) {
            PTOperator.PTInput input = unifier.getInputs().get(i);
            Assert.assertEquals("source port name " + unifier, TestGeneratorInputOperator.OUTPUT_PORT, input.source.portName);
            Assert.assertEquals("" + unifier, inputOperators.get(i), input.source.source);
            Assert.assertEquals("partition keys " + input.partitions, (Object) null, input.partitions);
        }

        // The unifier's only output fans out to the three o2 partitions.
        Assert.assertEquals("number outputs " + unifier, 1L, unifier.getOutputs().size());
        PTOperator.PTOutput unifierOutput = unifier.getOutputs().get(0);
        Assert.assertEquals("number inputs " + unifierOutput, 3L, unifierOutput.sinks.size());
        for (PTOperator.PTInput sink : unifierOutput.sinks) {
            Assert.assertEquals("output sink " + unifierOutput, o2Meta.getName(), sink.target.getName());
            Assert.assertEquals("destination port name " + unifierOutput, GenericTestOperator.IPORT1, sink.portName);
        }

        // Containers 3..5: one o2 partition each, reading from the unifier with partition keys.
        List<Integer> partitionKeySizes = new ArrayList<Integer>();
        for (int i = 3; i < 6; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            PTOperator partition = container.getOperators().get(0);
            Assert.assertEquals("operators " + container, o2Meta.getName(), partition.getOperatorMeta().getName());
            Assert.assertEquals("number inputs " + partition, 1L, partition.getInputs().size());
            PTOperator.PTInput input = partition.getInputs().get(0);
            Assert.assertEquals("" + partition, unifier, input.source.source);
            Assert.assertNotNull("input partitions " + partition, input.partitions);
            partitionKeySizes.add(input.partitions.partitions.size());
        }
        Assert.assertEquals("input partition sizes count", 3L, partitionKeySizes.size());
        // Container order is not tied to key distribution, so compare the sorted sizes.
        Collections.sort(partitionKeySizes);
        Assert.assertEquals("input partition sizes", Arrays.asList(1, 1, 2), partitionKeySizes);

        // Scale o2 down twice: every current partition reports load -1 before the event runs.
        for (int i = 0; i < 2; i++) {
            List<PTOperator> ops = plan.getOperators(o2Meta);
            Set<PTOperator> expUndeploy = Sets.newHashSet(ops);
            for (PTOperator partition : ops) {
                PartitioningTest.PartitionLoadWatch.put(partition, -1);
                plan.onStatusUpdate(partition);
            }
            ctx.backupRequests = 0;
            ctx.events.remove(0).run();
            Assert.assertEquals("single unifier ", 1L, plan.getMergeOperators(o1Meta).size());
            Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(o2Meta));
            Assert.assertEquals("number of containers", 5 - i, plan.getContainers().size());
            Assert.assertEquals("number of operators", 2 - i, plan.getOperators(o2Meta).size());
            Assert.assertEquals("undeployed operators " + ctx.undeploy, expUndeploy, ctx.undeploy);
            Assert.assertEquals("deployed operators " + ctx.deploy, expDeploy, ctx.deploy);
        }

        // Scale o2 up twice: only the first partition reports load 1.
        for (int i = 0; i < 2; i++) {
            List<PTOperator> unchanged = new LinkedList<PTOperator>(plan.getOperators(o2Meta));
            PTOperator loaded = unchanged.remove(0);
            Set<PTOperator> expUndeploy = Sets.newHashSet(new PTOperator[]{loaded});
            PartitioningTest.PartitionLoadWatch.put(loaded, 1);
            plan.onStatusUpdate(loaded);
            Assert.assertEquals("repartition event", 1L, ctx.events.size());
            ctx.backupRequests = 0;
            ctx.events.remove(0).run();
            Assert.assertEquals("single unifier ", 1L, plan.getMergeOperators(o1Meta).size());
            Assert.assertEquals("N partitions after scale up " + o2Meta, 2 + i, plan.getOperators(o2Meta).size());
            for (PTOperator partition : plan.getOperators(o2Meta)) {
                Assert.assertNotNull(partition.container);
                Assert.assertEquals("number operators ", 1L, partition.container.getOperators().size());
            }
            // Everything except the partitions that were left alone is expected to be deployed.
            Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(o2Meta));
            expDeploy.removeAll(unchanged);
            Assert.assertEquals("number of containers", 5 + i, plan.getContainers().size());
            Assert.assertEquals("undeployed operators" + ctx.undeploy, expUndeploy, ctx.undeploy);
            Assert.assertEquals("deployed operators" + ctx.deploy, expDeploy, ctx.deploy);
        }

        // Scale o1 down to a single partition.
        Set<PTOperator> expUndeploy = Sets.newHashSet();
        Set<PTOperator> expDeploy = Sets.newHashSet();
        expUndeploy.addAll(plan.getMergeOperators(o1Meta));
        for (PTOperator partition : plan.getOperators(o2Meta)) {
            expUndeploy.add(partition);
            expDeploy.add(partition);
        }
        for (PTOperator partition : plan.getOperators(o1Meta)) {
            expUndeploy.add(partition);
            PartitioningTest.PartitionLoadWatch.put(partition, -1);
            plan.onStatusUpdate(partition);
        }
        Assert.assertEquals("repartition event", 1L, ctx.events.size());
        ctx.events.remove(0).run();
        Assert.assertEquals("M partitions after scale down " + o1Meta, 1L, plan.getOperators(o1Meta).size());
        // The surviving o1 partition is not expected in the undeploy set.
        expUndeploy.removeAll(plan.getOperators(o1Meta));
        Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
        Assert.assertEquals("deploy", expDeploy, ctx.deploy);
        Assert.assertEquals("M partitions " + o1Meta, 1L, plan.getOperators(o1Meta).size());

        // Scale o1 back up to two partitions.
        expUndeploy = Sets.newHashSet();
        expDeploy = Sets.newHashSet();
        for (PTOperator partition : plan.getOperators(o1Meta)) {
            expUndeploy.add(partition);
            PartitioningTest.PartitionLoadWatch.put(partition, 1);
            plan.onStatusUpdate(partition);
        }
        Assert.assertEquals("repartition event", 1L, ctx.events.size());
        ctx.events.remove(0).run();
        Assert.assertEquals("M partitions after scale up " + o1Meta, 2L, plan.getOperators(o1Meta).size());
        expDeploy.addAll(plan.getOperators(o1Meta));
        expDeploy.addAll(plan.getMergeOperators(o1Meta));
        for (PTOperator partition : plan.getOperators(o2Meta)) {
            expUndeploy.add(partition);
            expDeploy.add(partition);
            Assert.assertNotNull(partition.container);
            Assert.assertEquals("number operators ", 1L, partition.container.getOperators().size());
        }
        Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
        Assert.assertEquals("deploy", expDeploy, ctx.deploy);
    }

    /**
     * Repartitions an operator that uses {@code TestAugmentingPartitioner} and checks what gets undeployed.
     *
     * <p>"o1" starts with 3 partitions and feeds a single "o2"; the initial plan is asserted to have
     * 5 containers. The first two o1 partitions then report load 1, the queued event is executed, and
     * the test asserts 7 containers and that the undeploy set equals the o2 operators plus the o1 merge
     * operators captured before the repartitioning.
     */
    @Test
    public void testAugmentedDynamicPartitioning() {
        final LogicalPlan dag = new LogicalPlan();

        final TestGeneratorInputOperator source = dag.addOperator("o1", TestGeneratorInputOperator.class);
        dag.setAttribute(source, Context.OperatorContext.PARTITIONER, new TestAugmentingPartitioner(3));
        dag.setAttribute(source, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
        final LogicalPlan.OperatorMeta sourceMeta = dag.getMeta(source);

        final GenericTestOperator sink = dag.addOperator("o2", GenericTestOperator.class);
        final LogicalPlan.OperatorMeta sinkMeta = dag.getMeta(sink);

        dag.addStream("o1.outport1", source.outport, sink.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);

        final TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        final PhysicalPlan plan = new PhysicalPlan(dag, ctx);

        // Initial layout.
        Assert.assertEquals("number of containers", 5L, plan.getContainers().size());
        final List<PTOperator> sourcePartitions = plan.getOperators(sourceMeta);
        Assert.assertEquals("number of o1 operators", 3L, sourcePartitions.size());
        Assert.assertEquals("number of o2 operators", 1L, plan.getOperators(sinkMeta).size());

        // Snapshot what is expected to be undeployed: o2's operators followed by o1's unifiers.
        final List<PTOperator> sourceUnifiers = plan.getMergeOperators(sourceMeta);
        final Set<PTOperator> expectedUndeploy = new LinkedHashSet<PTOperator>(plan.getOperators(sinkMeta));
        expectedUndeploy.addAll(sourceUnifiers);

        // Report load on the first two o1 partitions only.
        int index = 0;
        while (index < 2) {
            PartitioningTest.PartitionLoadWatch.put(sourcePartitions.get(index), 1);
            plan.onStatusUpdate(sourcePartitions.get(index));
            index++;
        }

        ctx.backupRequests = 0;
        ctx.events.remove(0).run();

        Assert.assertEquals("number of containers", 7L, plan.getContainers().size());
        Assert.assertEquals("undeployed opertors", expectedUndeploy, ctx.undeploy);
    }

    /**
     * Checks the physical layout produced when {@code UNIFIER_LIMIT} is set on an upstream output port.
     *
     * <p>"o1" is a {@code PartitioningTestOperator} with four partition keys and a unifier limit of 2 on
     * {@code outport1}; "o2" has 3 partitions. The test asserts 9 containers: four o1 partitions, two
     * merge unifiers (two inputs each, whose sinks are themselves unifiers), and three containers that
     * hold two operators each with an o2 partition first. It then adds a fifth partition key, triggers
     * repartitioning through {@link PartitioningTest.PartitionLoadWatch}, and asserts 5 partitions and
     * 3 merge unifiers, each assigned to a container.
     */
    @Test
    public void testCascadingUnifier() {
        LogicalPlan dag = new LogicalPlan();
        PartitioningTestOperator o1 = dag.addOperator("o1", PartitioningTestOperator.class);
        o1.partitionKeys = new Integer[]{0, 1, 2, 3};
        o1.setPartitionCount(o1.partitionKeys.length);
        dag.setAttribute(o1, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        dag.setOutputPortAttribute(o1.outport1, Context.PortContext.UNIFIER_LIMIT, 2);
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);

        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);
        TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);
        Assert.assertEquals("number of containers", 9L, plan.getContainers().size());

        List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
        Assert.assertEquals("partitions " + o1Meta, 4L, o1Partitions.size());
        Assert.assertEquals("partitioned map " + o1.partitions, 4L, o1.partitions.size());
        List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
        Assert.assertEquals("partitions " + o2Meta, 3L, o2Partitions.size());

        // Each o1 partition has a single output with a single sink.
        for (PTOperator partition : o1Partitions) {
            Assert.assertEquals("outputs " + partition, 1L, partition.getOutputs().size());
            for (PTOperator.PTOutput output : partition.getOutputs()) {
                Assert.assertEquals("sinks " + output, 1L, output.sinks.size());
            }
            Assert.assertNotNull("container " + partition, partition.getContainer());
        }

        // Two merge unifiers, each combining two inputs and feeding three downstream unifiers.
        List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
        Assert.assertEquals("o1Unifiers " + o1Meta, 2L, o1Unifiers.size());
        for (PTOperator unifier : o1Unifiers) {
            Assert.assertEquals("inputs " + unifier, 2L, unifier.getInputs().size());
            Assert.assertEquals("outputs " + unifier, 1L, unifier.getOutputs().size());
            for (PTOperator.PTOutput output : unifier.getOutputs()) {
                Assert.assertEquals("sinks " + output, 3L, output.sinks.size());
                for (PTOperator.PTInput sink : output.sinks) {
                    Assert.assertTrue(sink.target.isUnifier());
                    Assert.assertEquals(1L, sink.target.getOutputs().get(0).sinks.size());
                }
            }
            Assert.assertNotNull("container " + unifier, unifier.getContainer());
        }

        // Containers 0..3: o1 partitions.
        for (int i = 0; i < 4; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            Assert.assertTrue(o1Partitions.contains(container.getOperators().get(0)));
        }
        // Containers 4..5: merge unifiers.
        for (int i = 4; i < 6; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            Assert.assertTrue(o1Unifiers.contains(container.getOperators().get(0)));
        }
        // Containers 6..8: two operators each, the first being an o2 partition.
        for (int i = 6; i < 9; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 2L, container.getOperators().size());
            Assert.assertTrue(o2Partitions.contains(container.getOperators().get(0)));
        }

        // Trigger repartitioning of o1 after adding a fifth partition key.
        PTOperator firstPartition = o1Partitions.get(0);
        Assert.assertTrue("stats handlers " + firstPartition.statsListeners, firstPartition.statsListeners.get(0) instanceof PartitioningTest.PartitionLoadWatch);
        PartitioningTest.PartitionLoadWatch.put(firstPartition, 1);
        plan.onStatusUpdate(firstPartition);
        Assert.assertEquals("partition scaling triggered", 1L, ctx.events.size());
        o1.partitionKeys = new Integer[]{0, 1, 2, 3, 4};
        ctx.events.remove(0).run();

        Assert.assertEquals("partitions " + o1Meta, 5L, plan.getOperators(o1Meta).size());
        Assert.assertEquals("partitioned map " + o1.partitions, 5L, o1.partitions.size());
        List<PTOperator> unifiersAfterScaling = plan.getMergeOperators(o1Meta);
        Assert.assertEquals("o1Unifiers " + o1Meta, 3L, unifiersAfterScaling.size());
        for (PTOperator unifier : unifiersAfterScaling) {
            Assert.assertNotNull("container null: " + unifier, unifier.getContainer());
        }
    }

    /**
     * Checks the physical layout when both {@code UNIFIER_LIMIT} and {@code UNIFIER_SINGLE_FINAL} are set
     * on an upstream output port.
     *
     * <p>"o1" is a {@code PartitioningTestOperator} with four partition keys, a unifier limit of 2 and a
     * single final unifier on {@code outport1}; "o2" has 3 partitions. The test asserts 10 containers:
     * four o1 partitions, three merge unifiers (exactly one of which feeds the o2 partitions directly,
     * the others feeding another unifier), and three o2 partitions. It then adds a fifth partition key,
     * triggers repartitioning through {@link PartitioningTest.PartitionLoadWatch}, and asserts
     * 5 partitions and 4 merge unifiers, each assigned to a container.
     */
    @Test
    public void testSingleFinalCascadingUnifier() {
        LogicalPlan dag = new LogicalPlan();
        PartitioningTestOperator o1 = dag.addOperator("o1", PartitioningTestOperator.class);
        o1.partitionKeys = new Integer[]{0, 1, 2, 3};
        o1.setPartitionCount(3);
        dag.setAttribute(o1, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        dag.setOutputPortAttribute(o1.outport1, Context.PortContext.UNIFIER_LIMIT, 2);
        dag.setOutputPortAttribute(o1.outport1, Context.PortContext.UNIFIER_SINGLE_FINAL, true);
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);

        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 12);
        TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);
        Assert.assertEquals("number of containers", 10L, plan.getContainers().size());

        List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
        Assert.assertEquals("partitions " + o1Meta, 4L, o1Partitions.size());
        Assert.assertEquals("partitioned map " + o1.partitions, 4L, o1.partitions.size());
        List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
        Assert.assertEquals("partitions " + o2Meta, 3L, o2Partitions.size());

        // Each o1 partition has a single output with a single sink.
        for (PTOperator partition : o1Partitions) {
            Assert.assertEquals("outputs " + partition, 1L, partition.getOutputs().size());
            for (PTOperator.PTOutput output : partition.getOutputs()) {
                Assert.assertEquals("sinks " + output, 1L, output.sinks.size());
            }
            Assert.assertNotNull("container " + partition, partition.getContainer());
        }

        // Classify the merge unifiers: a final unifier feeds o2 directly, any other feeds a unifier.
        List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
        Assert.assertEquals("o1Unifiers " + o1Meta, 3L, o1Unifiers.size());
        List<PTOperator> finalUnifiers = new ArrayList<PTOperator>();
        for (PTOperator unifier : o1Unifiers) {
            Assert.assertEquals("inputs " + unifier, 2L, unifier.getInputs().size());
            Assert.assertEquals("outputs " + unifier, 1L, unifier.getOutputs().size());
            List<PTOperator.PTInput> sinks = unifier.getOutputs().get(0).sinks;
            // Identity comparison: the sink must belong to the very same logical o2 operator.
            boolean feedsO2 = sinks.size() > 0 && sinks.get(0).target.getOperatorMeta() == o2Meta;
            if (feedsO2) {
                for (PTOperator.PTOutput output : unifier.getOutputs()) {
                    Assert.assertEquals("sinks " + output, 3L, output.sinks.size());
                    for (PTOperator.PTInput sink : output.sinks) {
                        Assert.assertFalse(sink.target.isUnifier());
                    }
                }
                finalUnifiers.add(unifier);
            } else {
                for (PTOperator.PTOutput output : unifier.getOutputs()) {
                    Assert.assertEquals("sinks " + output, 1L, output.sinks.size());
                    Assert.assertTrue(output.sinks.get(0).target.isUnifier());
                }
            }
            Assert.assertNotNull("container " + unifier, unifier.getContainer());
        }
        Assert.assertEquals("o1 final unifiers", 1L, finalUnifiers.size());

        // Containers 0..3: o1 partitions.
        for (int i = 0; i < 4; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            Assert.assertTrue(o1Partitions.contains(container.getOperators().get(0)));
        }
        // Containers 4..6: merge unifiers.
        for (int i = 4; i < 7; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            Assert.assertTrue(o1Unifiers.contains(container.getOperators().get(0)));
        }
        // Containers 7..9: o2 partitions.
        for (int i = 7; i < 10; i++) {
            PTContainer container = plan.getContainers().get(i);
            Assert.assertEquals("number operators " + container, 1L, container.getOperators().size());
            Assert.assertTrue(o2Partitions.contains(container.getOperators().get(0)));
        }

        // Trigger repartitioning of o1 after adding a fifth partition key.
        PTOperator firstPartition = o1Partitions.get(0);
        Assert.assertTrue("stats handlers " + firstPartition.statsListeners, firstPartition.statsListeners.get(0) instanceof PartitioningTest.PartitionLoadWatch);
        PartitioningTest.PartitionLoadWatch.put(firstPartition, 1);
        plan.onStatusUpdate(firstPartition);
        Assert.assertEquals("partition scaling triggered", 1L, ctx.events.size());
        o1.partitionKeys = new Integer[]{0, 1, 2, 3, 4};
        ctx.events.remove(0).run();

        Assert.assertEquals("partitions " + o1Meta, 5L, plan.getOperators(o1Meta).size());
        Assert.assertEquals("partitioned map " + o1.partitions, 5L, o1.partitions.size());
        List<PTOperator> unifiersAfterScaling = plan.getMergeOperators(o1Meta);
        Assert.assertEquals("o1Unifiers " + o1Meta, 4L, unifiersAfterScaling.size());
        for (PTOperator unifier : unifiersAfterScaling) {
            Assert.assertNotNull("container null: " + unifier, unifier.getContainer());
        }
    }

    /**
     * Builds an o1 (3 partitions) to o2 (2 partitions) plan three times, changing the
     * {@code UNIFIER_SINGLE_FINAL} port attributes between builds, and checks the container
     * count and unifier placement asserted for each combination.
     */
    @Test
    public void testSingleFinalUnifierInputOverride() {
        LogicalPlan dag = new LogicalPlan();

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);

        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        dag.setInputPortAttribute(o2.inport1, Context.PortContext.UNIFIER_SINGLE_FINAL, true);
        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);

        // Build 1: the attribute is set to true on the o2 input port only.
        TestPlanContext firstCtx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, firstCtx);
        PhysicalPlan firstPlan = new PhysicalPlan(dag, firstCtx);
        Assert.assertEquals("number of containers", 6, firstPlan.getContainers().size());
        Assert.assertEquals("o1 merge unifiers", 1, firstPlan.getMergeOperators(o1Meta).size());

        // Build 2: output port says false, input port still says true; same layout is expected.
        dag.setOutputPortAttribute(o1.outport1, Context.PortContext.UNIFIER_SINGLE_FINAL, false);
        TestPlanContext secondCtx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, secondCtx);
        PhysicalPlan secondPlan = new PhysicalPlan(dag, secondCtx);
        Assert.assertEquals("number of containers", 6, secondPlan.getContainers().size());
        Assert.assertEquals("o1 merge unifiers", 1, secondPlan.getMergeOperators(o1Meta).size());

        // Build 3: output port says true, input port says false.
        dag.setOutputPortAttribute(o1.outport1, Context.PortContext.UNIFIER_SINGLE_FINAL, true);
        dag.setInputPortAttribute(o2.inport1, Context.PortContext.UNIFIER_SINGLE_FINAL, false);
        TestPlanContext thirdCtx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, thirdCtx);
        PhysicalPlan thirdPlan = new PhysicalPlan(dag, thirdCtx);
        Assert.assertEquals("number of containers", 5, thirdPlan.getContainers().size());

        // Containers 3 and 4 must each hold exactly one unifier and one o2 partition.
        Set<String> expectedNames = Sets.newHashSet(
                o1Meta.getMeta(o1.outport1).getUnifierMeta().getName(),
                o2Meta.getName());
        for (PTContainer container : thirdPlan.getContainers().subList(3, 5)) {
            Assert.assertEquals("o2 container size", 2, container.getOperators().size());
            Set<String> deployedNames = Sets.newHashSet();
            for (PTOperator deployed : container.getOperators()) {
                deployedNames.add(deployed.getOperatorMeta().getName());
            }
            Assert.assertEquals("o2 container operators", expectedNames, deployedNames);
        }
    }

    /**
     * Connects one o1 output (3 partitions) to both o2 (4 partitions, input port with
     * {@code UNIFIER_SINGLE_FINAL} set to true) and o3 (2 partitions) and checks how the
     * resulting unifiers are spread over the ten expected containers.
     */
    @Test
    public void testSingleFinalUnifierMultiInput() {
        LogicalPlan dag = new LogicalPlan();

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);

        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(4));
        dag.setInputPortAttribute(o2.inport1, Context.PortContext.UNIFIER_SINGLE_FINAL, true);
        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);

        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        dag.setAttribute(o3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        LogicalPlan.OperatorMeta o3Meta = dag.getMeta(o3);

        dag.addStream("o1o2o3", o1.outport1, o2.inport1, o3.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 12);

        TestPlanContext ctx = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, ctx);
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);
        Assert.assertEquals("number of containers", 10, plan.getContainers().size());
        Assert.assertEquals("o1 merge unifiers", 1, plan.getMergeOperators(o1Meta).size());

        // Container 3 is expected to hold nothing but the o1 unifier.
        PTContainer unifierContainer = plan.getContainers().get(3);
        Assert.assertEquals("number of operators " + unifierContainer, 1, unifierContainer.getOperators().size());
        PTOperator finalUnifier = unifierContainer.getOperators().get(0);
        Assert.assertTrue("unifier check " + finalUnifier, finalUnifier.isUnifier());
        Assert.assertEquals("operator meta " + finalUnifier,
                o1Meta.getMeta(o1.outport1).getUnifierMeta(), finalUnifier.getOperatorMeta());

        int singleOperatorContainers = 0;
        int pairedOperatorContainers = 0;
        Set<String> unifierPlusO3 = Sets.newHashSet(
                o1Meta.getMeta(o1.outport1).getUnifierMeta().getName(),
                o3Meta.getName());

        // Containers 4..9: a lone o2 partition, or a unifier co-located with an o3 partition.
        for (PTContainer container : plan.getContainers().subList(4, 10)) {
            List<PTOperator> deployed = container.getOperators();
            int deployedCount = deployed.size();
            Assert.assertTrue("expected operator count " + container, deployedCount > 0 && deployedCount <= 2);
            switch (deployedCount) {
                case 1:
                    Assert.assertEquals("operator in container " + container, o2Meta, deployed.get(0).getOperatorMeta());
                    singleOperatorContainers++;
                    break;
                case 2:
                    Set<String> deployedNames = Sets.newHashSet();
                    for (PTOperator operator : container.getOperators()) {
                        deployedNames.add(operator.getOperatorMeta().getName());
                    }
                    Assert.assertEquals("container operators " + container, unifierPlusO3, deployedNames);
                    pairedOperatorContainers++;
                    break;
                default:
                    break;
            }
        }
        Assert.assertEquals("number o2 containers", 4, singleOperatorContainers);
        Assert.assertEquals("number o3 containers", 2, pairedOperatorContainers);
    }

    /**
     * Chains o1 -> o2 -> o3 with per-operator VCORES / MEMORY_MB settings, caps the plan at
     * two containers, and checks the memory and vcore requirements reported per container.
     */
    @Test
    public void testContainerSize() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);

        dag.setAttribute(o1, Context.OperatorContext.VCORES, 1);
        dag.setAttribute(o2, Context.OperatorContext.VCORES, 2);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.addStream("o2.outport1", o2.outport1, o3.inport1);

        dag.setAttribute(o2, Context.OperatorContext.MEMORY_MB, 4000);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number of containers", 2, containers.size());

        PTContainer first = containers.get(0);
        Assert.assertEquals("memory container 1", 2560, first.getRequiredMemoryMB());
        Assert.assertEquals("vcores container 1", 1, first.getRequiredVCores());

        PTContainer second = containers.get(1);
        Assert.assertEquals("memory container 2", 4512, second.getRequiredMemoryMB());
        Assert.assertEquals("vcores container 2", 2, second.getRequiredVCores());

        Assert.assertEquals("number of operators in container 1", 2, first.getOperators().size());
    }

    /**
     * Wires six operators (VCORES 1..6) with CONTAINER_LOCAL and THREAD_LOCAL streams and
     * checks that the plan uses a single container requiring 12 vcores.
     */
    @Test
    public void testContainerCores() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
        GenericTestOperator o5 = dag.addOperator("o5", GenericTestOperator.class);
        GenericTestOperator o6 = dag.addOperator("o6", GenericTestOperator.class);

        // Operator oN requests N vcores.
        dag.setAttribute(o1, Context.OperatorContext.VCORES, 1);
        dag.setAttribute(o2, Context.OperatorContext.VCORES, 2);
        dag.setAttribute(o3, Context.OperatorContext.VCORES, 3);
        dag.setAttribute(o4, Context.OperatorContext.VCORES, 4);
        dag.setAttribute(o5, Context.OperatorContext.VCORES, 5);
        dag.setAttribute(o6, Context.OperatorContext.VCORES, 6);

        // o1 => o2 and o5 => o6 are container local; the o2 -> {o3, o4} -> o5 diamond is thread local.
        dag.addStream("o1.outport1", o1.outport1, o2.inport1)
                .setLocality(DAG.Locality.CONTAINER_LOCAL);
        dag.addStream("o2.outport1", o2.outport1, o3.inport1, o4.inport1)
                .setLocality(DAG.Locality.THREAD_LOCAL);
        dag.addStream("o3.output1", o3.outport1, o5.inport1)
                .setLocality(DAG.Locality.THREAD_LOCAL);
        dag.addStream("o4.output1", o4.outport1, o5.inport2)
                .setLocality(DAG.Locality.THREAD_LOCAL);
        dag.addStream("o5.output1", o5.outport1, o6.inport1)
                .setLocality(DAG.Locality.CONTAINER_LOCAL);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        Assert.assertEquals("number of containers", 1, plan.getContainers().size());
        Assert.assertEquals("vcores container 1 is 12", 12, plan.getContainers().get(0).getRequiredVCores());
    }

    /**
     * Partitions o1 three ways and o2 two ways, then checks the required memory of every
     * container that hosts one operator (1536) or two operators (2048).
     */
    @Test
    public void testContainerSizeWithPartitioning() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        dag.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number of containers", 5, containers.size());

        for (PTContainer container : containers) {
            int hosted = container.getOperators().size();
            if (hosted == 1) {
                Assert.assertEquals("container memory is 1536 for container :" + container,
                        1536, container.getRequiredMemoryMB());
            } else if (hosted == 2) {
                Assert.assertEquals("container memory is 2048 for container :" + container,
                        2048, container.getRequiredMemoryMB());
            }
        }
    }

    /**
     * Checks the output wiring of operator X while it is repartitioned.
     *
     * <p>X (2 stateless partitions) feeds Y and Z over two streams, with every downstream input
     * port marked {@code PARTITION_PARALLEL}. The same wiring check is run on the initial plan,
     * after a repartition event with load indicator 0, and after a repartition event with load
     * indicator 1, where three X partitions are expected.
     *
     * @throws InterruptedException declared by the original signature; nothing visible in this
     *     method blocks
     */
    @Test
    public void testDefaultPartitionerWithParallel() throws InterruptedException {
        final MutableInt loadIndicator = new MutableInt();
        // Always asks for a repartition and reports whatever load the test has dialled in.
        StatsListener listener = new StatsListener() {
            @Override
            public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) {
                StatsListener.Response response = new StatsListener.Response();
                response.repartitionRequired = true;
                response.loadIndicator = loadIndicator.intValue();
                return response;
            }
        };

        LogicalPlan dag = new LogicalPlan();
        GenericTestOperator x = dag.addOperator("X", GenericTestOperator.class);
        dag.setAttribute(x, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        dag.setAttribute(x, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(listener));
        GenericTestOperator y = dag.addOperator("Y", GenericTestOperator.class);
        dag.setAttribute(y, Context.OperatorContext.PARTITIONER, new TestPartitioner());
        GenericTestOperator z = dag.addOperator("Z", GenericTestOperator.class);

        dag.addStream("Stream1", x.outport1, y.inport1, z.inport1);
        dag.addStream("Stream2", x.outport2, y.inport2, z.inport2);
        dag.setInputPortAttribute(y.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setInputPortAttribute(y.inport2, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setInputPortAttribute(z.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setInputPortAttribute(z.inport2, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        TestPlanContext ctx = new TestPlanContext();
        PhysicalPlan plan = new PhysicalPlan(dag, ctx);
        LogicalPlan.OperatorMeta xMeta = dag.getMeta(x);
        LogicalPlan.OperatorMeta yMeta = dag.getMeta(y);
        Assert.assertEquals("number operators " + xMeta.getName(), 2, plan.getOperators(xMeta).size());
        Assert.assertEquals("number operators " + yMeta.getName(), 2, plan.getOperators(yMeta).size());

        // Initial plan.
        List<PTOperator> xPartitions = plan.getOperators(xMeta);
        assertTwoOutputsWithTwoDistinctSinkTargets(xMeta, xPartitions);

        // Repartition request with load indicator 0; the same list reference is re-checked.
        loadIndicator.setValue(0);
        for (PTOperator partition : xPartitions) {
            plan.onStatusUpdate(partition);
        }
        ctx.events.remove(0).run();
        assertTwoOutputsWithTwoDistinctSinkTargets(xMeta, xPartitions);

        // Repartition request with load indicator 1; three X partitions are expected afterwards.
        loadIndicator.setValue(1);
        plan.onStatusUpdate(xPartitions.get(0));
        // NOTE(review): unlike the step above, this event is run without being removed from the
        // queue - confirm that is intentional.
        ctx.events.get(0).run();
        List<PTOperator> repartitioned = plan.getOperators(xMeta);
        Assert.assertEquals("3 partitons " + repartitioned, 3, repartitioned.size());
        assertTwoOutputsWithTwoDistinctSinkTargets(xMeta, repartitioned);
    }

    /**
     * Asserts that every given partition exposes exactly two outputs and that each output has
     * two sinks whose targets are two distinct operators.
     *
     * @param meta logical operator whose name is used in the failure messages
     * @param partitions physical partitions to inspect
     */
    private static void assertTwoOutputsWithTwoDistinctSinkTargets(LogicalPlan.OperatorMeta meta, List<PTOperator> partitions) {
        for (PTOperator partition : partitions) {
            Assert.assertEquals("2 streams " + partition.getOutputs(), 2, partition.getOutputs().size());
            for (PTOperator.PTOutput output : partition.getOutputs()) {
                Set<PTOperator> targets = Sets.newHashSet();
                Assert.assertEquals("sink of " + meta.getName() + " id " + partition.id + " port " + output.portName,
                        2, output.sinks.size());
                for (PTOperator.PTInput sink : output.sinks) {
                    targets.add(sink.target);
                }
                Assert.assertEquals(2, targets.size());
            }
        }
    }

    /**
     * Configures o1 with APPLICATION_WINDOW_COUNT 4 and SLIDE_BY_WINDOW_COUNT 2, assigns
     * MEMORY_MB 2000 / 4000 to the unifiers of its two output ports, and checks that the plan
     * has five single-operator containers, including one for each
     * {@code o1.outportN#slider} unifier with the asserted memory requirement.
     */
    @Test
    public void testContainersForSlidingWindow() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);

        dag.setAttribute(o1, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
        dag.setAttribute(o1, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
        dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(Context.OperatorContext.MEMORY_MB, 2000);
        dag.getOperatorMeta("o1").getMeta(o1.outport2).getUnifierMeta().getAttributes().put(Context.OperatorContext.MEMORY_MB, 4000);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.addStream("o1.outport2", o1.outport2, o2.inport2);
        dag.addStream("o2.outport1", o2.outport1, o3.inport1);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        Assert.assertEquals("number of containers", 5, plan.getContainers().size());

        boolean sawOutport1Slider = false;
        boolean sawOutport2Slider = false;
        for (PTContainer container : plan.getContainers()) {
            // Expected value goes first so a failure reports "expected 1 but was N".
            Assert.assertEquals("number of operators in each container is 1", 1, container.operators.size());
            PTOperator operator = container.operators.get(0);
            if (!operator.isUnifier()) {
                continue;
            }
            String name = operator.getName();
            if ("o1.outport1#slider".equals(name)) {
                sawOutport1Slider = true;
                Assert.assertEquals("container memory is 2512", 2512, container.getRequiredMemoryMB());
            } else if ("o1.outport2#slider".equals(name)) {
                sawOutport2Slider = true;
                // The message used to say 2512 although the asserted value is 4512.
                Assert.assertEquals("container memory is 4512", 4512, container.getRequiredMemoryMB());
            }
        }
        Assert.assertTrue("Found output1 slider", sawOutport1Slider);
        Assert.assertTrue("Found output2 slider", sawOutport2Slider);
    }

    /**
     * Partitions both o1 and o2 two ways, gives each a sliding application window
     * (APPLICATION_WINDOW_COUNT 4, SLIDE_BY_WINDOW_COUNT 2), and checks that the
     * o1 -> o2 -> o3 plan is laid out in ten containers.
     */
    @Test
    public void testMxNPartitionForSlidingWindow() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);

        // o1: sliding window, two partitions, and an explicit memory setting on its outport1 unifier.
        dag.setAttribute(o1, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
        dag.setAttribute(o1, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
        dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(Context.OperatorContext.MEMORY_MB, 1024);

        // o2: two partitions with the same window configuration.
        dag.setAttribute(o2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        dag.setAttribute(o2, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
        dag.setAttribute(o2, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.addStream("o2.outport1", o2.outport1, o3.inport1);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        Assert.assertEquals("number of containers", 10, plan.getContainers().size());
    }

    /**
     * Gives o1 a sliding application window and two partitions, marks the o2 input port
     * {@code PARTITION_PARALLEL}, and checks that the o1 -> o2 -> o3 plan is laid out in
     * eight containers.
     */
    @Test
    public void testParallelPartitionForSlidingWindow() {
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);

        dag.setAttribute(o1, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
        dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        dag.setInputPortAttribute(o2.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setAttribute(o1, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);

        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.addStream("o2.outport1", o2.outport1, o3.inport1);

        PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
        Assert.assertEquals("number of containers", 8, plan.getContainers().size());
    }
}
