package com.datatorrent.stram.plan.logical.module;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Module;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion.class */
public class TestModuleExpansion {

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$DummyInputOperator.class */
    public static class DummyInputOperator extends BaseOperator implements InputOperator {
        private int inputOperatorProp = 0;
        Random r = new Random();
        public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<>();

        public void emitTuples() {
            this.out.emit(Integer.valueOf(this.r.nextInt()));
        }

        public int getInputOperatorProp() {
            return this.inputOperatorProp;
        }

        public void setInputOperatorProp(int i) {
            this.inputOperatorProp = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$DummyOperator.class */
    public static class DummyOperator extends BaseOperator {
        private int operatorProp = 0;

        @OutputPortFieldAnnotation(optional = true)
        public final transient DefaultOutputPort<Integer> out1 = new DefaultOutputPort<>();

        @OutputPortFieldAnnotation(optional = true)
        public final transient DefaultOutputPort<Integer> out2 = new DefaultOutputPort<>();

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>() { // from class: com.datatorrent.stram.plan.logical.module.TestModuleExpansion.DummyOperator.1
            public void process(Integer num) {
                DummyOperator.this.out1.emit(num);
                DummyOperator.this.out2.emit(num);
            }
        };

        public int getOperatorProp() {
            return this.operatorProp;
        }

        public void setOperatorProp(int i) {
            this.operatorProp = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$Level1Module.class */
    public static class Level1Module implements Module {
        private int level1ModuleProp = 0;

        @InputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyInputPort<Integer> mIn = new Module.ProxyInputPort<>();

        @OutputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyOutputPort<Integer> mOut = new Module.ProxyOutputPort<>();
        private int memory = 512;
        private int portMemory = 2;

        public void populateDAG(DAG dag, Configuration configuration) {
            DummyOperator addOperator = dag.addOperator("O1", new DummyOperator());
            addOperator.setOperatorProp(this.level1ModuleProp);
            Attribute.AttributeMap attributes = dag.getMeta(addOperator).getAttributes();
            attributes.put(OperatorContext.MEMORY_MB, Integer.valueOf(this.memory));
            attributes.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2);
            attributes.put(OperatorContext.LOCALITY_HOST, "host1");
            attributes.put(OperatorContext.PARTITIONER, new TestPartitioner());
            attributes.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 120);
            attributes.put(OperatorContext.STATELESS, true);
            attributes.put(OperatorContext.SPIN_MILLIS, 20);
            dag.setInputPortAttribute(addOperator.in, Context.PortContext.BUFFER_MEMORY_MB, Integer.valueOf(this.portMemory));
            this.mIn.set(addOperator.in);
            this.mOut.set(addOperator.out1);
        }

        public int getLevel1ModuleProp() {
            return this.level1ModuleProp;
        }

        public void setLevel1ModuleProp(int i) {
            this.level1ModuleProp = i;
        }

        public int getMemory() {
            return this.memory;
        }

        public void setMemory(int i) {
            this.memory = i;
        }

        public int getPortMemory() {
            return this.portMemory;
        }

        public void setPortMemory(int i) {
            this.portMemory = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$Level2ModuleA.class */
    public static class Level2ModuleA implements Module {
        private int level2ModuleAProp1 = 0;
        private int level2ModuleAProp2 = 0;
        private int level2ModuleAProp3 = 0;

        @InputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyInputPort<Integer> mIn = new Module.ProxyInputPort<>();

        @OutputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyOutputPort<Integer> mOut1 = new Module.ProxyOutputPort<>();

        @OutputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyOutputPort<Integer> mOut2 = new Module.ProxyOutputPort<>();

        public void populateDAG(DAG dag, Configuration configuration) {
            Level1Module level1Module = (Level1Module) dag.addModule("M1", new Level1Module());
            level1Module.setMemory(1024);
            level1Module.setPortMemory(1);
            level1Module.setLevel1ModuleProp(this.level2ModuleAProp1);
            Level1Module level1Module2 = (Level1Module) dag.addModule("M2", new Level1Module());
            level1Module2.setMemory(2048);
            level1Module2.setPortMemory(2);
            level1Module2.setLevel1ModuleProp(this.level2ModuleAProp2);
            DummyOperator addOperator = dag.addOperator("O1", new DummyOperator());
            addOperator.setOperatorProp(this.level2ModuleAProp3);
            dag.addStream("M1_M2&O1", level1Module.mOut, level1Module2.mIn, addOperator.in).setLocality(DAG.Locality.CONTAINER_LOCAL);
            this.mIn.set(level1Module.mIn);
            this.mOut1.set(level1Module2.mOut);
            this.mOut2.set(addOperator.out1);
        }

        public int getLevel2ModuleAProp1() {
            return this.level2ModuleAProp1;
        }

        public void setLevel2ModuleAProp1(int i) {
            this.level2ModuleAProp1 = i;
        }

        public int getLevel2ModuleAProp2() {
            return this.level2ModuleAProp2;
        }

        public void setLevel2ModuleAProp2(int i) {
            this.level2ModuleAProp2 = i;
        }

        public int getLevel2ModuleAProp3() {
            return this.level2ModuleAProp3;
        }

        public void setLevel2ModuleAProp3(int i) {
            this.level2ModuleAProp3 = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$Level2ModuleB.class */
    public static class Level2ModuleB implements Module {
        private int level2ModuleBProp1 = 0;
        private int level2ModuleBProp2 = 0;
        private int level2ModuleBProp3 = 0;

        @InputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyInputPort<Integer> mIn = new Module.ProxyInputPort<>();

        @OutputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyOutputPort<Integer> mOut1 = new Module.ProxyOutputPort<>();

        @OutputPortFieldAnnotation(optional = true)
        public final transient Module.ProxyOutputPort<Integer> mOut2 = new Module.ProxyOutputPort<>();

        public void populateDAG(DAG dag, Configuration configuration) {
            DummyOperator addOperator = dag.addOperator("O1", new DummyOperator());
            addOperator.setOperatorProp(this.level2ModuleBProp1);
            Level1Module level1Module = (Level1Module) dag.addModule("M1", new Level1Module());
            level1Module.setMemory(4096);
            level1Module.setPortMemory(3);
            level1Module.setLevel1ModuleProp(this.level2ModuleBProp2);
            DummyOperator addOperator2 = dag.addOperator("O2", new DummyOperator());
            addOperator2.setOperatorProp(this.level2ModuleBProp3);
            dag.addStream("O1_M1", addOperator.out1, level1Module.mIn).setLocality(DAG.Locality.THREAD_LOCAL);
            dag.addStream("O1_O2", addOperator.out2, addOperator2.in).setLocality(DAG.Locality.RACK_LOCAL);
            this.mIn.set(addOperator.in);
            this.mOut1.set(level1Module.mOut);
            this.mOut2.set(addOperator2.out1);
        }

        public int getLevel2ModuleBProp1() {
            return this.level2ModuleBProp1;
        }

        public void setLevel2ModuleBProp1(int i) {
            this.level2ModuleBProp1 = i;
        }

        public int getLevel2ModuleBProp2() {
            return this.level2ModuleBProp2;
        }

        public void setLevel2ModuleBProp2(int i) {
            this.level2ModuleBProp2 = i;
        }

        public int getLevel2ModuleBProp3() {
            return this.level2ModuleBProp3;
        }

        public void setLevel2ModuleBProp3(int i) {
            this.level2ModuleBProp3 = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$Level3Module.class */
    public static class Level3Module implements Module {
        public final transient Module.ProxyInputPort<Integer> mIn = new Module.ProxyInputPort<>();
        public final transient Module.ProxyOutputPort<Integer> mOut1 = new Module.ProxyOutputPort<>();
        public final transient Module.ProxyOutputPort<Integer> mOut2 = new Module.ProxyOutputPort<>();

        public void populateDAG(DAG dag, Configuration configuration) {
            DummyOperator addOperator = dag.addOperator("O1", new DummyOperator());
            Level2ModuleB level2ModuleB = (Level2ModuleB) dag.addModule("M1", new Level2ModuleB());
            Level1Module level1Module = (Level1Module) dag.addModule("M2", new Level1Module());
            dag.addStream("s1", addOperator.out1, level2ModuleB.mIn);
            dag.addStream("s2", addOperator.out2, level1Module.mIn);
            this.mIn.set(addOperator.in);
            this.mOut1.set(level2ModuleB.mOut1);
            this.mOut2.set(level1Module.mOut);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$NestedModuleApp.class */
    public static class NestedModuleApp implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            DummyInputOperator addOperator = dag.addOperator("O1", new DummyInputOperator());
            addOperator.setInputOperatorProp(1);
            DummyOperator addOperator2 = dag.addOperator("O2", new DummyOperator());
            addOperator2.setOperatorProp(2);
            Level2ModuleA level2ModuleA = (Level2ModuleA) dag.addModule("Ma", new Level2ModuleA());
            level2ModuleA.setLevel2ModuleAProp1(11);
            level2ModuleA.setLevel2ModuleAProp2(12);
            level2ModuleA.setLevel2ModuleAProp3(13);
            Level2ModuleB level2ModuleB = (Level2ModuleB) dag.addModule("Mb", new Level2ModuleB());
            level2ModuleB.setLevel2ModuleBProp1(21);
            level2ModuleB.setLevel2ModuleBProp2(22);
            level2ModuleB.setLevel2ModuleBProp3(23);
            Level2ModuleA level2ModuleA2 = (Level2ModuleA) dag.addModule("Mc", new Level2ModuleA());
            level2ModuleA2.setLevel2ModuleAProp1(31);
            level2ModuleA2.setLevel2ModuleAProp2(32);
            level2ModuleA2.setLevel2ModuleAProp3(33);
            Level2ModuleB level2ModuleB2 = (Level2ModuleB) dag.addModule("Md", new Level2ModuleB());
            level2ModuleB2.setLevel2ModuleBProp1(41);
            level2ModuleB2.setLevel2ModuleBProp2(42);
            level2ModuleB2.setLevel2ModuleBProp3(43);
            dag.addStream("O1_O2", addOperator.out, addOperator2.in, ((Level3Module) dag.addModule("Me", new Level3Module())).mIn);
            dag.addStream("O2_Ma", addOperator2.out1, level2ModuleA.mIn);
            dag.addStream("Ma_Mb", level2ModuleA.mOut1, level2ModuleB.mIn);
            dag.addStream("Ma_Md", level2ModuleA.mOut2, level2ModuleB2.mIn);
            dag.addStream("Mb_Mc", level2ModuleB.mOut2, level2ModuleA2.mIn);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/module/TestModuleExpansion$TestPartitioner.class */
    public static class TestPartitioner implements Partitioner<DummyOperator>, Serializable {
        public Collection<Partitioner.Partition<DummyOperator>> definePartitions(Collection<Partitioner.Partition<DummyOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(collection.iterator().next());
            return arrayList;
        }

        public void partitioned(Map<Integer, Partitioner.Partition<DummyOperator>> map) {
        }
    }

    /**
     * Expands {@link NestedModuleApp} programmatically and checks the operators,
     * streams and modules of the resulting logical plan.
     */
    @Test
    public void testModuleExtreme() {
        StreamingApplication app = new NestedModuleApp();
        LogicalPlanConfiguration planConfig = new LogicalPlanConfiguration(new Configuration(false));
        LogicalPlan dag = new LogicalPlan();

        planConfig.prepareDAG(dag, app, "ModuleApp");
        dag.validate();

        validateTopLevelOperators(dag);
        validateTopLevelStreams(dag);
        validatePublicMethods(dag);
    }

    /**
     * Verifies the streams of the expanded plan: that every expected stream
     * name is present, that each stream connects the expected source operator to
     * the expected sink operators, and that stream locality set inside modules
     * is retained.
     *
     * @param logicalPlan expanded plan to inspect
     */
    private void validateTopLevelStreams(LogicalPlan logicalPlan) {
        Collection<String> streamNames = new ArrayList<>();
        for (Object meta : logicalPlan.getAllStreams()) {
            streamNames.add(((LogicalPlan.StreamMeta) meta).getName());
        }

        // Presence of every stream, including those contributed by modules.
        Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_M1")));
        Assert.assertTrue(streamNames.contains("O2_Ma"));
        Assert.assertTrue(streamNames.contains("Mb_Mc"));
        Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_O2")));
        Assert.assertTrue(streamNames.contains(componentName("Ma", "M1_M2&O1")));
        Assert.assertTrue(streamNames.contains(componentName("Md", "O1_M1")));
        Assert.assertTrue(streamNames.contains("Ma_Md"));
        Assert.assertTrue(streamNames.contains(componentName("Mc", "M1_M2&O1")));
        Assert.assertTrue(streamNames.contains(componentName("Md", "O1_O2")));
        Assert.assertTrue(streamNames.contains("Ma_Mb"));
        Assert.assertTrue(streamNames.contains("O1_O2"));

        // Source and sinks of every stream after proxy ports have been resolved.
        validateSeperateStream(logicalPlan, componentName("Mb", "O1_M1"), componentName("Mb", "O1"), componentName("Mb", "M1", "O1"));
        validateSeperateStream(logicalPlan, "O2_Ma", "O2", componentName("Ma", "M1", "O1"));
        validateSeperateStream(logicalPlan, "Mb_Mc", componentName("Mb", "O2"), componentName("Mc", "M1", "O1"));
        validateSeperateStream(logicalPlan, componentName("Mb", "O1_O2"), componentName("Mb", "O1"), componentName("Mb", "O2"));
        validateSeperateStream(logicalPlan, componentName("Ma", "M1_M2&O1"), componentName("Ma", "M1", "O1"), componentName("Ma", "O1"), componentName("Ma", "M2", "O1"));
        validateSeperateStream(logicalPlan, componentName("Md", "O1_M1"), componentName("Md", "O1"), componentName("Md", "M1", "O1"));
        validateSeperateStream(logicalPlan, "Ma_Md", componentName("Ma", "O1"), componentName("Md", "O1"));
        validateSeperateStream(logicalPlan, componentName("Mc", "M1_M2&O1"), componentName("Mc", "M1", "O1"), componentName("Mc", "O1"), componentName("Mc", "M2", "O1"));
        validateSeperateStream(logicalPlan, componentName("Md", "O1_O2"), componentName("Md", "O1"), componentName("Md", "O2"));
        validateSeperateStream(logicalPlan, "Ma_Mb", componentName("Ma", "M2", "O1"), componentName("Mb", "O1"));
        validateSeperateStream(logicalPlan, "O1_O2", "O1", "O2", componentName("Me", "O1"));

        // Locality configured inside the modules. The first check targets Ma;
        // previously Mc was asserted twice and Ma's stream was never covered.
        validateStreamLocality(logicalPlan, componentName("Ma", "M1_M2&O1"), DAG.Locality.CONTAINER_LOCAL);
        validateStreamLocality(logicalPlan, componentName("Mb", "O1_M1"), DAG.Locality.THREAD_LOCAL);
        validateStreamLocality(logicalPlan, componentName("Mb", "O1_O2"), DAG.Locality.RACK_LOCAL);
        validateStreamLocality(logicalPlan, componentName("Mc", "M1_M2&O1"), DAG.Locality.CONTAINER_LOCAL);
        validateStreamLocality(logicalPlan, componentName("Md", "O1_M1"), DAG.Locality.THREAD_LOCAL);
        validateStreamLocality(logicalPlan, componentName("Me", "s1"), null);
    }

    /**
     * Verifies a single stream of the plan: it must exist, originate from the
     * expected operator and feed exactly the expected set of sink operators.
     *
     * @param logicalPlan plan to inspect
     * @param str name of the stream to look up
     * @param str2 expected name of the operator owning the stream's source port
     * @param strArr expected names of the operators owning the stream's sink ports
     */
    private void validateSeperateStream(LogicalPlan logicalPlan, String str, String str2, String... strArr) {
        LogicalPlan.StreamMeta stream = logicalPlan.getStream(str);
        // Fail with a readable message instead of a NullPointerException below.
        Assert.assertNotNull("Stream " + str + " is missing from the plan", stream);

        String sourceName = stream.getSource().getOperatorMeta().getName();
        Collection<String> sinkNames = new ArrayList<>();
        for (Object sink : stream.getSinks()) {
            sinkNames.add(((LogicalPlan.InputPortMeta) sink).getOperatorMeta().getName());
        }

        // assertEquals reports both values on failure, unlike assertTrue(a.equals(b)).
        Assert.assertEquals("Source operator of stream " + str, str2, sourceName);
        Assert.assertEquals("Number of sinks of stream " + str, strArr.length, sinkNames.size());
        for (String expectedSink : strArr) {
            Assert.assertTrue("Stream " + str + " should feed " + expectedSink + " but feeds " + sinkNames,
                sinkNames.contains(expectedSink));
        }
    }

    /**
     * Verifies the operators of the expanded plan. In order, it checks that every
     * expected operator name is present, that property values were carried
     * through the modules, that each operator reports the expected parent module,
     * and that operator and input-port attributes set by {@link Level1Module}
     * are retained.
     *
     * @param logicalPlan expanded plan to inspect
     */
    private void validateTopLevelOperators(LogicalPlan logicalPlan) {
        Collection<String> operatorNames = new ArrayList<>();
        for (Object meta : logicalPlan.getAllOperators()) {
            operatorNames.add(((LogicalPlan.OperatorMeta) meta).getName());
        }

        // The three tables below are parallel: entry k describes the same operator in each.
        String[] operators = {
            "O1",
            "O2",
            componentName("Ma", "M1", "O1"),
            componentName("Ma", "M2", "O1"),
            componentName("Ma", "O1"),
            componentName("Mb", "O1"),
            componentName("Mb", "M1", "O1"),
            componentName("Mb", "O2"),
            componentName("Mc", "M1", "O1"),
            componentName("Mc", "M2", "O1"),
            componentName("Mc", "O1"),
            componentName("Md", "O1"),
            componentName("Md", "M1", "O1"),
            componentName("Md", "O2")
        };
        int[] propertyValues = {
            1,
            2,
            11, 12, 13,
            21, 22, 23,
            31, 32, 33,
            41, 42, 43
        };
        String[] parentModules = {
            null,
            null,
            componentName("Ma", "M1"),
            componentName("Ma", "M2"),
            "Ma",
            "Mb",
            componentName("Mb", "M1"),
            "Mb",
            componentName("Mc", "M1"),
            componentName("Mc", "M2"),
            "Mc",
            "Md",
            componentName("Md", "M1"),
            "Md"
        };

        for (String operator : operators) {
            Assert.assertTrue(operatorNames.contains(operator));
        }
        for (int k = 0; k < operators.length; k++) {
            validateOperatorPropertyValue(logicalPlan, operators[k], propertyValues[k]);
        }
        for (int k = 0; k < operators.length; k++) {
            validateOperatorParent(logicalPlan, operators[k], parentModules[k]);
        }

        // Operators created by Level1Module, with the memory / port-buffer sizes
        // their enclosing module configured.
        String[] level1Operators = {
            componentName("Ma", "M1", "O1"),
            componentName("Ma", "M2", "O1"),
            componentName("Mb", "M1", "O1"),
            componentName("Mc", "M1", "O1"),
            componentName("Mc", "M2", "O1")
        };
        int[] memoryMb = {1024, 2048, 4096, 1024, 2048};
        int[] portBufferMb = {1, 2, 3, 1, 2};

        for (int k = 0; k < level1Operators.length; k++) {
            validateOperatorAttribute(logicalPlan, level1Operators[k], memoryMb[k]);
        }
        for (int k = 0; k < level1Operators.length; k++) {
            validatePortAttribute(logicalPlan, level1Operators[k], portBufferMb[k]);
        }
    }

    /**
     * Verifies which module an operator of the plan reports as its parent.
     *
     * @param logicalPlan plan to inspect
     * @param str name of the operator to look up
     * @param str2 expected module name, or {@code null} when the operator is
     *             expected to report no module
     */
    private void validateOperatorParent(LogicalPlan logicalPlan, String str, String str2) {
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta(str);
        Assert.assertNotNull("Operator " + str + " is missing from the plan", operatorMeta);
        // assertEquals covers the null and the non-null expectation alike and
        // reports both values when they differ.
        Assert.assertEquals("Parent module of operator " + str, str2, operatorMeta.getModuleName());
    }

    /**
     * Verifies the integer test property of an operator in the plan. The operator
     * named "O1" is treated as a {@link DummyInputOperator}; every other operator
     * is treated as a {@link DummyOperator}.
     *
     * @param logicalPlan plan to inspect
     * @param str name of the operator to look up
     * @param i expected property value
     */
    private void validateOperatorPropertyValue(LogicalPlan logicalPlan, String str, int i) {
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta(str);
        Object operator = operatorMeta.getOperator();
        // The property getters live on the concrete test operators, so the
        // operator instance has to be cast before they can be called.
        if ("O1".equals(str)) {
            Assert.assertEquals(i, ((DummyInputOperator) operator).getInputOperatorProp());
        } else {
            Assert.assertEquals(i, ((DummyOperator) operator).getOperatorProp());
        }
    }

    /**
     * Verifies that the plan still lists the five top-level modules
     * (Ma, Mb, Mc, Md, Me) and no others.
     *
     * @param logicalPlan plan to inspect
     */
    private void validatePublicMethods(LogicalPlan logicalPlan) {
        Collection<String> moduleNames = new ArrayList<>();
        for (Object meta : logicalPlan.getAllModules()) {
            moduleNames.add(((LogicalPlan.ModuleMeta) meta).getName());
        }

        String[] expectedModules = {"Ma", "Mb", "Mc", "Md", "Me"};
        for (String expected : expectedModules) {
            Assert.assertTrue(moduleNames.contains(expected));
        }
        Assert.assertEquals("Number of modules are 5", 5L, logicalPlan.getAllModules().size());
    }

    /**
     * Joins the given name parts with {@code $} between consecutive parts.
     *
     * @param strArr name parts, outermost first
     * @return the joined name, or the empty string when no parts are given
     */
    private static String componentName(String... strArr) {
        int count = strArr.length;
        if (count == 0) {
            return "";
        }
        StringBuilder joined = new StringBuilder(strArr[0]);
        int idx = 1;
        while (idx < count) {
            joined.append("$").append(strArr[idx]);
            idx++;
        }
        return joined.toString();
    }

    /**
     * An operator whose name equals the name an operator inside module "m1"
     * would be given ({@code m1$O1}) is added next to that module; the test
     * expects an {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void conflictingNamesWithExpandedModule() {
        LogicalPlanConfiguration planConfig = new LogicalPlanConfiguration(new Configuration(false));
        LogicalPlan dag = new LogicalPlan();

        DummyInputOperator clashing = dag.addOperator(componentName("m1", "O1"), new DummyInputOperator());
        Level2ModuleA module = (Level2ModuleA) dag.addModule("m1", new Level2ModuleA());
        dag.addStream("s1", clashing.out, module.mIn);

        planConfig.prepareDAG(dag, (StreamingApplication) null, "ModuleApp");
        dag.validate();
    }

    /**
     * An operator and a module share the name "M1", with the operator added to
     * the plan first; the test expects an {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void conflictingNamesWithOperator1() {
        LogicalPlanConfiguration planConfig = new LogicalPlanConfiguration(new Configuration(false));
        LogicalPlan dag = new LogicalPlan();

        DummyInputOperator operator = dag.addOperator("M1", new DummyInputOperator());
        Level2ModuleA module = (Level2ModuleA) dag.addModule("M1", new Level2ModuleA());
        dag.addStream("s1", operator.out, module.mIn);

        planConfig.prepareDAG(dag, (StreamingApplication) null, "ModuleApp");
        dag.validate();
    }

    /**
     * An operator and a module share the name "M1", with the module added to the
     * plan first and the operator second; the test expects an
     * {@link IllegalArgumentException}.
     *
     * <p>This is the reverse registration order of
     * {@code conflictingNamesWithOperator1}. Previously both tests registered
     * the operator first, so the module-first path was never exercised.
     */
    @Test(expected = IllegalArgumentException.class)
    public void conflictingNamesWithOperator2() {
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        LogicalPlan logicalPlan = new LogicalPlan();

        // Module first, operator second.
        Level2ModuleA module = (Level2ModuleA) logicalPlan.addModule("M1", new Level2ModuleA());
        DummyInputOperator operator = logicalPlan.addOperator("M1", new DummyInputOperator());
        logicalPlan.addStream("s1", operator.out, module.mIn);

        logicalPlanConfiguration.prepareDAG(logicalPlan, (StreamingApplication) null, "ModuleApp");
        logicalPlan.validate();
    }

    /**
     * Verifies the operator attributes that {@link Level1Module} applies to its
     * inner operator: the configurable MEMORY_MB value plus the fixed window
     * count, host locality, partitioner, checkpoint window count, stateless flag
     * and spin millis.
     *
     * <p>Arguments are passed to {@code assertEquals} as (expected, actual) so
     * that failure messages label the two values correctly.
     *
     * @param logicalPlan plan to inspect
     * @param str name of the operator to look up
     * @param i expected MEMORY_MB value
     */
    private void validateOperatorAttribute(LogicalPlan logicalPlan, String str, int i) {
        Attribute.AttributeMap attributes = logicalPlan.getOperatorMeta(str).getAttributes();
        Assert.assertEquals(i, ((Integer) attributes.get(OperatorContext.MEMORY_MB)).intValue());
        Assert.assertEquals("Application window id is 2 ", 2L, ((Integer) attributes.get(OperatorContext.APPLICATION_WINDOW_COUNT)).intValue());
        Assert.assertEquals("Locality host is host1", "host1", attributes.get(OperatorContext.LOCALITY_HOST));
        Assert.assertEquals(TestPartitioner.class, ((Partitioner) attributes.get(OperatorContext.PARTITIONER)).getClass());
        Assert.assertEquals("Checkpoint window count ", 120L, ((Integer) attributes.get(OperatorContext.CHECKPOINT_WINDOW_COUNT)).intValue());
        Assert.assertEquals("Operator is stateless ", Boolean.TRUE, attributes.get(OperatorContext.STATELESS));
        Assert.assertEquals("SPIN MILLIS is set to 20 ", 20L, ((Integer) attributes.get(OperatorContext.SPIN_MILLIS)).intValue());
    }

    /**
     * Verifies the BUFFER_MEMORY_MB attribute of an operator's input port. The
     * port inspected is the first key returned by the operator's input-stream map.
     *
     * @param logicalPlan plan to inspect
     * @param str name of the operator to look up
     * @param i expected BUFFER_MEMORY_MB value
     */
    private void validatePortAttribute(LogicalPlan logicalPlan, String str, int i) {
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta(str);
        Object firstInputPort = operatorMeta.getInputStreams().keySet().iterator().next();
        Object bufferMemory = ((LogicalPlan.InputPortMeta) firstInputPort).getAttributes().get(Context.PortContext.BUFFER_MEMORY_MB);
        Assert.assertEquals(i, ((Integer) bufferMemory).intValue());
    }

    /**
     * Verifies that a stream exists in the plan and carries the expected locality.
     *
     * @param logicalPlan plan to inspect
     * @param str name of the stream to look up
     * @param locality expected locality; {@code null} when none is expected
     */
    private void validateStreamLocality(LogicalPlan logicalPlan, String str, DAG.Locality locality) {
        LogicalPlan.StreamMeta stream = logicalPlan.getStream(str);
        Assert.assertNotNull("Metadata for stream is available ", stream);
        // Expected value first so a failure labels the two values correctly.
        Assert.assertEquals("Locality is " + locality, locality, stream.getLocality());
    }

    /**
     * Builds the plan from the classpath resource
     * {@code /testModuleTopology.properties} and runs the same operator, stream
     * and module validations as the programmatic test.
     *
     * @throws IOException if the properties resource cannot be read
     */
    @Test
    public void testLoadFromPropertiesFile() throws IOException {
        Properties properties = new Properties();
        // try-with-resources closes the stream on success and on failure; a
        // null resource is tolerated by the construct and reported explicitly.
        try (InputStream resourceAsStream = getClass().getResourceAsStream("/testModuleTopology.properties")) {
            if (resourceAsStream == null) {
                throw new RuntimeException("Could not load /testModuleTopology.properties");
            }
            properties.load(resourceAsStream);
        }
        LogicalPlanConfiguration addFromProperties = new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(properties, (Configuration) null);
        LogicalPlan logicalPlan = new LogicalPlan();
        addFromProperties.populateDAG(logicalPlan);
        addFromProperties.prepareDAG(logicalPlan, (StreamingApplication) null, "testApplication");
        logicalPlan.validate();
        validateTopLevelOperators(logicalPlan);
        validateTopLevelStreams(logicalPlan);
        validatePublicMethods(logicalPlan);
    }

    /**
     * Builds the plan from the classpath resource
     * {@code /testModuleTopology.json} and runs the same operator, stream and
     * module validations as the programmatic test.
     *
     * @throws Exception if the resource cannot be read or parsed, or the plan
     *                   cannot be built
     */
    @Test
    public void testLoadFromJson() throws Exception {
        StringWriter stringWriter = new StringWriter();
        // try-with-resources closes the stream on success and on failure; a
        // null resource is tolerated by the construct and reported explicitly.
        try (InputStream resourceAsStream = getClass().getResourceAsStream("/testModuleTopology.json")) {
            if (resourceAsStream == null) {
                throw new RuntimeException("Could not load /testModuleTopology.json");
            }
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            IOUtils.copy(resourceAsStream, stringWriter, "UTF-8");
        }
        JSONObject jSONObject = new JSONObject(stringWriter.toString());
        Configuration configuration = new Configuration(false);
        configuration.set("apex.operator.operator3.prop.myStringProperty", "o3StringFromConf");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(configuration);
        LogicalPlan createFromJson = logicalPlanConfiguration.createFromJson(jSONObject, "testLoadFromJson");
        logicalPlanConfiguration.prepareDAG(createFromJson, (StreamingApplication) null, "testApplication");
        createFromJson.validate();
        validateTopLevelOperators(createFromJson);
        validateTopLevelStreams(createFromJson);
        validatePublicMethods(createFromJson);
    }
}
