package com.datatorrent.stram.plan.logical;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Module;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.StringCodec;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.BasicContainerOptConfigurator;
import com.datatorrent.stram.PartitioningTest;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.SchemaTestOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.LogicalPlanTest;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.validation.ValidationException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.class */
public class LogicalPlanConfigurationTest {
    private static final Logger logger;
    private static final Logger LOG;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$1DummyOperator, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$1DummyOperator.class */
    public class C1DummyOperator extends BaseOperator {
        int prop;
        public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.1DummyOperator.1
            public void process(Integer num) {
                LogicalPlanConfigurationTest.LOG.debug(num.intValue() + " processed");
                C1DummyOperator.this.output.emit(num);
            }
        };
        public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();

        C1DummyOperator() {
        }
    }

    /* renamed from: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$1DummyOutputOperator, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$1DummyOutputOperator.class */
    class C1DummyOutputOperator extends BaseOperator {
        int prop;
        public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.1DummyOutputOperator.1
            public void process(Integer num) {
                LogicalPlanConfigurationTest.LOG.debug(num.intValue() + " processed");
            }
        };

        C1DummyOutputOperator() {
        }
    }

    /* renamed from: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$1TestUnifierAttributeModule, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$1TestUnifierAttributeModule.class */
    class C1TestUnifierAttributeModule implements Module {
        public transient Module.ProxyInputPort<Integer> moduleInput = new Module.ProxyInputPort<>();
        public transient Module.ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>();

        C1TestUnifierAttributeModule() {
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            C1DummyOperator addOperator = dag.addOperator("DummyOperator", new C1DummyOperator());
            dag.setOperatorAttribute(addOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
            dag.setUnifierAttribute(addOperator.output, Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 2);
            this.moduleInput.set(addOperator.input);
            this.moduleOutput.set(addOperator.output);
        }
    }

    @ApplicationAnnotation(name = "AnnotatedAlias")
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$AnnotatedApplication.class */
    class AnnotatedApplication implements StreamingApplication {
        AnnotatedApplication() {
        }

        public void populateDAG(DAG dag, Configuration configuration) {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$MockContext1.class */
    public interface MockContext1 extends Context {
        public static final Attribute<Integer> TEST_ATTR = new Attribute<>(1024);
        public static final long serialVersionUID = Attribute.AttributeMap.AttributeInitializer.initialize(MockContext1.class);
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$MockContext2.class */
    public interface MockContext2 extends Context {
        public static final Attribute<Boolean> TEST_ATTR = new Attribute<>(false);
        public static final long serialVersionUID = Attribute.AttributeMap.AttributeInitializer.initialize(MockContext2.class);
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$SimpleTestApplication.class */
    public static class SimpleTestApplication implements StreamingApplication {
        public final GenericTestOperator gt1 = new GenericTestOperator();
        public final GenericTestOperator gt2 = new GenericTestOperator();
        public final GenericTestOperator gt3 = new GenericTestOperator();

        public void populateDAG(DAG dag, Configuration configuration) {
            dag.addOperator("operator1", this.gt1);
            dag.addOperator("operator2", this.gt2);
            dag.addOperator("operator3", this.gt3);
            dag.addStream("s1", this.gt1.outport1, this.gt2.inport1);
            dag.addStream("s2", this.gt2.outport1, this.gt3.inport1, this.gt3.inport2);
        }
    }

    @ApplicationAnnotation(name = "SimpleTestApp")
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$SimpleTestApplicationWithName.class */
    public static class SimpleTestApplicationWithName extends SimpleTestApplication {
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$TestApplication.class */
    public static class TestApplication implements StreamingApplication {
        Integer testprop1;
        Integer testprop2;
        Integer testprop3;
        TestInnerClass inncls = new TestInnerClass();

        /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$TestApplication$TestInnerClass.class */
        public class TestInnerClass {
            Integer a;

            public TestInnerClass() {
            }

            public Integer getA() {
                return this.a;
            }

            public void setA(Integer num) {
                this.a = num;
            }
        }

        public Integer getTestprop1() {
            return this.testprop1;
        }

        public void setTestprop1(Integer num) {
            this.testprop1 = num;
        }

        public Integer getTestprop2() {
            return this.testprop2;
        }

        public void setTestprop2(Integer num) {
            this.testprop2 = num;
        }

        public Integer getTestprop3() {
            return this.testprop3;
        }

        public void setTestprop3(Integer num) {
            this.testprop3 = num;
        }

        public TestInnerClass getInncls() {
            return this.inncls;
        }

        public void setInncls(TestInnerClass testInnerClass) {
            this.inncls = testInnerClass;
        }

        public void populateDAG(DAG dag, Configuration configuration) {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$TestSchema.class */
    public static class TestSchema {
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$TestStatsListener.class */
    public static class TestStatsListener implements StatsListener {
        private int intProp;

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            return null;
        }

        public int getIntProp() {
            return this.intProp;
        }

        public void setIntProp(int i) {
            this.intProp = i;
        }

        public int hashCode() {
            return (31 * 1) + this.intProp;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.intProp == ((TestStatsListener) obj).intProp;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest$TestStreamCodec.class */
    public static class TestStreamCodec<T> extends JsonStreamCodec<T> implements Serializable {
        private static final long serialVersionUID = 1;
    }

    private static LogicalPlan.OperatorMeta assertNode(LogicalPlan logicalPlan, String str) {
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta(str);
        Assert.assertNotNull("operator exists id=" + str, operatorMeta);
        return operatorMeta;
    }

    @Test
    public void testLoadFromConfigXml() {
        Configuration configuration = new Configuration(false);
        configuration.addResource("dt-site.xml");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(configuration);
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.populateDAG(logicalPlan);
        logicalPlan.validate();
        Assert.assertEquals("number of operator confs", 6L, logicalPlan.getAllOperators().size());
        LogicalPlan.OperatorMeta assertNode = assertNode(logicalPlan, "operator1");
        LogicalPlan.OperatorMeta assertNode2 = assertNode(logicalPlan, "operator2");
        LogicalPlan.OperatorMeta assertNode3 = assertNode(logicalPlan, "operator3");
        LogicalPlan.OperatorMeta assertNode4 = assertNode(logicalPlan, "operator4");
        Assert.assertNotNull("operatorConf for root", assertNode);
        Assert.assertEquals("operatorId set", "operator1", assertNode.getName());
        Assert.assertEquals(assertNode.getOperator().getClass(), TestGeneratorInputOperator.class);
        Assert.assertEquals("myStringPropertyValue", assertNode.getOperator().getMyStringProperty());
        Assert.assertEquals("operator1 inputs", 0L, assertNode.getInputStreams().size());
        Assert.assertEquals("operator1 outputs", 1L, assertNode.getOutputStreams().size());
        LogicalPlan.StreamMeta streamMeta = (LogicalPlan.StreamMeta) assertNode2.getInputStreams().get(assertNode2.getMeta(assertNode2.getOperator().inport1));
        Assert.assertNotNull("n1n2", streamMeta);
        Assert.assertEquals("rootNode out is operator2 in", streamMeta, assertNode.getOutputStreams().get(assertNode.getMeta(assertNode.getOperator().outport)));
        Assert.assertEquals("n1n2 source", assertNode, streamMeta.getSource().getOperatorMeta());
        Assert.assertEquals("n1n2 targets", 1L, streamMeta.getSinks().size());
        Assert.assertEquals("n1n2 target", assertNode2, ((LogicalPlan.InputPortMeta) streamMeta.getSinks().iterator().next()).getOperatorMeta());
        Assert.assertEquals("stream name", "n1n2", streamMeta.getName());
        Assert.assertEquals("n1n2 not inline (default)", (Object) null, streamMeta.getLocality());
        Assert.assertEquals("operator 2 number of outputs", 1L, assertNode2.getOutputStreams().size());
        LogicalPlan.StreamMeta streamMeta2 = (LogicalPlan.StreamMeta) assertNode2.getOutputStreams().values().iterator().next();
        HashSet newHashSet = Sets.newHashSet();
        Iterator it = streamMeta2.getSinks().iterator();
        while (it.hasNext()) {
            newHashSet.add(((LogicalPlan.InputPortMeta) it.next()).getOperatorMeta());
        }
        Assert.assertEquals("outputs " + streamMeta2, Sets.newHashSet(new LogicalPlan.OperatorMeta[]{assertNode3, assertNode4}), newHashSet);
        LogicalPlan.OperatorMeta assertNode5 = assertNode(logicalPlan, "operator6");
        List rootOperators = logicalPlan.getRootOperators();
        Assert.assertEquals("number root operators", 2L, rootOperators.size());
        Assert.assertTrue("root operator2", rootOperators.contains(assertNode));
        Assert.assertTrue("root operator6", rootOperators.contains(assertNode5));
        Iterator it2 = rootOperators.iterator();
        while (it2.hasNext()) {
            printTopology((LogicalPlan.OperatorMeta) it2.next(), logicalPlan, 0);
        }
    }

    private void printTopology(LogicalPlan.OperatorMeta operatorMeta, DAG dag, int i) {
        logger.debug((i > 0 ? StringUtils.repeat(" ", 20 * (i - 1)) + "   |" + StringUtils.repeat("-", 17) : "") + operatorMeta.getName());
        for (LogicalPlan.StreamMeta streamMeta : operatorMeta.getOutputStreams().values()) {
            if (!streamMeta.getSinks().isEmpty()) {
                Iterator it = streamMeta.getSinks().iterator();
                while (it.hasNext()) {
                    printTopology(((LogicalPlan.InputPortMeta) it.next()).getOperatorMeta(), dag, i + 1);
                }
            }
        }
    }

    @Test
    public void testLoadFromPropertiesFile() throws IOException {
        Properties properties = new Properties();
        InputStream resourceAsStream = getClass().getResourceAsStream("/testTopology.properties");
        if (resourceAsStream == null) {
            Assert.fail("Could not load /testTopology.properties");
        }
        properties.load(resourceAsStream);
        LogicalPlanConfiguration addFromProperties = new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(properties, (Configuration) null);
        LogicalPlan logicalPlan = new LogicalPlan();
        addFromProperties.populateDAG(logicalPlan);
        logicalPlan.validate();
        Assert.assertEquals("number of operator confs", 5L, logicalPlan.getAllOperators().size());
        Assert.assertEquals("number of root operators", 1L, logicalPlan.getRootOperators().size());
        LogicalPlan.StreamMeta stream = logicalPlan.getStream("n1n2");
        Assert.assertNotNull(stream);
        Assert.assertTrue("n1n2 locality", DAG.Locality.CONTAINER_LOCAL == stream.getLocality());
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta("operator3");
        Assert.assertEquals("operator3.classname", GenericTestOperator.class, operatorMeta.getOperator().getClass());
        GenericTestOperator operator = operatorMeta.getOperator();
        Assert.assertEquals("myStringProperty " + operator, "myStringPropertyValueFromTemplate", operator.getMyStringProperty());
        Assert.assertFalse("booleanProperty " + operator, operator.booleanProperty);
        LogicalPlan.OperatorMeta operatorMeta2 = logicalPlan.getOperatorMeta("operator4");
        GenericTestOperator operator2 = operatorMeta2.getOperator();
        Assert.assertEquals("myStringProperty " + operator2, "overrideOperator4", operator2.getMyStringProperty());
        Assert.assertEquals("setterOnlyOperator4 " + operator2, "setterOnlyOperator4", operator2.propertySetterOnly);
        Assert.assertTrue("booleanProperty " + operator2, operator2.booleanProperty);
        LogicalPlan.StreamMeta stream2 = logicalPlan.getStream("inputStream");
        Assert.assertNotNull(stream2);
        Assert.assertEquals("input1 source", logicalPlan.getOperatorMeta("inputOperator"), stream2.getSource().getOperatorMeta());
        HashSet newHashSet = Sets.newHashSet();
        Iterator it = stream2.getSinks().iterator();
        while (it.hasNext()) {
            newHashSet.add(((LogicalPlan.InputPortMeta) it.next()).getOperatorMeta());
        }
        Assert.assertEquals("input1 target ", Sets.newHashSet(new LogicalPlan.OperatorMeta[]{logicalPlan.getOperatorMeta("operator1"), operatorMeta, operatorMeta2}), newHashSet);
    }

    @Test
    public void testLoadFromPropertiesFileWithLegacyPrefix() throws IOException {
        Properties properties = new Properties();
        InputStream resourceAsStream = getClass().getResourceAsStream("/testTopologyLegacyPrefix.properties");
        if (resourceAsStream == null) {
            Assert.fail("Could not load /testTopologyLegacyPrefix.properties");
        }
        properties.load(resourceAsStream);
        LogicalPlanConfiguration addFromProperties = new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(properties, (Configuration) null);
        LogicalPlan logicalPlan = new LogicalPlan();
        addFromProperties.populateDAG(logicalPlan);
        logicalPlan.validate();
        Assert.assertEquals("number of operators", 2L, logicalPlan.getAllOperators().size());
        Assert.assertEquals("number of root operators", 1L, logicalPlan.getRootOperators().size());
        LogicalPlan.StreamMeta stream = logicalPlan.getStream("s1");
        Assert.assertNotNull(stream);
        Assert.assertTrue("s1 locality", DAG.Locality.CONTAINER_LOCAL == stream.getLocality());
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta("o2");
        Assert.assertEquals(GenericTestOperator.class, operatorMeta.getOperator().getClass());
        GenericTestOperator operator = operatorMeta.getOperator();
        Assert.assertEquals("myStringProperty " + operator, "myStringPropertyValue", operator.getMyStringProperty());
    }

    @Test
    public void testDeprecation() {
        String str = "dt." + Context.DAGContext.APPLICATION_NAME.getName();
        String str2 = LogicalPlanConfiguration.KEY_APPLICATION_NAME;
        Configuration configuration = new Configuration(false);
        configuration.set(str, "bar");
        Assert.assertEquals("bar", configuration.get(str2));
    }

    @Test
    public void testLoadFromJson() throws Exception {
        InputStream resourceAsStream = getClass().getResourceAsStream("/testTopology.json");
        if (resourceAsStream == null) {
            Assert.fail("Could not load /testTopology.json");
        }
        StringWriter stringWriter = new StringWriter();
        IOUtils.copy(resourceAsStream, stringWriter);
        JSONObject jSONObject = new JSONObject(stringWriter.toString());
        Configuration configuration = new Configuration(false);
        configuration.set("apex.operator.operator3.prop.myStringProperty", "o3StringFromConf");
        LogicalPlan createFromJson = new LogicalPlanConfiguration(configuration).createFromJson(jSONObject, "testLoadFromJson");
        createFromJson.validate();
        Assert.assertEquals("DAG attribute CONTAINER_JVM_OPTIONS ", createFromJson.getAttributes().get(Context.DAGContext.CONTAINER_JVM_OPTIONS), "-Xmx16m");
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(Integer.class, StringCodec.Integer2String.class);
        Assert.assertEquals("DAG attribute STRING_CODECS ", newHashMap, createFromJson.getAttributes().get(Context.DAGContext.STRING_CODECS));
        Assert.assertEquals("DAG attribute CONTAINER_OPTS_CONFIGURATOR ", BasicContainerOptConfigurator.class, ((Context.ContainerOptConfigurator) createFromJson.getAttributes().get(Context.DAGContext.CONTAINER_OPTS_CONFIGURATOR)).getClass());
        Assert.assertEquals("number of operator confs", 5L, createFromJson.getAllOperators().size());
        Assert.assertEquals("number of root operators", 1L, createFromJson.getRootOperators().size());
        LogicalPlan.StreamMeta stream = createFromJson.getStream("n1n2");
        Assert.assertNotNull(stream);
        Assert.assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == stream.getLocality());
        LogicalPlan.OperatorMeta operatorMeta = createFromJson.getOperatorMeta("inputOperator");
        TestStatsListener testStatsListener = new TestStatsListener();
        testStatsListener.setIntProp(222);
        Assert.assertEquals("inputOperator STATS_LISTENERS attribute ", Lists.newArrayList(new StatsListener[]{testStatsListener}), operatorMeta.getAttributes().get(Context.OperatorContext.STATS_LISTENERS));
        Iterator it = operatorMeta.getOutputStreams().keySet().iterator();
        while (it.hasNext()) {
            Assert.assertTrue("output port of input Operator attribute is JsonStreamCodec ", ((LogicalPlan.OutputPortMeta) it.next()).getAttributes().get(Context.PortContext.STREAM_CODEC) instanceof JsonStreamCodec);
        }
        LogicalPlan.OperatorMeta operatorMeta2 = createFromJson.getOperatorMeta("operator3");
        Assert.assertEquals("operator3.classname", GenericTestOperator.class, operatorMeta2.getOperator().getClass());
        GenericTestOperator operator = operatorMeta2.getOperator();
        Assert.assertEquals("myStringProperty " + operator, "o3StringFromConf", operator.getMyStringProperty());
        Assert.assertFalse("booleanProperty " + operator, operator.booleanProperty);
        LogicalPlan.OperatorMeta operatorMeta3 = createFromJson.getOperatorMeta("operator4");
        GenericTestOperator operator2 = operatorMeta3.getOperator();
        Assert.assertEquals("myStringProperty " + operator2, "overrideOperator4", operator2.getMyStringProperty());
        Assert.assertEquals("setterOnlyOperator4 " + operator2, "setterOnlyOperator4", operator2.propertySetterOnly);
        Assert.assertTrue("booleanProperty " + operator2, operator2.booleanProperty);
        LogicalPlan.StreamMeta stream2 = createFromJson.getStream("inputStream");
        Assert.assertNotNull(stream2);
        LogicalPlan.OperatorMeta operatorMeta4 = createFromJson.getOperatorMeta("inputOperator");
        Assert.assertEquals("input1 source", operatorMeta4, stream2.getSource().getOperatorMeta());
        HashSet newHashSet = Sets.newHashSet();
        Iterator it2 = stream2.getSinks().iterator();
        while (it2.hasNext()) {
            newHashSet.add(((LogicalPlan.InputPortMeta) it2.next()).getOperatorMeta());
        }
        Assert.assertEquals("operator attribute " + operatorMeta4, 64L, ((Integer) operatorMeta4.getValue(Context.OperatorContext.MEMORY_MB)).intValue());
        Assert.assertEquals("port attribute " + operatorMeta4, 8L, ((Integer) stream2.getSource().getValue(Context.PortContext.UNIFIER_LIMIT)).intValue());
        Assert.assertEquals("input1 target ", Sets.newHashSet(new LogicalPlan.OperatorMeta[]{createFromJson.getOperatorMeta("operator1"), operatorMeta2, operatorMeta3}), newHashSet);
    }

    @Test
    public void testAppLevelAttributes() {
        Properties properties = new Properties();
        properties.put("apex." + DAG.MASTER_MEMORY_MB.getName(), "123");
        properties.put("apex." + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
        properties.put("apex." + DAG.APPLICATION_PATH.getName(), "/defaultdir");
        properties.put("apex.application.app1." + DAG.APPLICATION_PATH.getName(), "/otherdir");
        properties.put("apex.application.app1." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.populateDAG(logicalPlan);
        logicalPlanConfiguration.setApplicationConfiguration(logicalPlan, "app1", (StreamingApplication) null);
        Assert.assertEquals("", "/otherdir", logicalPlan.getValue(DAG.APPLICATION_PATH));
        Assert.assertEquals("", 123, logicalPlan.getValue(DAG.MASTER_MEMORY_MB));
        Assert.assertEquals("", 1000, logicalPlan.getValue(DAG.STREAMING_WINDOW_SIZE_MILLIS));
        Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", logicalPlan.getValue(DAG.CONTAINER_JVM_OPTIONS));
    }

    @Test
    public void testAppLevelProperties() {
        Properties properties = new Properties();
        properties.put("apex.application.app1.testprop1", "10");
        properties.put("apex.application.app1.prop.testprop2", "100");
        properties.put("apex.application.*.prop.testprop3", "1000");
        properties.put("apex.application.app1.inncls.a", "10000");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        LogicalPlan logicalPlan = new LogicalPlan();
        TestApplication testApplication = new TestApplication();
        logicalPlanConfiguration.setApplicationConfiguration(logicalPlan, "app1", testApplication);
        Assert.assertEquals("", 10, testApplication.getTestprop1());
        Assert.assertEquals("", 100, testApplication.getTestprop2());
        Assert.assertEquals("", 1000, testApplication.getTestprop3());
        Assert.assertEquals("", 10000, testApplication.getInncls().getA());
    }

    @Test
    public void testPrepareDAG() {
        final MutableBoolean mutableBoolean = new MutableBoolean(false);
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.1
            public void populateDAG(DAG dag, Configuration configuration) {
                Assert.assertEquals("", "hostname:9090", dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS));
                dag.setAttribute(DAG.GATEWAY_CONNECT_ADDRESS, "hostname:9091");
                mutableBoolean.setValue(true);
            }
        };
        Configuration configuration = new Configuration(false);
        configuration.addResource("dt-site.xml");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(configuration);
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, streamingApplication, "testconfig");
        Assert.assertTrue("populateDAG called", mutableBoolean.booleanValue());
        Assert.assertEquals("populateDAG overrides attribute", "hostname:9091", logicalPlan.getValue(DAG.GATEWAY_CONNECT_ADDRESS));
    }

    @Test
    public void testOperatorConfigurationLookup() {
        Properties properties = new Properties();
        properties.put("apex.template.matchId1.matchIdRegExp", ".*operator1.*");
        properties.put("apex.template.matchId1.stringProperty2", "stringProperty2Value-matchId1");
        properties.put("apex.template.matchId1.nested.property", "nested.propertyValue-matchId1");
        properties.put("apex.template.matchClass1.matchClassNameRegExp", ".*" + LogicalPlanTest.ValidationTestOperator.class.getSimpleName());
        properties.put("apex.template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1");
        properties.put("apex.template.t2.matchClassNameRegExp", ".*" + GenericTestOperator.class.getSimpleName());
        properties.put("apex.template.t2.myStringProperty", "myStringPropertyValue");
        properties.put("apex.operator.operator3.emitFormat", "emitFormatValue");
        LogicalPlan logicalPlan = new LogicalPlan();
        Operator addOperator = logicalPlan.addOperator("operator1", new LogicalPlanTest.ValidationTestOperator());
        Operator addOperator2 = logicalPlan.addOperator("operator2", new LogicalPlanTest.ValidationTestOperator());
        Operator addOperator3 = logicalPlan.addOperator("operator3", new GenericTestOperator());
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        LOG.debug("calling addFromProperties");
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        Map properties2 = logicalPlanConfiguration.getProperties(logicalPlan.getMeta(addOperator), "appName");
        Assert.assertEquals("" + properties2, 2L, properties2.size());
        Assert.assertEquals("" + properties2, "stringProperty2Value-matchId1", properties2.get("stringProperty2"));
        Assert.assertEquals("" + properties2, "nested.propertyValue-matchId1", properties2.get("nested.property"));
        Map properties3 = logicalPlanConfiguration.getProperties(logicalPlan.getMeta(addOperator2), "appName");
        Assert.assertEquals("" + properties3, 1L, properties3.size());
        Assert.assertEquals("" + properties3, "stringProperty2Value-matchClass1", properties3.get("stringProperty2"));
        Map properties4 = logicalPlanConfiguration.getProperties(logicalPlan.getMeta(addOperator3), "appName");
        Assert.assertEquals("" + properties4, 2L, properties4.size());
        Assert.assertEquals("" + properties4, "myStringPropertyValue", properties4.get("myStringProperty"));
        Assert.assertEquals("" + properties4, "emitFormatValue", properties4.get("emitFormat"));
    }

    @Test
    public void testSetOperatorProperties() {
        Configuration configuration = new Configuration(false);
        configuration.set("apex.operator.o1.prop.myStringProperty", "myStringPropertyValue");
        configuration.set("apex.operator.o2.prop.stringArrayField", "a,b,c");
        configuration.set("apex.operator.o2.prop.mapProperty.key1", "key1Val");
        configuration.set("apex.operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
        configuration.set("apex.operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
        LogicalPlan logicalPlan = new LogicalPlan();
        GenericTestOperator addOperator = logicalPlan.addOperator("o1", new GenericTestOperator());
        LogicalPlanTest.ValidationTestOperator addOperator2 = logicalPlan.addOperator("o2", new LogicalPlanTest.ValidationTestOperator());
        new LogicalPlanConfiguration(configuration).setOperatorProperties(logicalPlan, "testSetOperatorProperties");
        Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", addOperator.getMyStringProperty());
        Assert.assertArrayEquals("o2.stringArrayField", new String[]{"a", "b", "c"}, addOperator2.getStringArrayField());
        Assert.assertEquals("o2.mapProperty.key1", "key1Val", addOperator2.getMapProperty().get("key1"));
        Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", addOperator2.getMapProperty().get("key1.dot"));
        Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", addOperator2.getMapProperty().get("key2.dot"));
    }

    @Test
    public void testAppNameAttribute() {
        AnnotatedApplication annotatedApplication = new AnnotatedApplication();
        Configuration configuration = new Configuration(false);
        configuration.addResource("dt-site.xml");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(configuration);
        Properties properties = new Properties();
        properties.put("apex.application.TestAliasApp.class", annotatedApplication.getClass().getName());
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        LogicalPlan logicalPlan = new LogicalPlan();
        String str = annotatedApplication.getClass().getName().replace(".", "/") + ".class";
        logicalPlan.setAttribute(Context.DAGContext.APPLICATION_NAME, "testApp");
        logicalPlanConfiguration.prepareDAG(logicalPlan, annotatedApplication, str);
        Assert.assertEquals("Application name", "testApp", logicalPlan.getAttributes().get(Context.DAGContext.APPLICATION_NAME));
    }

    @Test
    public void testAppAlias() {
        AnnotatedApplication annotatedApplication = new AnnotatedApplication();
        Configuration configuration = new Configuration(false);
        configuration.addResource("dt-site.xml");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(configuration);
        Properties properties = new Properties();
        properties.put("apex.application.TestAliasApp.class", annotatedApplication.getClass().getName());
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, annotatedApplication, annotatedApplication.getClass().getName().replace(".", "/") + ".class");
        Assert.assertEquals("Application name", "TestAliasApp", logicalPlan.getAttributes().get(Context.DAGContext.APPLICATION_NAME));
    }

    @Test
    public void testAppAnnotationAlias() {
        AnnotatedApplication annotatedApplication = new AnnotatedApplication();
        Configuration configuration = new Configuration(false);
        configuration.addResource("dt-site.xml");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(configuration);
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, annotatedApplication, annotatedApplication.getClass().getName().replace(".", "/") + ".class");
        Assert.assertEquals("Application name", "AnnotatedAlias", logicalPlan.getAttributes().get(Context.DAGContext.APPLICATION_NAME));
    }

    @Test
    public void testOperatorLevelAttributes() {
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.2
            public void populateDAG(DAG dag, Configuration configuration) {
                dag.addOperator("operator1", GenericTestOperator.class);
                dag.addOperator("operator2", GenericTestOperator.class);
            }
        };
        Properties properties = new Properties();
        properties.put("apex.application.app1.class", streamingApplication.getClass().getName());
        properties.put("apex.operator.*." + Context.OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
        properties.put("apex.operator.*." + Context.OperatorContext.STATS_LISTENERS.getName(), PartitioningTest.PartitionLoadWatch.class.getName());
        properties.put("apex.application.app1.operator.operator1." + Context.OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = streamingApplication.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, streamingApplication, str);
        Assert.assertEquals("", 20, logicalPlan.getOperatorMeta("operator1").getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT));
        Assert.assertEquals("", 2, logicalPlan.getOperatorMeta("operator2").getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT));
        Assert.assertEquals("", PartitioningTest.PartitionLoadWatch.class, ((Collection) logicalPlan.getOperatorMeta("operator2").getValue(Context.OperatorContext.STATS_LISTENERS)).toArray()[0].getClass());
    }

    @Test
    public void testUnifierLevelAttributes() {
        final GenericTestOperator genericTestOperator = new GenericTestOperator();
        final GenericTestOperator genericTestOperator2 = new GenericTestOperator();
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.3
            public void populateDAG(DAG dag, Configuration configuration) {
                dag.addOperator("operator1", genericTestOperator);
                dag.addOperator("operator2", genericTestOperator2);
                dag.addStream("s1", genericTestOperator.outport1, genericTestOperator2.inport1);
            }
        };
        Properties properties = new Properties();
        properties.put("apex.application.app1.class", streamingApplication.getClass().getName());
        properties.put("apex.application.app1.operator.operator1.outputport.outport1.unifier." + Context.OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
        properties.put("apex.application.app1.operator.operator1.outputport.outport1.unifier." + Context.OperatorContext.MEMORY_MB.getName(), "512");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = streamingApplication.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, streamingApplication, str);
        LogicalPlan.OperatorMeta operatorMeta = null;
        for (Map.Entry entry : logicalPlan.getOperatorMeta("operator1").getOutputStreams().entrySet()) {
            if (((LogicalPlan.OutputPortMeta) entry.getKey()).getPortName().equals(GenericTestOperator.OPORT1)) {
                operatorMeta = ((LogicalPlan.OutputPortMeta) entry.getKey()).getUnifierMeta();
            }
        }
        Assert.assertNotNull(operatorMeta);
        Assert.assertEquals("", 2, operatorMeta.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT));
        Assert.assertEquals("", 512, operatorMeta.getValue(Context.OperatorContext.MEMORY_MB));
    }

    @Test
    public void testModuleUnifierLevelAttributes() {
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.4
            public void populateDAG(DAG dag, Configuration configuration) {
                dag.addStream("Module To Operator", ((C1TestUnifierAttributeModule) dag.addModule("TestModule", new C1TestUnifierAttributeModule())).moduleOutput, dag.addOperator("DummyOutputOperator", new C1DummyOutputOperator()).input);
            }
        };
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new MockStorageAgent());
        logicalPlanConfiguration.prepareDAG(logicalPlan, streamingApplication, "UnifierApp");
        LogicalPlan.OperatorMeta operatorMeta = null;
        for (Map.Entry entry : logicalPlan.getOperatorMeta("TestModule$DummyOperator").getOutputStreams().entrySet()) {
            if (((LogicalPlan.OutputPortMeta) entry.getKey()).getPortName().equals("output")) {
                operatorMeta = ((LogicalPlan.OutputPortMeta) entry.getKey()).getUnifierMeta();
            }
        }
        Assert.assertNotNull(operatorMeta);
        Assert.assertEquals("", 2, operatorMeta.getValue(Context.OperatorContext.TIMEOUT_WINDOW_COUNT));
        LogicalPlan.OperatorMeta operatorMeta2 = null;
        Iterator it = new PhysicalPlan(logicalPlan, new TestPlanContext()).getContainers().iterator();
        while (it.hasNext()) {
            for (PTOperator pTOperator : ((PTContainer) it.next()).getOperators()) {
                if (pTOperator.isUnifier()) {
                    operatorMeta2 = pTOperator.getOperatorMeta();
                }
            }
        }
        Assert.assertEquals("", 2, operatorMeta2.getValue(Context.OperatorContext.TIMEOUT_WINDOW_COUNT));
    }

    @Test
    public void testOperatorLevelProperties() {
        final GenericTestOperator genericTestOperator = new GenericTestOperator();
        final GenericTestOperator genericTestOperator2 = new GenericTestOperator();
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.5
            public void populateDAG(DAG dag, Configuration configuration) {
                dag.addOperator("operator1", genericTestOperator);
                dag.addOperator("operator2", genericTestOperator2);
            }
        };
        Properties properties = new Properties();
        properties.put("apex.application.app1.class", streamingApplication.getClass().getName());
        properties.put("apex.operator.*.myStringProperty", "pv1");
        properties.put("apex.operator.*.booleanProperty", Boolean.TRUE.toString());
        properties.put("apex.application.app1.operator.operator1.myStringProperty", "apv1");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        logicalPlanConfiguration.prepareDAG(new LogicalPlan(), streamingApplication, streamingApplication.getClass().getName().replace(".", "/") + ".class");
        Assert.assertEquals("apv1", genericTestOperator.getMyStringProperty());
        Assert.assertEquals("pv1", genericTestOperator2.getMyStringProperty());
        Assert.assertEquals(true, Boolean.valueOf(genericTestOperator2.isBooleanProperty()));
    }

    @Test
    public void testApplicationLevelParameter() {
        final GenericTestOperator genericTestOperator = new GenericTestOperator();
        final GenericTestOperator genericTestOperator2 = new GenericTestOperator();
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.6
            public void populateDAG(DAG dag, Configuration configuration) {
                dag.addOperator("operator1", genericTestOperator);
                dag.addOperator("operator2", genericTestOperator2);
            }
        };
        Properties properties = new Properties();
        properties.put("apex.application.app1.class", streamingApplication.getClass().getName());
        properties.put("apex.operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz");
        properties.put("apex.operator.*.booleanProperty", Boolean.TRUE.toString());
        properties.put("apex.application.app1.operator.operator1.myStringProperty", "apv1");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        Configuration configuration = new Configuration(false);
        configuration.set("xyz", "123");
        configuration.set("zzz", "456");
        logicalPlanConfiguration.addFromProperties(properties, configuration);
        logicalPlanConfiguration.prepareDAG(new LogicalPlan(), streamingApplication, streamingApplication.getClass().getName().replace(".", "/") + ".class");
        Assert.assertEquals("apv1", genericTestOperator.getMyStringProperty());
        Assert.assertEquals("foo 123 bar 456 baz", genericTestOperator2.getMyStringProperty());
        Assert.assertEquals(true, Boolean.valueOf(genericTestOperator2.isBooleanProperty()));
    }

    @Test
    public void testPortLevelAttributes() {
        SimpleTestApplication simpleTestApplication = new SimpleTestApplication();
        Properties properties = new Properties();
        properties.put("apex.application.app1.class", simpleTestApplication.getClass().getName());
        properties.put("apex.application.app1.operator.operator1.port.*." + Context.PortContext.QUEUE_CAPACITY.getName(), "16384");
        properties.put("apex.application.app1.operator.operator2.inputport.inport1." + Context.PortContext.QUEUE_CAPACITY.getName(), "32768");
        properties.put("apex.application.app1.operator.operator2.outputport.outport1." + Context.PortContext.QUEUE_CAPACITY.getName(), "32768");
        properties.put("apex.application.app1.operator.operator3.port.*." + Context.PortContext.QUEUE_CAPACITY.getName(), "16384");
        properties.put("apex.application.app1.operator.operator3.inputport.inport2." + Context.PortContext.QUEUE_CAPACITY.getName(), "32768");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = simpleTestApplication.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, simpleTestApplication, str);
        Assert.assertEquals("", 16384, logicalPlan.getOperatorMeta("operator1").getMeta(simpleTestApplication.gt1.outport1).getValue(Context.PortContext.QUEUE_CAPACITY));
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta("operator2");
        Assert.assertEquals("", 32768, operatorMeta.getMeta(simpleTestApplication.gt2.inport1).getValue(Context.PortContext.QUEUE_CAPACITY));
        Assert.assertEquals("", 32768, operatorMeta.getMeta(simpleTestApplication.gt2.outport1).getValue(Context.PortContext.QUEUE_CAPACITY));
        LogicalPlan.OperatorMeta operatorMeta2 = logicalPlan.getOperatorMeta("operator3");
        Assert.assertEquals("", 16384, operatorMeta2.getMeta(simpleTestApplication.gt3.inport1).getValue(Context.PortContext.QUEUE_CAPACITY));
        Assert.assertEquals("", 32768, operatorMeta2.getMeta(simpleTestApplication.gt3.inport2).getValue(Context.PortContext.QUEUE_CAPACITY));
    }

    @Test
    public void testInvalidAttribute() throws Exception {
        Assert.assertNotSame(0, Long.valueOf(Context.DAGContext.serialVersionUID));
        Attribute attribute = new Attribute("", (StringCodec) null);
        Field declaredField = Attribute.class.getDeclaredField("name");
        declaredField.setAccessible(true);
        declaredField.set(attribute, "NOT_CONFIGURABLE");
        declaredField.setAccessible(false);
        LogicalPlanConfiguration.ContextUtils.addAttribute(Context.DAGContext.class, attribute);
        LogicalPlanConfiguration.AttributeParseUtils.initialize();
        LogicalPlanConfiguration.ConfElement.initialize();
        Properties properties = new Properties();
        properties.put("apex.attr.NOT_CONFIGURABLE", "value");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        try {
            logicalPlanConfiguration.prepareDAG(new LogicalPlan(), (StreamingApplication) null, "");
            Assert.fail("Exception expected");
        } catch (Exception e) {
            Assert.assertThat("Attribute not configurable", e.getMessage(), StramTestSupport.RegexMatcher.matches("Attribute does not support property configuration: NOT_CONFIGURABLE.*"));
        }
        LogicalPlanConfiguration.ContextUtils.removeAttribute(Context.DAGContext.class, attribute);
        LogicalPlanConfiguration.AttributeParseUtils.initialize();
        LogicalPlanConfiguration.ConfElement.initialize();
        Properties properties2 = new Properties();
        properties2.put("apex.attr.INVALID_NAME", "value");
        try {
            new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(properties2, (Configuration) null);
            Assert.fail("Exception expected");
        } catch (Exception e2) {
            LOG.debug("Exception message: {}", e2);
            Assert.assertThat("Invalid attribute name", e2.getMessage(), StramTestSupport.RegexMatcher.matches("Invalid attribute reference: apex.attr.INVALID_NAME"));
        }
    }

    @Test
    public void testAttributesCodec() {
        Assert.assertNotSame((Object) null, new Long[]{Long.valueOf(Context.DAGContext.serialVersionUID), Long.valueOf(Context.OperatorContext.serialVersionUID), Long.valueOf(Context.PortContext.serialVersionUID)});
        Iterator it = Sets.newHashSet(new Class[]{Context.DAGContext.class, Context.OperatorContext.class, Context.PortContext.class}).iterator();
        while (it.hasNext()) {
            for (Attribute attribute : Attribute.AttributeMap.AttributeInitializer.getAttributes((Class) it.next())) {
                Assert.assertNotNull(attribute.name + " codec", attribute.codec);
            }
        }
    }

    @Test
    public void testTupleClassAttr() throws Exception {
        InputStream resourceAsStream = getClass().getResourceAsStream("/schemaTestTopology.json");
        if (resourceAsStream == null) {
            Assert.fail("Could not load /schemaTestTopology.json");
        }
        StringWriter stringWriter = new StringWriter();
        IOUtils.copy(resourceAsStream, stringWriter);
        LogicalPlan createFromJson = new LogicalPlanConfiguration(new Configuration(false)).createFromJson(new JSONObject(stringWriter.toString()), "testLoadFromJson");
        createFromJson.validate();
        Assert.assertEquals("operator1.classname", SchemaTestOperator.class, createFromJson.getOperatorMeta("operator1").getOperator().getClass());
        LogicalPlan.StreamMeta stream = createFromJson.getStream("inputStream");
        Assert.assertNotNull(stream);
        Iterator it = stream.getSinks().iterator();
        while (it.hasNext()) {
            Assert.assertEquals("tuple class name required", TestSchema.class, ((LogicalPlan.InputPortMeta) it.next()).getAttributes().get(Context.PortContext.TUPLE_CLASS));
        }
    }

    @Test(expected = ValidationException.class)
    public void testTupleClassAttrValidation() throws Exception {
        InputStream resourceAsStream = getClass().getResourceAsStream("/schemaTestTopology.json");
        if (resourceAsStream == null) {
            Assert.fail("Could not load /schemaTestTopology.json");
        }
        StringWriter stringWriter = new StringWriter();
        IOUtils.copy(resourceAsStream, stringWriter);
        JSONObject jSONObject = new JSONObject(stringWriter.toString());
        jSONObject.getJSONArray("streams").getJSONObject(0).remove("schema");
        new LogicalPlanConfiguration(new Configuration(false)).createFromJson(jSONObject, "testLoadFromJson").validate();
    }

    @Test
    public void testTestTupleClassAttrSetFromConfig() {
        Configuration configuration = new Configuration(false);
        configuration.set("apex.operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS", "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestSchema");
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest.7
            public void populateDAG(DAG dag, Configuration configuration2) {
                dag.addStream("stream", dag.addOperator("o1", new TestGeneratorInputOperator()).outport, dag.addOperator("o2", new SchemaTestOperator()).schemaRequiredPort);
            }
        };
        LogicalPlan logicalPlan = new LogicalPlan();
        new LogicalPlanConfiguration(configuration).prepareDAG(logicalPlan, streamingApplication, "app");
        logicalPlan.validate();
    }

    @Test
    public void testDagAttributeNotSetOnOperator() {
        dagOperatorAttributeHelper(true);
    }

    @Test
    public void testAmbiguousAttributeSetOnOperatorAndNotDAG() {
        dagOperatorAttributeHelper(false);
    }

    private void dagOperatorAttributeHelper(boolean z) {
        String simpleName = z ? Context.DAGContext.CHECKPOINT_WINDOW_COUNT.getSimpleName() : Context.OperatorContext.class.getCanonicalName() + "." + Context.DAGContext.CHECKPOINT_WINDOW_COUNT.getSimpleName();
        Properties properties = new Properties();
        properties.put("apex." + LogicalPlanConfiguration.StramElement.ATTR.getValue() + "." + simpleName, "5");
        SimpleTestApplicationWithName simpleTestApplicationWithName = new SimpleTestApplicationWithName();
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = simpleTestApplicationWithName.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, simpleTestApplicationWithName, str);
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta("operator1");
        if (z) {
            Assert.assertNotEquals(5, operatorMeta.getValue(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT));
        } else {
            Assert.assertEquals(5, operatorMeta.getValue(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT));
        }
    }

    @Test
    public void testRootLevelAmbiguousAttributeSimple() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.", null, Boolean.TRUE, true, true);
    }

    @Test
    public void testApplicationLevelAmbiguousAttributeSimple() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.application.*.", null, Boolean.TRUE, true, true);
    }

    @Test
    public void testOperatorLevelAmbiguousAttributeSimple() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.operator.*.", null, Boolean.TRUE, true, false);
    }

    @Test
    public void testPortLevelAmbiguousAttributeSimple() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.port.*.", null, Boolean.TRUE, false, true);
    }

    @Test
    public void testRootLevelAmbiguousAttributeComplex() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.", Context.PortContext.class.getCanonicalName(), Boolean.TRUE, false, true);
    }

    @Test
    public void testApplicationLevelAmbiguousAttributeComplex() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.application.*.", Context.PortContext.class.getCanonicalName(), Boolean.TRUE, false, true);
    }

    @Test
    public void testOperatorLevelAmbiguousAttributeComplex() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.operator.*.", Context.OperatorContext.class.getCanonicalName(), Boolean.TRUE, true, false);
    }

    @Test
    public void testOperatorLevelAmbiguousAttributeComplex2() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.operator.*.", Context.PortContext.class.getCanonicalName(), Boolean.TRUE, false, true);
    }

    @Test
    public void testPortLevelAmbiguousAttributeComplex() {
        testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, "apex.port.*.", Context.PortContext.class.getCanonicalName(), Boolean.TRUE, false, true);
    }

    private void testAttributeAmbiguousSimpleHelper(Attribute<?> attribute, Attribute<?> attribute2, String str, String str2, Object obj, boolean z, boolean z2) {
        Properties propertiesBuilder = propertiesBuilder(attribute.getSimpleName(), str, str2, obj);
        simpleAttributeOperatorHelperAssert(attribute, propertiesBuilder, obj, z);
        simpleNamePortAssertHelperAssert(attribute2, propertiesBuilder, obj, z2);
    }

    @Test
    public void testRootLevelAttributeSimpleNameOperator() {
        simpleAttributeOperatorHelper(Context.OperatorContext.MEMORY_MB, "apex.", true, 4096, true, true);
    }

    @Test
    public void testRootLevelStorageAgentSimpleNameOperator() {
        simpleAttributeOperatorHelper(Context.OperatorContext.STORAGE_AGENT, "apex.", true, new MockStorageAgent(), true, false);
    }

    @Test
    public void testRootLevelAttributeSimpleNameOperatorNoScope() {
        simpleAttributeOperatorHelper(Context.OperatorContext.MEMORY_MB, "apex.", true, 4096, true, false);
    }

    @Test
    public void testApplicationLevelAttributeSimpleNameOperator() {
        simpleAttributeOperatorHelper(Context.OperatorContext.MEMORY_MB, "apex.application.SimpleTestApp.", true, 4096, true, true);
    }

    private void simpleAttributeOperatorHelper(Attribute<?> attribute, String str, boolean z, Object obj, boolean z2, boolean z3) {
        simpleAttributeOperatorHelperAssert(attribute, propertiesBuilderOperator(attribute.getSimpleName(), str, z, obj, z3), obj, z2);
    }

    private void simpleAttributeOperatorHelperAssert(Attribute<?> attribute, Properties properties, Object obj, boolean z) {
        SimpleTestApplicationWithName simpleTestApplicationWithName = new SimpleTestApplicationWithName();
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = simpleTestApplicationWithName.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, simpleTestApplicationWithName, str);
        LogicalPlan.OperatorMeta operatorMeta = logicalPlan.getOperatorMeta("operator1");
        if (z) {
            Assert.assertEquals(obj, operatorMeta.getValue(attribute));
        } else {
            Assert.assertNotEquals(obj, operatorMeta.getValue(attribute));
        }
        LogicalPlan.OperatorMeta operatorMeta2 = logicalPlan.getOperatorMeta("operator2");
        if (z) {
            Assert.assertEquals(obj, operatorMeta2.getValue(attribute));
        } else {
            Assert.assertNotEquals(obj, operatorMeta2.getValue(attribute));
        }
        LogicalPlan.OperatorMeta operatorMeta3 = logicalPlan.getOperatorMeta("operator3");
        if (z) {
            Assert.assertEquals(obj, operatorMeta3.getValue(attribute));
        } else {
            Assert.assertNotEquals(obj, operatorMeta3.getValue(attribute));
        }
    }

    @Test
    public void testRootLevelAttributeSimpleNamePort() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", true, 4096, true, true);
    }

    @Test
    public void testRootLevelAttributeSimpleNamePortNoScope() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", true, 4096, true, false);
    }

    @Test
    public void testOperatorLevelAttributeSimpleNamePort() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.operator.*.", true, 4096, true, true);
    }

    @Test
    public void testApplicationLevelAttributeSimpleNamePort() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.application.SimpleTestApp.", true, 4096, true, true);
    }

    @Test
    public void testRootLevelAttributeComplexNamePort() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", false, 4096, true, true);
    }

    @Test
    public void testRootLevelAttributeComplexNamePortNoScope() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", false, 4096, true, false);
    }

    @Test
    public void testOperatorLevelAttributeComplexNamePort() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.operator.*.", false, 4096, true, true);
    }

    @Test
    public void testApplicationLevelAttributeComplexNamePort() {
        simpleAttributePortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.application.SimpleTestApp.", false, 4096, true, true);
    }

    @Test
    public void testRootLevelAttributeSimpleNameInputPort() {
        simpleAttributeInputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", true, 4096, true);
    }

    @Test
    public void testOperatorLevelAttributeSimpleNameInputPort() {
        simpleAttributeInputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.operator.*.", true, 4096, true);
    }

    @Test
    public void testApplicationLevelAttributeSimpleNameInputPort() {
        simpleAttributeInputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.application.SimpleTestApp.", true, 4096, true);
    }

    @Test
    public void testRootLevelAttributeComplexNameInputPort() {
        simpleAttributeInputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", false, 4096, true);
    }

    @Test
    public void testOperatorLevelAttributeComplexNameInputPort() {
        simpleAttributeInputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.operator.*.", false, 4096, true);
    }

    @Test
    public void testApplicationLevelAttributeComplexNameInputPort() {
        simpleAttributeInputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.application.SimpleTestApp.", false, 4096, true);
    }

    @Test
    public void testRootLevelAttributeSimpleNameOutputPort() {
        simpleAttributeOutputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", true, 4096, true);
    }

    @Test
    public void testOperatorLevelAttributeSimpleNameOutputPort() {
        simpleAttributeOutputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.operator.*.", true, 4096, true);
    }

    @Test
    public void testApplicationLevelAttributeSimpleNameOutputPort() {
        simpleAttributeOutputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.application.SimpleTestApp.", true, 4096, true);
    }

    @Test
    public void testRootLevelAttributeComplexNameOutputPort() {
        simpleAttributeOutputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.", false, 4096, true);
    }

    @Test
    public void testOperatorLevelAttributeComplexNameOutputPort() {
        simpleAttributeOutputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.operator.*.", false, 4096, true);
    }

    @Test
    public void testApplicationLevelAttributeComplexNameOutputPort() {
        simpleAttributeOutputPortHelper(Context.PortContext.QUEUE_CAPACITY, "apex.application.SimpleTestApp.", false, 4096, true);
    }

    private void simpleAttributePortHelper(Attribute<?> attribute, String str, boolean z, Object obj, boolean z2, boolean z3) {
        simpleNamePortAssertHelperAssert(attribute, propertiesBuilderPort(attribute.getSimpleName(), str, z, obj, z3), obj, z2);
    }

    private void simpleAttributeInputPortHelper(Attribute<?> attribute, String str, boolean z, Object obj, boolean z2) {
        Properties propertiesBuilderInputPort = propertiesBuilderInputPort(attribute.getSimpleName(), str, z, obj);
        simpleNameInputPortAssertHelperAssert(attribute, propertiesBuilderInputPort, obj, z2);
        simpleNameOutputPortAssertHelperAssert(attribute, propertiesBuilderInputPort, obj, !z2);
    }

    private void simpleAttributeOutputPortHelper(Attribute<?> attribute, String str, boolean z, Object obj, boolean z2) {
        Properties propertiesBuilderOutputPort = propertiesBuilderOutputPort(attribute.getSimpleName(), str, z, obj);
        simpleNameOutputPortAssertHelperAssert(attribute, propertiesBuilderOutputPort, obj, z2);
        simpleNameInputPortAssertHelperAssert(attribute, propertiesBuilderOutputPort, obj, !z2);
    }

    private void simpleNamePortAssertHelperAssert(Attribute<?> attribute, Properties properties, Object obj, boolean z) {
        SimpleTestApplicationWithName simpleTestApplicationWithName = new SimpleTestApplicationWithName();
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = simpleTestApplicationWithName.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, simpleTestApplicationWithName, str);
        simpleNamePortAssertHelper(attribute, logicalPlan, "operator1", obj, z);
        simpleNamePortAssertHelper(attribute, logicalPlan, "operator2", obj, z);
        simpleNamePortAssertHelper(attribute, logicalPlan, "operator3", obj, z);
    }

    private void simpleNameInputPortAssertHelperAssert(Attribute<?> attribute, Properties properties, Object obj, boolean z) {
        SimpleTestApplicationWithName simpleTestApplicationWithName = new SimpleTestApplicationWithName();
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = simpleTestApplicationWithName.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, simpleTestApplicationWithName, str);
        simpleNameInputPortAssertHelper(attribute, logicalPlan, "operator1", obj, z);
        simpleNameInputPortAssertHelper(attribute, logicalPlan, "operator2", obj, z);
        simpleNameInputPortAssertHelper(attribute, logicalPlan, "operator3", obj, z);
    }

    private void simpleNameOutputPortAssertHelperAssert(Attribute<?> attribute, Properties properties, Object obj, boolean z) {
        SimpleTestApplicationWithName simpleTestApplicationWithName = new SimpleTestApplicationWithName();
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        String str = simpleTestApplicationWithName.getClass().getName().replace(".", "/") + ".class";
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlanConfiguration.prepareDAG(logicalPlan, simpleTestApplicationWithName, str);
        simpleNameOutputPortAssertHelper(attribute, logicalPlan, "operator1", obj, z);
        simpleNameOutputPortAssertHelper(attribute, logicalPlan, "operator2", obj, z);
        simpleNameOutputPortAssertHelper(attribute, logicalPlan, "operator3", obj, z);
    }

    private void simpleNamePortAssertHelper(Attribute<?> attribute, LogicalPlan logicalPlan, String str, Object obj, boolean z) {
        simpleNameInputPortAssertHelper(attribute, logicalPlan, str, obj, z);
        simpleNameOutputPortAssertHelper(attribute, logicalPlan, str, obj, z);
    }

    private void simpleNameInputPortAssertHelper(Attribute<?> attribute, LogicalPlan logicalPlan, String str, Object obj, boolean z) {
        for (LogicalPlan.InputPortMeta inputPortMeta : logicalPlan.getOperatorMeta(str).getInputStreams().keySet()) {
            if (z) {
                Assert.assertEquals(obj, inputPortMeta.getValue(attribute));
            } else {
                Assert.assertNotEquals(obj, inputPortMeta.getValue(attribute));
            }
        }
    }

    private void simpleNameOutputPortAssertHelper(Attribute<?> attribute, LogicalPlan logicalPlan, String str, Object obj, boolean z) {
        for (LogicalPlan.OutputPortMeta outputPortMeta : logicalPlan.getOperatorMeta(str).getOutputStreams().keySet()) {
            if (z) {
                Assert.assertEquals(obj, outputPortMeta.getValue(attribute));
            } else {
                Assert.assertNotEquals(obj, outputPortMeta.getValue(attribute));
            }
        }
    }

    private Properties propertiesBuilder(String str, String str2, String str3, Object obj) {
        if (!(str3 == null)) {
            str = str3 + "." + str;
        }
        Properties properties = new Properties();
        properties.put(str2 + LogicalPlanConfiguration.StramElement.ATTR.getValue() + "." + str, obj.toString());
        return properties;
    }

    private Properties propertiesBuilderOperator(String str, String str2, boolean z, Object obj, boolean z2) {
        String canonicalName = z ? null : Context.OperatorContext.class.getCanonicalName();
        if (z2) {
            str2 = str2 + "operator.*.";
        }
        return propertiesBuilder(str, str2, canonicalName, obj);
    }

    private Properties propertiesBuilderPort(String str, String str2, boolean z, Object obj, boolean z2) {
        String canonicalName = z ? null : Context.PortContext.class.getCanonicalName();
        if (z2) {
            str2 = str2 + "port.*.";
        }
        return propertiesBuilder(str, str2, canonicalName, obj);
    }

    private Properties propertiesBuilderInputPort(String str, String str2, boolean z, Object obj) {
        return propertiesBuilder(str, str2 + "inputport.*.", z ? null : Context.PortContext.class.getCanonicalName(), obj);
    }

    private Properties propertiesBuilderOutputPort(String str, String str2, boolean z, Object obj) {
        return propertiesBuilder(str, str2 + "outputport.*.", z ? null : Context.PortContext.class.getCanonicalName(), obj);
    }

    @Test
    public void testErrorSameAttrMultipleTypes() {
        new LogicalPlanConfiguration(new Configuration());
        Exception exc = null;
        try {
            LogicalPlanConfiguration.ContextUtils.buildAttributeMaps(Sets.newHashSet(new Class[]{MockContext1.class, MockContext2.class}));
        } catch (ValidationException e) {
            exc = e;
        }
        Assert.assertNotNull(exc);
        Assert.assertTrue(exc.getMessage().contains("is defined with two different types in two different context classes:"));
        LogicalPlanConfiguration.ContextUtils.initialize();
    }

    static {
        Object[] objArr = {Long.valueOf(MockContext1.serialVersionUID), Long.valueOf(MockContext2.serialVersionUID)};
        logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
        LOG = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
    }
}
