package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.PartitioningTest;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/StreamCodecTest.class */
public class StreamCodecTest {
    // Logical plan that each test populates; assigned in setup() before every test.
    private LogicalPlan dag;

    // JUnit rule instance that setup() passes to StramTestSupport.createDAG.
    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    /* loaded from: input_file:com/datatorrent/stram/StreamCodecTest$DefaultCodecOperator.class */
    public static class DefaultCodecOperator extends GenericTestOperator {
        private static final DefaultTestStreamCodec codec = new DefaultTestStreamCodec();

        @InputPortFieldAnnotation(optional = true)
        public final transient Operator.InputPort<Object> inportWithCodec = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.StreamCodecTest.DefaultCodecOperator.1
            public StreamCodec<Object> getStreamCodec() {
                return DefaultCodecOperator.codec;
            }

            public final void process(Object obj) {
            }
        };
    }

    /**
     * Serializable stateful codec with no overrides of its own; {@code DefaultCodecOperator}
     * returns an instance of it as the codec of its input port.
     */
    public static class DefaultTestStreamCodec extends DefaultStatefulStreamCodec<Object> implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamCodecTest$TestStreamCodec.class */
    public static class TestStreamCodec extends DefaultStatefulStreamCodec<Object> {
        public int getPartition(Object obj) {
            return obj.hashCode() / 2;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamCodecTest$TestStreamCodec2.class */
    public static class TestStreamCodec2 extends DefaultStatefulStreamCodec<Object> {
        public int getPartition(Object obj) {
            return obj.hashCode() / 3;
        }
    }

    /** Assigns {@code dag} from {@code StramTestSupport.createDAG(testMeta)} before each test. */
    @Before
    public void setup() {
        dag = StramTestSupport.createDAG(testMeta);
    }

    /**
     * Chains node1 -> node2 -> node3 with a {@link TestStreamCodec} set only on node3's input
     * port, then checks the deploy info of every port on the two streams: the n1n2 ports must
     * each carry one codec entry whose value is {@code null}, and the n2n3 ports must each
     * carry exactly the codec configured on node3.inport1.
     */
    @Test
    public void testStreamCodec() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
        dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());

        dag.addStream("n1n2", node1.outport1, node2.inport1);
        dag.addStream("n2n3", node2.outport1, node3.inport1);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 3, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
        LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
        LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

        // node1 output: no codec was configured downstream, so the single entry maps to null.
        OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, scm);
        OperatorDeployInfo.OutputDeployInfo n1out = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
        String n1outId = n1meta.getName() + " " + n1out.portName;
        Assert.assertEquals("number stream codecs " + n1outId, 1, n1out.streamCodecs.size());
        Assert.assertTrue("No user set stream codec", n1out.streamCodecs.containsValue(null));

        // node2 input mirrors node1 output; node2 output must carry node3's codec.
        OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, scm);
        OperatorDeployInfo.InputDeployInfo n2in = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
        String n2inId = n2meta.getName() + " " + n2in.portName;
        Assert.assertEquals("number stream codecs " + n2inId, 1, n2in.streamCodecs.size());
        Assert.assertTrue("No user set stream codec", n2in.streamCodecs.containsValue(null));

        OperatorDeployInfo.OutputDeployInfo n2out = getOutputDeployInfo(n2di, n2meta.getMeta(node2.outport1));
        String n2outId = n2meta.getName() + " " + n2out.portName;
        Assert.assertEquals("number stream codecs " + n2outId, 1, n2out.streamCodecs.size());
        checkPresentStreamCodec(n3meta, node3.inport1, n2out.streamCodecs, n2outId, plan);

        // node3 input carries its own configured codec.
        OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, scm);
        OperatorDeployInfo.InputDeployInfo n3in = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
        String n3inId = n3meta.getName() + " " + n3in.portName;
        Assert.assertEquals("number stream codecs " + n3inId, 1, n3in.streamCodecs.size());
        checkPresentStreamCodec(n3meta, node3.inport1, n3in.streamCodecs, n3inId, plan);
    }

    /**
     * Builds a six-operator chain in which node4 and node5 share one {@link TestStreamCodec}
     * instance and node6 gets a separate instance, requests the deploy info of every operator,
     * and then expects the physical plan to hold exactly three stream codec identifiers.
     */
    @Test
    public void testStreamCodecReuse() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
        GenericTestOperator node4 = dag.addOperator("node4", GenericTestOperator.class);
        TestStreamCodec sharedCodec = new TestStreamCodec();
        dag.setInputPortAttribute(node4.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);
        GenericTestOperator node5 = dag.addOperator("node5", GenericTestOperator.class);
        dag.setInputPortAttribute(node5.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);
        GenericTestOperator node6 = dag.addOperator("node6", GenericTestOperator.class);
        dag.setInputPortAttribute(node6.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());

        dag.addStream("n1n2", node1.outport1, node2.inport1);
        dag.addStream("n2n3", node2.outport1, node3.inport1);
        dag.addStream("n3n4", node3.outport1, node4.inport1);
        dag.addStream("n4n5", node4.outport1, node5.inport1);
        dag.addStream("n5n6", node5.outport1, node6.inport1);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        // Keep a handle on the plan: the final assertion inspects its codec identifiers.
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 6, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        // Results are not needed here; the calls are made for every operator before the check.
        getSingleOperatorDeployInfo(node1, scm);
        getSingleOperatorDeployInfo(node2, scm);
        getSingleOperatorDeployInfo(node3, scm);
        getSingleOperatorDeployInfo(node4, scm);
        getSingleOperatorDeployInfo(node5, scm);
        getSingleOperatorDeployInfo(node6, scm);

        Assert.assertEquals("number of stream codec identifiers", 3, plan.getStreamCodecIdentifiers().size());
    }

    /**
     * Chains node1 -> node2 -> node3 where node2 and node3 are {@link DefaultCodecOperator}s
     * connected through {@code inportWithCodec}; node3's port additionally has a
     * {@link TestStreamCodec} attribute. Every port on both streams must report exactly one
     * codec, and that codec must be the one resolved for the stream's sink port.
     */
    @Test
    public void testDefaultStreamCodec() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        DefaultCodecOperator node2 = dag.addOperator("node2", DefaultCodecOperator.class);
        DefaultCodecOperator node3 = dag.addOperator("node3", DefaultCodecOperator.class);
        dag.setInputPortAttribute(node3.inportWithCodec, Context.PortContext.STREAM_CODEC, new TestStreamCodec());

        dag.addStream("n1n2", node1.outport1, node2.inportWithCodec);
        dag.addStream("n2n3", node2.outport1, node3.inportWithCodec);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 3, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
        LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
        LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

        // Stream n1n2: both ends are checked against node2's port codec.
        OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, scm);
        OperatorDeployInfo.OutputDeployInfo n1out = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
        String n1outId = n1meta.getName() + " " + n1out.portName;
        Assert.assertEquals("number stream codecs " + n1outId, 1, n1out.streamCodecs.size());
        checkPresentStreamCodec(n2meta, node2.inportWithCodec, n1out.streamCodecs, n1outId, plan);

        OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, scm);
        OperatorDeployInfo.InputDeployInfo n2in = getInputDeployInfo(n2di, n2meta.getMeta(node2.inportWithCodec));
        String n2inId = n2meta.getName() + " " + n2in.portName;
        Assert.assertEquals("number stream codecs " + n2inId, 1, n2in.streamCodecs.size());
        checkPresentStreamCodec(n2meta, node2.inportWithCodec, n2in.streamCodecs, n2inId, plan);

        // Stream n2n3: both ends are checked against node3's port codec.
        OperatorDeployInfo.OutputDeployInfo n2out = getOutputDeployInfo(n2di, n2meta.getMeta(node2.outport1));
        String n2outId = n2meta.getName() + " " + n2out.portName;
        Assert.assertEquals("number stream codecs " + n2outId, 1, n2out.streamCodecs.size());
        checkPresentStreamCodec(n3meta, node3.inportWithCodec, n2out.streamCodecs, n2outId, plan);

        OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, scm);
        OperatorDeployInfo.InputDeployInfo n3in = getInputDeployInfo(n3di, n3meta.getMeta(node3.inportWithCodec));
        String n3inId = n3meta.getName() + " " + n3in.portName;
        Assert.assertEquals("number stream codecs " + n3inId, 1, n3in.streamCodecs.size());
        checkPresentStreamCodec(n3meta, node3.inportWithCodec, n3in.streamCodecs, n3inId, plan);
    }

    /**
     * Connects node1 to node2, where node2 is split into three partitions by a
     * {@link StatelessPartitioner} and its input port has a {@link TestStreamCodec}. node1's
     * output and the input of every node2 partition must each report exactly that one codec.
     */
    @Test
    public void testPartitioningStreamCodec() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER,
                new StatelessPartitioner<GenericTestOperator>(3));
        dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());

        dag.addStream("n1n2", node1.outport1, node2.inport1);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 4, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
        LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);

        OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, scm);
        OperatorDeployInfo.OutputDeployInfo n1out = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
        String n1outId = n1meta.getName() + " " + n1out.portName;
        Assert.assertEquals("number stream codecs " + n1outId, 1, n1out.streamCodecs.size());
        checkPresentStreamCodec(n2meta, node2.inport1, n1out.streamCodecs, n1outId, plan);

        List<PTOperator> partitions = plan.getOperators(n2meta);
        Assert.assertEquals("number operators " + n2meta.getName(), 3, partitions.size());
        for (PTOperator partition : partitions) {
            OperatorDeployInfo odi = getOperatorDeployInfo(partition, n2meta.getName(), scm);
            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
            String id = n2meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
        }
    }

    /**
     * Connects a two-partition node1 to a three-partition node2 whose input port has a
     * {@link TestStreamCodec}. Walks every physical operator in every container and checks
     * that unifier inputs, node1 outputs and node2 inputs each report exactly that one codec.
     */
    @Test
    public void testMxNPartitioningStreamCodec() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER,
                new StatelessPartitioner<GenericTestOperator>(2));
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER,
                new StatelessPartitioner<GenericTestOperator>(3));
        dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());

        dag.addStream("n1n2", node1.outport1, node2.inport1);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
        LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
        Assert.assertEquals("number operators " + n1meta.getName(), 2, plan.getOperators(n1meta).size());
        Assert.assertEquals("number operators " + n2meta.getName(), 3, plan.getOperators(n2meta).size());

        for (PTContainer container : containers) {
            for (PTOperator operator : container.getOperators()) {
                if (operator.isUnifier()) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), scm);
                    for (OperatorDeployInfo.InputDeployInfo idi : odi.inputs) {
                        String id = operator.getName() + " " + idi.portName;
                        Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
                        checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
                    }
                } else if (operator.getOperatorMeta() == n1meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), scm);
                    OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
                    String id = n1meta.getName() + " " + otdi.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, otdi.streamCodecs.size());
                    checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
                } else if (operator.getOperatorMeta() == n2meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), scm);
                    OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
                    // Label with node2's own name so a failure points at the right operator.
                    String id = n2meta.getName() + " " + idi.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
                    checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
                }
            }
        }
    }

    /**
     * Chains a two-partition node1 to node2 (parallel-partitioned input with a
     * {@link TestStreamCodec}) and on to node3 (input with a {@link TestStreamCodec2}).
     * Walks every physical operator and checks: unifier inputs carry node3's codec and unifier
     * outputs carry none; node1 outputs and node2 inputs carry node2's codec; node2 outputs and
     * node3 inputs carry node3's codec.
     */
    @Test
    public void testParallelPartitioningStreamCodec() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER,
                new StatelessPartitioner<GenericTestOperator>(2));
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.setInputPortAttribute(node2.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
        dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec2());

        dag.addStream("n1n2", node1.outport1, node2.inport1);
        dag.addStream("n2n3", node2.outport1, node3.inport1);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
        LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
        LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
        Assert.assertEquals("number operators " + n1meta.getName(), 2, plan.getOperators(n1meta).size());
        Assert.assertEquals("number operators " + n2meta.getName(), 2, plan.getOperators(n2meta).size());
        Assert.assertEquals("number operators " + n3meta.getName(), 1, plan.getOperators(n3meta).size());

        for (PTContainer container : containers) {
            for (PTOperator operator : container.getOperators()) {
                if (operator.isUnifier()) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), scm);
                    for (OperatorDeployInfo.InputDeployInfo idi : odi.inputs) {
                        String id = operator.getName() + " " + idi.portName;
                        Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
                        checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
                    }
                    for (OperatorDeployInfo.OutputDeployInfo otdi : odi.outputs) {
                        String id = operator.getName() + " " + otdi.portName;
                        Assert.assertEquals("number stream codecs " + id, 0, otdi.streamCodecs.size());
                    }
                } else if (operator.getOperatorMeta() == n1meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), scm);
                    OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
                    String id = n1meta.getName() + " " + otdi.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, otdi.streamCodecs.size());
                    checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
                } else if (operator.getOperatorMeta() == n2meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), scm);

                    OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
                    // Label with node2's own name so a failure points at the right operator.
                    String inId = n2meta.getName() + " " + idi.portName;
                    Assert.assertEquals("number stream codecs " + inId, 1, idi.streamCodecs.size());
                    checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, inId, plan);

                    OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n2meta.getMeta(node2.outport1));
                    String outId = n2meta.getName() + " " + otdi.portName;
                    Assert.assertEquals("number stream codecs " + outId, 1, otdi.streamCodecs.size());
                    checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, outId, plan);
                } else if (operator.getOperatorMeta() == n3meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n3meta.getName(), scm);
                    OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n3meta.getMeta(node3.inport1));
                    String id = n3meta.getName() + " " + idi.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
                    checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
                }
            }
        }
    }

    /**
     * Fans one stream out from node1 to node2 and node3, whose input ports share a single
     * {@link TestStreamCodec} instance. node1's output and both sink inputs must each report
     * exactly one codec matching the one configured on the corresponding sink port.
     */
    @Test
    public void testMultipleInputStreamCodec() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        TestStreamCodec sharedCodec = new TestStreamCodec();
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);
        GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
        dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);

        dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 3, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
        LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
        LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

        OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, scm);
        OperatorDeployInfo.OutputDeployInfo n1out = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
        String n1outId = n1meta.getName() + " " + n1out.portName;
        Assert.assertEquals("number stream codecs " + n1outId, 1, n1out.streamCodecs.size());
        checkPresentStreamCodec(n2meta, node2.inport1, n1out.streamCodecs, n1outId, plan);

        OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, scm);
        OperatorDeployInfo.InputDeployInfo n2in = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
        String n2inId = n2meta.getName() + " " + n2in.portName;
        Assert.assertEquals("number stream codecs " + n2inId, 1, n2in.streamCodecs.size());
        checkPresentStreamCodec(n2meta, node2.inport1, n2in.streamCodecs, n2inId, plan);

        OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, scm);
        OperatorDeployInfo.InputDeployInfo n3in = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
        String n3inId = n3meta.getName() + " " + n3in.portName;
        Assert.assertEquals("number stream codecs " + n3inId, 1, n3in.streamCodecs.size());
        checkPresentStreamCodec(n3meta, node3.inport1, n3in.streamCodecs, n3inId, plan);
    }

    /**
     * Fans one stream out from a two-partition node1 to node2 and node3, whose input ports
     * share a single {@link TestStreamCodec} instance. Walks every physical operator and
     * checks: unifier inputs carry the one shared codec (verified against both sink ports) and
     * unifier outputs carry none; node1 outputs, node2 inputs and node3 inputs each carry
     * exactly one codec.
     */
    @Test
    public void testPartitioningMultipleInputStreamCodec() {
        GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
        dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER,
                new StatelessPartitioner<GenericTestOperator>(2));
        TestStreamCodec sharedCodec = new TestStreamCodec();
        dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);
        GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
        dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);

        dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

        dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 4, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(scm, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
        LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
        LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

        for (PTContainer container : containers) {
            for (PTOperator operator : container.getOperators()) {
                if (operator.isUnifier()) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), scm);
                    for (OperatorDeployInfo.InputDeployInfo idi : odi.inputs) {
                        String id = operator.getName() + " " + idi.portName;
                        Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
                        checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
                        checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
                    }
                    for (OperatorDeployInfo.OutputDeployInfo otdi : odi.outputs) {
                        String id = operator.getName() + " " + otdi.portName;
                        Assert.assertEquals("number stream codecs " + id, 0, otdi.streamCodecs.size());
                    }
                } else if (operator.getOperatorMeta() == n1meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), scm);
                    OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
                    String id = n1meta.getName() + " " + otdi.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, otdi.streamCodecs.size());
                    checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
                } else if (operator.getOperatorMeta() == n2meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), scm);
                    OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
                    String id = n2meta.getName() + " " + idi.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
                    checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
                } else if (operator.getOperatorMeta() == n3meta) {
                    OperatorDeployInfo odi = getOperatorDeployInfo(operator, n3meta.getName(), scm);
                    OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n3meta.getMeta(node3.inport1));
                    String id = n3meta.getName() + " " + idi.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, idi.streamCodecs.size());
                    checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
                }
            }
        }
    }

    /**
     * Verifies stream codec deployment when one output port feeds two input ports that are
     * configured with different codec instances.
     *
     * <p>The upstream output port is expected to carry both codecs, while each downstream
     * input port is expected to carry exactly its own codec.
     */
    @Test
    public void testMultipleStreamCodecs() {
        GenericTestOperator node1 = this.dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = this.dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator node3 = this.dag.addOperator("node3", GenericTestOperator.class);
        this.dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        this.dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec2());
        this.dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager manager = new StreamingContainerManager(this.dag);
        PhysicalPlan plan = manager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 3, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(manager, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta node1Meta = this.dag.getMeta(node1);
        LogicalPlan.OperatorMeta node2Meta = this.dag.getMeta(node2);
        LogicalPlan.OperatorMeta node3Meta = this.dag.getMeta(node3);

        // Upstream output port: must know about the codecs of both consumers.
        OperatorDeployInfo.OutputDeployInfo node1Output =
                getOutputDeployInfo(getSingleOperatorDeployInfo(node1, manager), node1Meta.getMeta(node1.outport1));
        String node1Id = node1Meta.getName() + " " + node1Output.portName;
        Assert.assertEquals("number stream codecs " + node1Id, 2, node1Output.streamCodecs.size());
        checkPresentStreamCodec(node2Meta, node2.inport1, node1Output.streamCodecs, node1Id, plan);
        checkPresentStreamCodec(node3Meta, node3.inport1, node1Output.streamCodecs, node1Id, plan);

        // First consumer: only its own codec.
        OperatorDeployInfo.InputDeployInfo node2Input =
                getInputDeployInfo(getSingleOperatorDeployInfo(node2, manager), node2Meta.getMeta(node2.inport1));
        String node2Id = node2Meta.getName() + " " + node2Input.portName;
        Assert.assertEquals("number stream codecs " + node2Id, 1, node2Input.streamCodecs.size());
        checkPresentStreamCodec(node2Meta, node2.inport1, node2Input.streamCodecs, node2Id, plan);

        // Second consumer: only its own codec.
        OperatorDeployInfo.InputDeployInfo node3Input =
                getInputDeployInfo(getSingleOperatorDeployInfo(node3, manager), node3Meta.getMeta(node3.inport1));
        String node3Id = node3Meta.getName() + " " + node3Input.portName;
        Assert.assertEquals("number stream codecs " + node3Id, 1, node3Input.streamCodecs.size());
        checkPresentStreamCodec(node3Meta, node3.inport1, node3Input.streamCodecs, node3Id, plan);
    }

    /**
     * Verifies stream codec deployment when the upstream operator is configured with a
     * two-way {@link StatelessPartitioner} and feeds two downstream operators that use
     * different codec instances.
     *
     * <p>Every deployed operator is inspected: unifier inputs must carry the single codec of
     * the input port the unifier feeds, upstream partitions must carry both codecs on their
     * output port, and each downstream operator must carry only its own codec.
     */
    @Test
    public void testPartitioningMultipleStreamCodecs() {
        GenericTestOperator node1 = this.dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = this.dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator node3 = this.dag.addOperator("node3", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
        this.dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        this.dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec2());
        this.dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager manager = new StreamingContainerManager(this.dag);
        PhysicalPlan plan = manager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 4, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(manager, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta node1Meta = this.dag.getMeta(node1);
        LogicalPlan.OperatorMeta node2Meta = this.dag.getMeta(node2);
        LogicalPlan.OperatorMeta node3Meta = this.dag.getMeta(node3);

        for (PTContainer container : containers) {
            for (PTOperator oper : container.getOperators()) {
                if (oper.isUnifier()) {
                    OperatorDeployInfo unifierInfo = getOperatorDeployInfo(oper, oper.getName(), manager);
                    Assert.assertEquals("unifier outputs " + oper.getName(), 1, oper.getOutputs().size());
                    PTOperator.PTOutput unifierOutput = (PTOperator.PTOutput) oper.getOutputs().get(0);
                    Assert.assertEquals("unifier sinks " + oper.getName(), 1, unifierOutput.sinks.size());

                    // Resolve which logical input port this unifier ultimately feeds.
                    LogicalPlan.OperatorMeta sinkMeta =
                            ((PTOperator.PTInput) unifierOutput.sinks.get(0)).target.getOperatorMeta();
                    Operator.InputPort<?> sinkPort = null;
                    if (sinkMeta == node2Meta) {
                        sinkPort = node2.inport1;
                    } else if (sinkMeta == node3Meta) {
                        sinkPort = node3.inport1;
                    }
                    // Fail with a readable message instead of an error deep inside the codec lookup.
                    Assert.assertNotNull("unifier sink port " + oper.getName(), sinkPort);

                    for (OperatorDeployInfo.InputDeployInfo input : unifierInfo.inputs) {
                        String id = oper.getName() + " " + input.portName;
                        Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                        checkPresentStreamCodec(sinkMeta, sinkPort, input.streamCodecs, id, plan);
                    }
                } else if (oper.getOperatorMeta() == node1Meta) {
                    OperatorDeployInfo.OutputDeployInfo output = getOutputDeployInfo(
                            getOperatorDeployInfo(oper, node1Meta.getName(), manager), node1Meta.getMeta(node1.outport1));
                    String id = node1Meta.getName() + " " + output.portName;
                    Assert.assertEquals("number stream codecs " + id, 2, output.streamCodecs.size());
                    checkPresentStreamCodec(node2Meta, node2.inport1, output.streamCodecs, id, plan);
                    checkPresentStreamCodec(node3Meta, node3.inport1, output.streamCodecs, id, plan);
                } else if (oper.getOperatorMeta() == node2Meta) {
                    OperatorDeployInfo.InputDeployInfo input = getInputDeployInfo(
                            getOperatorDeployInfo(oper, node2Meta.getName(), manager), node2Meta.getMeta(node2.inport1));
                    String id = node2Meta.getName() + " " + input.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                    checkPresentStreamCodec(node2Meta, node2.inport1, input.streamCodecs, id, plan);
                } else if (oper.getOperatorMeta() == node3Meta) {
                    OperatorDeployInfo.InputDeployInfo input = getInputDeployInfo(
                            getOperatorDeployInfo(oper, node3Meta.getName(), manager), node3Meta.getMeta(node3.inport1));
                    String id = node3Meta.getName() + " " + input.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                    checkPresentStreamCodec(node3Meta, node3.inport1, input.streamCodecs, id, plan);
                }
            }
        }
    }

    /**
     * Verifies stream codec deployment for an MxN layout: a two-way partitioned upstream
     * operator feeding two downstream operators that are each three-way partitioned and
     * each configured with its own {@code TestStreamCodec} instance.
     *
     * <p>After a sanity check on the number of physical partitions per logical operator,
     * the per-operator codec assertions are delegated to {@link #checkMxNStreamCodecs}.
     */
    @Test
    public void testMxNMultipleStreamCodecs() {
        GenericTestOperator node1 = this.dag.addOperator("node1", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
        GenericTestOperator node2 = this.dag.addOperator("node2", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
        this.dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        GenericTestOperator node3 = this.dag.addOperator("node3", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
        this.dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        this.dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager manager = new StreamingContainerManager(this.dag);
        PhysicalPlan plan = manager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(manager, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta node1Meta = this.dag.getMeta(node1);
        LogicalPlan.OperatorMeta node2Meta = this.dag.getMeta(node2);
        LogicalPlan.OperatorMeta node3Meta = this.dag.getMeta(node3);

        // Sanity check: the physical plan holds the configured number of partitions per operator.
        Assert.assertEquals("number operators " + node1Meta.getName(), 2, plan.getOperators(node1Meta).size());
        Assert.assertEquals("number operators " + node2Meta.getName(), 3, plan.getOperators(node2Meta).size());
        Assert.assertEquals("number operators " + node3Meta.getName(), 3, plan.getOperators(node3Meta).size());

        checkMxNStreamCodecs(node1, node2, node3, manager);
    }

    /**
     * Walks every operator deployed in the manager's physical plan and asserts the stream
     * codecs attached to its ports for a three-operator topology in which
     * {@code genericTestOperator.outport1} feeds {@code inport1} of the other two operators.
     *
     * <ul>
     *   <li>Unifiers: one output with one sink; every input carries exactly the codec of
     *       the input port that the sink belongs to.</li>
     *   <li>Partitions of the upstream operator: the output port carries two codecs, one per
     *       downstream input port.</li>
     *   <li>Partitions of either downstream operator: the input port carries only its own
     *       codec.</li>
     * </ul>
     *
     * @param genericTestOperator upstream operator whose {@code outport1} is inspected
     * @param genericTestOperator2 first downstream operator ({@code inport1} inspected)
     * @param genericTestOperator3 second downstream operator ({@code inport1} inspected)
     * @param streamingContainerManager manager supplying the logical and physical plans
     */
    private void checkMxNStreamCodecs(GenericTestOperator genericTestOperator, GenericTestOperator genericTestOperator2, GenericTestOperator genericTestOperator3, StreamingContainerManager streamingContainerManager) {
        LogicalPlan logicalPlan = streamingContainerManager.getLogicalPlan();
        PhysicalPlan plan = streamingContainerManager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();
        LogicalPlan.OperatorMeta upstreamMeta = logicalPlan.getMeta(genericTestOperator);
        LogicalPlan.OperatorMeta firstSinkMeta = logicalPlan.getMeta(genericTestOperator2);
        LogicalPlan.OperatorMeta secondSinkMeta = logicalPlan.getMeta(genericTestOperator3);

        for (PTContainer container : containers) {
            for (PTOperator oper : container.getOperators()) {
                if (oper.isUnifier()) {
                    OperatorDeployInfo unifierInfo = getOperatorDeployInfo(oper, oper.getName(), streamingContainerManager);
                    Assert.assertEquals("unifier outputs " + oper.getName(), 1, oper.getOutputs().size());
                    PTOperator.PTOutput unifierOutput = (PTOperator.PTOutput) oper.getOutputs().get(0);
                    Assert.assertEquals("unifier sinks " + oper.getName(), 1, unifierOutput.sinks.size());

                    // Resolve which logical input port this unifier feeds.
                    LogicalPlan.OperatorMeta sinkMeta =
                            ((PTOperator.PTInput) unifierOutput.sinks.get(0)).target.getOperatorMeta();
                    Operator.InputPort<?> sinkPort = null;
                    if (sinkMeta == firstSinkMeta) {
                        sinkPort = genericTestOperator2.inport1;
                    } else if (sinkMeta == secondSinkMeta) {
                        sinkPort = genericTestOperator3.inport1;
                    }
                    // Fail with a readable message instead of an error deep inside the codec lookup.
                    Assert.assertNotNull("unifier sink port " + oper.getName(), sinkPort);

                    for (OperatorDeployInfo.InputDeployInfo input : unifierInfo.inputs) {
                        String id = oper.getName() + " " + input.portName;
                        Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                        checkPresentStreamCodec(sinkMeta, sinkPort, input.streamCodecs, id, plan);
                    }
                } else if (oper.getOperatorMeta() == upstreamMeta) {
                    OperatorDeployInfo.OutputDeployInfo output = getOutputDeployInfo(
                            getOperatorDeployInfo(oper, upstreamMeta.getName(), streamingContainerManager),
                            upstreamMeta.getMeta(genericTestOperator.outport1));
                    String id = upstreamMeta.getName() + " " + output.portName;
                    Assert.assertEquals("number stream codecs " + id, 2, output.streamCodecs.size());
                    checkPresentStreamCodec(firstSinkMeta, genericTestOperator2.inport1, output.streamCodecs, id, plan);
                    checkPresentStreamCodec(secondSinkMeta, genericTestOperator3.inport1, output.streamCodecs, id, plan);
                } else if (oper.getOperatorMeta() == firstSinkMeta) {
                    OperatorDeployInfo.InputDeployInfo input = getInputDeployInfo(
                            getOperatorDeployInfo(oper, firstSinkMeta.getName(), streamingContainerManager),
                            firstSinkMeta.getMeta(genericTestOperator2.inport1));
                    String id = firstSinkMeta.getName() + " " + input.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                    checkPresentStreamCodec(firstSinkMeta, genericTestOperator2.inport1, input.streamCodecs, id, plan);
                } else if (oper.getOperatorMeta() == secondSinkMeta) {
                    OperatorDeployInfo.InputDeployInfo input = getInputDeployInfo(
                            getOperatorDeployInfo(oper, secondSinkMeta.getName(), streamingContainerManager),
                            secondSinkMeta.getMeta(genericTestOperator3.inport1));
                    String id = secondSinkMeta.getName() + " " + input.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                    checkPresentStreamCodec(secondSinkMeta, genericTestOperator3.inport1, input.streamCodecs, id, plan);
                }
            }
        }
    }

    /**
     * Verifies stream codec deployment when the container count is capped at two, so that
     * the three operators cannot each get a container of their own.
     *
     * <p>The test locates the container that hosts exactly one operator, requires that
     * operator to be {@code node2} or {@code node3}, and then asserts that both the upstream
     * output port and that operator's input port carry exactly one codec: the one configured
     * on that input port.
     */
    @Test
    public void testInlineStreamCodec() {
        GenericTestOperator node1 = this.dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = this.dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator node3 = this.dag.addOperator("node3", GenericTestOperator.class);
        // Both consumers share the very same codec instance.
        TestStreamCodec sharedCodec = new TestStreamCodec();
        this.dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);
        this.dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, sharedCodec);
        this.dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager manager = new StreamingContainerManager(this.dag);
        PhysicalPlan plan = manager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 2, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(manager, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta node1Meta = this.dag.getMeta(node1);

        // The first container hosting a single operator identifies the non-inline consumer.
        LogicalPlan.OperatorMeta nonInlineMeta = null;
        for (PTContainer container : containers) {
            List<PTOperator> hosted = container.getOperators();
            if (hosted.size() == 1) {
                nonInlineMeta = hosted.get(0).getOperatorMeta();
                break;
            }
        }
        Assert.assertNotNull("non inline operator meta is null", nonInlineMeta);

        GenericTestOperator nonInlineOperator = null;
        Operator.InputPort<?> nonInlinePort = null;
        if ("node2".equals(nonInlineMeta.getName())) {
            nonInlineOperator = node2;
            nonInlinePort = node2.inport1;
        } else if ("node3".equals(nonInlineMeta.getName())) {
            nonInlineOperator = node3;
            nonInlinePort = node3.inport1;
        }
        Assert.assertNotNull("non inline operator is null", nonInlineOperator);

        OperatorDeployInfo.OutputDeployInfo node1Output =
                getOutputDeployInfo(getSingleOperatorDeployInfo(node1, manager), node1Meta.getMeta(node1.outport1));
        String outputId = node1Meta.getName() + " " + node1Output.portName;
        Assert.assertEquals("number stream codecs " + outputId, 1, node1Output.streamCodecs.size());
        checkPresentStreamCodec(nonInlineMeta, nonInlinePort, node1Output.streamCodecs, outputId, plan);

        OperatorDeployInfo.InputDeployInfo nonInlineInput = getInputDeployInfo(
                getSingleOperatorDeployInfo(nonInlineOperator, manager), nonInlineMeta.getMeta(nonInlinePort));
        String inputId = nonInlineMeta.getName() + " " + nonInlineInput.portName;
        Assert.assertEquals("number stream codecs " + inputId, 1, nonInlineInput.streamCodecs.size());
        checkPresentStreamCodec(nonInlineMeta, nonInlinePort, nonInlineInput.streamCodecs, inputId, plan);
    }

    /**
     * Verifies stream codec deployment when the upstream operator is configured with a
     * three-way {@link StatelessPartitioner} and its output port has {@code UNIFIER_LIMIT}
     * set to 2, with two downstream operators using different codec instances.
     *
     * <p>For each unifier the identifying input port is resolved through
     * {@code StreamingContainerAgent.getIdentifyingInputPortMeta}, and every unifier input
     * must carry exactly that port's codec. Upstream partitions must carry both codecs on
     * their output port; each downstream operator must carry only its own.
     */
    @Test
    public void testCascadingStreamCodec() {
        GenericTestOperator node1 = this.dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator node2 = this.dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator node3 = this.dag.addOperator("node3", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
        this.dag.setOutputPortAttribute(node1.outport1, Context.PortContext.UNIFIER_LIMIT, 2);
        this.dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        this.dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec2());
        this.dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager manager = new StreamingContainerManager(this.dag);
        PhysicalPlan plan = manager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("number containers", 7, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(manager, "container" + (i + 1));
        }

        LogicalPlan.OperatorMeta node1Meta = this.dag.getMeta(node1);
        LogicalPlan.OperatorMeta node2Meta = this.dag.getMeta(node2);
        LogicalPlan.OperatorMeta node3Meta = this.dag.getMeta(node3);

        for (PTContainer container : containers) {
            for (PTOperator oper : container.getOperators()) {
                if (oper.isUnifier()) {
                    OperatorDeployInfo unifierInfo = getOperatorDeployInfo(oper, oper.getName(), manager);
                    Assert.assertEquals("unifier outputs " + oper.getName(), 1, oper.getOutputs().size());
                    PTOperator.PTOutput unifierOutput = (PTOperator.PTOutput) oper.getOutputs().get(0);
                    Assert.assertEquals("unifier sinks " + oper.getName(), 1, unifierOutput.sinks.size());

                    // Resolve the logical operator via the agent helper rather than the sink's direct target.
                    LogicalPlan.OperatorMeta sinkMeta = StreamingContainerAgent
                            .getIdentifyingInputPortMeta((PTOperator.PTInput) unifierOutput.sinks.get(0))
                            .getOperatorMeta();
                    Operator.InputPort<?> sinkPort = null;
                    if (sinkMeta == node2Meta) {
                        sinkPort = node2.inport1;
                    } else if (sinkMeta == node3Meta) {
                        sinkPort = node3.inport1;
                    }
                    // Fail with a readable message instead of an error deep inside the codec lookup.
                    Assert.assertNotNull("unifier sink port " + oper.getName(), sinkPort);

                    for (OperatorDeployInfo.InputDeployInfo input : unifierInfo.inputs) {
                        String id = oper.getName() + " " + input.portName;
                        Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                        checkPresentStreamCodec(sinkMeta, sinkPort, input.streamCodecs, id, plan);
                    }
                } else if (oper.getOperatorMeta() == node1Meta) {
                    OperatorDeployInfo.OutputDeployInfo output = getOutputDeployInfo(
                            getOperatorDeployInfo(oper, node1Meta.getName(), manager), node1Meta.getMeta(node1.outport1));
                    String id = node1Meta.getName() + " " + output.portName;
                    Assert.assertEquals("number stream codecs " + id, 2, output.streamCodecs.size());
                    checkPresentStreamCodec(node2Meta, node2.inport1, output.streamCodecs, id, plan);
                    checkPresentStreamCodec(node3Meta, node3.inport1, output.streamCodecs, id, plan);
                } else if (oper.getOperatorMeta() == node2Meta) {
                    OperatorDeployInfo.InputDeployInfo input = getInputDeployInfo(
                            getOperatorDeployInfo(oper, node2Meta.getName(), manager), node2Meta.getMeta(node2.inport1));
                    String id = node2Meta.getName() + " " + input.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                    checkPresentStreamCodec(node2Meta, node2.inport1, input.streamCodecs, id, plan);
                } else if (oper.getOperatorMeta() == node3Meta) {
                    OperatorDeployInfo.InputDeployInfo input = getInputDeployInfo(
                            getOperatorDeployInfo(oper, node3Meta.getName(), manager), node3Meta.getMeta(node3.inport1));
                    String id = node3Meta.getName() + " " + input.portName;
                    Assert.assertEquals("number stream codecs " + id, 1, input.streamCodecs.size());
                    checkPresentStreamCodec(node3Meta, node3.inport1, input.streamCodecs, id, plan);
                }
            }
        }
    }

    /**
     * Drives repartitioning through {@code PartitioningTest.PartitionLoadWatch} load values
     * and checks the number of unifiers in the physical plan after each step.
     *
     * <p>Sequence: node2 partitions are reported with load -1 twice, then node3 partitions
     * with load -1 twice, then the first node2 partition with load 1 twice, then all node1
     * partitions with load -1, and finally all node1 partitions with load 1. After every
     * step pending events are processed and containers in state NEW are assigned.
     */
    @Test
    public void testDynamicPartitioningStreamCodec() {
        GenericTestOperator node1 = this.dag.addOperator("node1", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        this.dag.setOperatorAttribute(node1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
        GenericTestOperator node2 = this.dag.addOperator("node2", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        this.dag.setOperatorAttribute(node2, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        this.dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        GenericTestOperator node3 = this.dag.addOperator("node3", GenericTestOperator.class);
        this.dag.setOperatorAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        this.dag.setOperatorAttribute(node3, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        this.dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());
        this.dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager manager = new StreamingContainerManager(this.dag);
        PhysicalPlan plan = manager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();

        // Running id of the last container handed out; keeps container names unique across steps.
        int lastContainerId = 0;
        while (lastContainerId < containers.size()) {
            lastContainerId++;
            StreamingContainerManagerTest.assignContainer(manager, "container" + lastContainerId);
        }

        LogicalPlan.OperatorMeta node1Meta = this.dag.getMeta(node1);
        LogicalPlan.OperatorMeta node2Meta = this.dag.getMeta(node2);
        LogicalPlan.OperatorMeta node3Meta = this.dag.getMeta(node3);
        Assert.assertEquals("number operators " + node1Meta.getName(), 2L, plan.getOperators(node1Meta).size());
        Assert.assertEquals("number operators " + node2Meta.getName(), 3L, plan.getOperators(node2Meta).size());
        Assert.assertEquals("number operators " + node3Meta.getName(), 3L, plan.getOperators(node3Meta).size());

        // Report load -1 on all node2 partitions, twice, checking its upstream unifier count each round.
        for (int round = 0; round < 2; round++) {
            markAllOperatorsActive(plan);
            for (PTOperator partition : plan.getOperators(node2Meta)) {
                PartitioningTest.PartitionLoadWatch.put(partition, -1);
                plan.onStatusUpdate(partition);
            }
            manager.processEvents();
            lastContainerId = assignNewContainers(manager, lastContainerId);
            int upstreamUnifiers = 0;
            for (PTOperator partition : plan.getOperators(node2Meta)) {
                upstreamUnifiers += partition.upstreamMerge.values().size();
            }
            Assert.assertEquals("Number of unifiers ", 2 - round, upstreamUnifiers);
        }

        // Same procedure for node3.
        for (int round = 0; round < 2; round++) {
            markAllOperatorsActive(plan);
            for (PTOperator partition : plan.getOperators(node3Meta)) {
                PartitioningTest.PartitionLoadWatch.put(partition, -1);
                plan.onStatusUpdate(partition);
            }
            manager.processEvents();
            lastContainerId = assignNewContainers(manager, lastContainerId);
            int upstreamUnifiers = 0;
            for (PTOperator partition : plan.getOperators(node3Meta)) {
                upstreamUnifiers += partition.upstreamMerge.values().size();
            }
            Assert.assertEquals("Number of unifiers ", 2 - round, upstreamUnifiers);
        }

        Assert.assertEquals("Number of unifiers ", 2L, getUnifiers(plan).size());

        // Report load 1 on the first node2 partition, twice.
        for (int round = 0; round < 2; round++) {
            markAllOperatorsActive(plan);
            PTOperator firstPartition = (PTOperator) plan.getOperators(node2Meta).get(0);
            PartitioningTest.PartitionLoadWatch.put(firstPartition, 1);
            plan.onStatusUpdate(firstPartition);
            manager.processEvents();
            lastContainerId = assignNewContainers(manager, lastContainerId);
            int upstreamUnifiers = 0;
            for (PTOperator partition : plan.getOperators(node2Meta)) {
                upstreamUnifiers += partition.upstreamMerge.values().size();
            }
            Assert.assertEquals("Number of unifiers ", 2 + round, upstreamUnifiers);
        }

        // Report load -1 on all node1 partitions: no unifiers are expected afterwards.
        markAllOperatorsActive(plan);
        for (PTOperator partition : plan.getOperators(node1Meta)) {
            PartitioningTest.PartitionLoadWatch.put(partition, -1);
            plan.onStatusUpdate(partition);
        }
        manager.processEvents();
        lastContainerId = assignNewContainers(manager, lastContainerId);
        Assert.assertEquals("Number of unifiers", 0L, getUnifiers(plan).size());

        // Report load 1 on all node1 partitions: four unifiers are expected afterwards.
        markAllOperatorsActive(plan);
        for (PTOperator partition : plan.getOperators(node1Meta)) {
            PartitioningTest.PartitionLoadWatch.put(partition, 1);
            plan.onStatusUpdate(partition);
        }
        manager.processEvents();
        assignNewContainers(manager, lastContainerId);
        Assert.assertEquals("Number of unifiers", 4L, getUnifiers(plan).size());
    }

    /**
     * Assigns one container id per container that is currently in state
     * {@code PTContainer.State.NEW}, naming them {@code "container" + id} with ids that
     * continue after {@code i}.
     *
     * <p>The NEW containers are counted first and the assignments are made afterwards.
     *
     * @param streamingContainerManager manager whose physical plan is inspected
     * @param i the last container id already used
     * @return the last container id used after the assignments
     */
    private int assignNewContainers(StreamingContainerManager streamingContainerManager, int i) {
        List<PTContainer> containers = streamingContainerManager.getPhysicalPlan().getContainers();
        int pending = 0;
        for (PTContainer container : containers) {
            if (PTContainer.State.NEW == container.getState()) {
                pending++;
            }
        }
        int lastId = i;
        while (pending-- > 0) {
            lastId++;
            StreamingContainerManagerTest.assignContainer(streamingContainerManager, "container" + lastId);
        }
        return lastId;
    }

    /**
     * Sets the state of every operator in every container of the given plan to
     * {@code PTOperator.State.ACTIVE}.
     *
     * @param physicalPlan plan whose operators are updated
     */
    private void markAllOperatorsActive(PhysicalPlan physicalPlan) {
        List<PTContainer> containers = physicalPlan.getContainers();
        for (PTContainer container : containers) {
            for (PTOperator oper : container.getOperators()) {
                oper.setState(PTOperator.State.ACTIVE);
            }
        }
    }

    /**
     * Collects every operator, across all containers of the given plan, for which
     * {@code isUnifier()} returns {@code true}.
     *
     * @param physicalPlan plan whose containers are scanned
     * @return a new {@link HashSet} holding the unifier operators; empty if none are found
     */
    private Set<PTOperator> getUnifiers(PhysicalPlan physicalPlan) {
        Set<PTOperator> unifiers = new HashSet<PTOperator>();
        for (PTContainer container : physicalPlan.getContainers()) {
            for (PTOperator operator : container.getOperators()) {
                if (!operator.isUnifier()) {
                    continue;
                }
                unifiers.add(operator);
            }
        }
        return unifiers;
    }

    /**
     * Looks up the stream codec configured on an input port, asserts that the physical plan
     * knows it (via {@link #isStrCodecPresent}), and then verifies through
     * {@link #checkPresentStreamCodecInfo} that {@code map} holds that codec under the
     * identifier the plan reports for it.
     *
     * @param operatorMeta logical operator that owns the port
     * @param inputPort port whose stream codec is checked
     * @param map codecs keyed by identifier, passed on to the second check
     * @param str text appended to the assertion messages
     * @param physicalPlan plan queried for the codec identifier
     */
    private void checkPresentStreamCodec(LogicalPlan.OperatorMeta operatorMeta, Operator.InputPort<?> inputPort, Map<Integer, StreamCodec<?>> map, String str, PhysicalPlan physicalPlan) {
        StreamCodec<?> portCodec = operatorMeta.getMeta(inputPort).getStreamCodec();

        boolean knownToPlan = isStrCodecPresent(portCodec, physicalPlan);
        Assert.assertTrue("stream codec identifier not present" + str, knownToPlan);

        // Only resolved after the presence assertion has passed, as in the original flow.
        Integer codecId = physicalPlan.getStreamCodecIdentifier(portCodec);
        checkPresentStreamCodecInfo(map, str, codecId, portCodec);
    }

    /**
     * Asserts that {@code map} has a non-null entry for {@code num} and that this entry is
     * equal to {@code streamCodec}.
     *
     * @param map codecs keyed by identifier
     * @param str text appended to the assertion messages
     * @param num identifier to look up in {@code map}
     * @param streamCodec codec the map entry is compared against
     */
    private void checkPresentStreamCodecInfo(Map<Integer, StreamCodec<?>> map, String str, Integer num, StreamCodec<?> streamCodec) {
        final StreamCodec<?> mappedCodec = map.get(num);
        Assert.assertNotNull("stream codec info null " + str, mappedCodec);
        // NOTE(review): the map entry is passed in JUnit's "expected" position and the
        // streamCodec argument in the "actual" position; confirm this order is intended.
        Assert.assertEquals("stream codec not same " + str, mappedCodec, streamCodec);
    }

    /**
     * Returns the deploy info of the only physical operator that belongs to the given logical
     * operator.
     *
     * <p>Asserts that the physical plan holds exactly one physical operator for the logical
     * operator's meta object, then delegates to {@link #getOperatorDeployInfo}. The meta
     * object's {@code toString()} value is used as the label in assertion messages.
     *
     * @param operator logical operator to resolve
     * @param streamingContainerManager manager providing the logical and physical plans
     * @return deploy info of the single physical operator
     */
    private OperatorDeployInfo getSingleOperatorDeployInfo(Operator operator, StreamingContainerManager streamingContainerManager) {
        LogicalPlan plan = streamingContainerManager.getLogicalPlan();
        String label = plan.getMeta(operator).toString();

        List<PTOperator> physicalOperators = streamingContainerManager.getPhysicalPlan().getOperators(plan.getMeta(operator));
        Assert.assertEquals("number of operators " + label, 1L, physicalOperators.size());

        PTOperator soleOperator = physicalOperators.get(0);
        return getOperatorDeployInfo(soleOperator, label, streamingContainerManager);
    }

    /**
     * Finds the deploy info whose {@code id} equals the id of the given physical operator.
     *
     * <p>The candidates come from {@code StreamingContainerManagerTest.getDeployInfo}, called
     * with the container agent registered under the external id of the operator's container.
     * The first match wins; the method asserts that a match exists.
     *
     * @param pTOperator physical operator to look up
     * @param str label placed at the start of the assertion message
     * @param streamingContainerManager manager used to obtain the container agent
     * @return the matching deploy info (never {@code null}, because of the assertion)
     */
    private OperatorDeployInfo getOperatorDeployInfo(PTOperator pTOperator, String str, StreamingContainerManager streamingContainerManager) {
        String containerId = pTOperator.getContainer().getExternalId();
        OperatorDeployInfo match = null;
        for (OperatorDeployInfo candidate : StreamingContainerManagerTest.getDeployInfo(streamingContainerManager.getContainerAgent(containerId))) {
            if (candidate.id == pTOperator.getId()) {
                match = candidate;
                break;
            }
        }
        Assert.assertNotNull(str + " assigned to " + containerId + " deploy info is null", match);
        return match;
    }

    /**
     * Finds the first entry in {@code operatorDeployInfo.inputs} whose {@code portName} equals
     * the port name of the given input port meta, asserting that such an entry exists.
     *
     * @param operatorDeployInfo deploy info whose inputs are searched
     * @param inputPortMeta port meta supplying the port name to match
     * @return the matching input deploy info (never {@code null}, because of the assertion)
     */
    private OperatorDeployInfo.InputDeployInfo getInputDeployInfo(OperatorDeployInfo operatorDeployInfo, LogicalPlan.InputPortMeta inputPortMeta) {
        OperatorDeployInfo.InputDeployInfo match = null;
        for (OperatorDeployInfo.InputDeployInfo candidate : operatorDeployInfo.inputs) {
            if (candidate.portName.equals(inputPortMeta.getPortName())) {
                match = candidate;
                break;
            }
        }
        Assert.assertNotNull("input deploy info " + inputPortMeta.getPortName(), match);
        return match;
    }

    /**
     * Finds the first entry in {@code operatorDeployInfo.outputs} whose {@code portName} equals
     * the port name of the given output port meta, asserting that such an entry exists.
     *
     * @param operatorDeployInfo deploy info whose outputs are searched
     * @param outputPortMeta port meta supplying the port name to match
     * @return the matching output deploy info (never {@code null}, because of the assertion)
     */
    private OperatorDeployInfo.OutputDeployInfo getOutputDeployInfo(OperatorDeployInfo operatorDeployInfo, LogicalPlan.OutputPortMeta outputPortMeta) {
        OperatorDeployInfo.OutputDeployInfo match = null;
        for (OperatorDeployInfo.OutputDeployInfo candidate : operatorDeployInfo.outputs) {
            if (candidate.portName.equals(outputPortMeta.getPortName())) {
                match = candidate;
                break;
            }
        }
        Assert.assertNotNull("output deploy info " + outputPortMeta.getPortName(), match);
        return match;
    }

    /**
     * Reports whether the given stream codec is a key of the physical plan's stream codec
     * identifier map.
     *
     * @param streamCodec codec to look for
     * @param physicalPlan plan whose {@code getStreamCodecIdentifiers()} map is consulted
     * @return {@code true} if the map contains {@code streamCodec} as a key
     */
    public boolean isStrCodecPresent(StreamCodec<?> streamCodec, PhysicalPlan physicalPlan) {
        Map<?, ?> codecIdentifiers = physicalPlan.getStreamCodecIdentifiers();
        return codecIdentifiers.containsKey(streamCodec);
    }
}
