package com.datatorrent.stram.plan.logical;

import com.datatorrent.api.AffinityRule;
import com.datatorrent.api.AffinityRulesSet;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.StringCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.engine.TestNonOptionalOutportInputOperator;
import com.datatorrent.stram.engine.TestOutputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Valid;
import javax.validation.Validation;
import javax.validation.ValidationException;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest.class */
public class LogicalPlanTest {

    @OperatorAnnotation(checkpointableWithinAppWindow = true)
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$CheckpointableWithinAppWindowOperator.class */
    class CheckpointableWithinAppWindowOperator extends GenericTestOperator {
        CheckpointableWithinAppWindowOperator() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$CounterOperator.class */
    public static class CounterOperator extends BaseOperator {
        public final transient Operator.InputPort<Object> countInputPort = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanTest.CounterOperator.1
            public final void process(Object obj) {
            }
        };
    }

    @DefaultSerializer(JavaSerializer.class)
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$JdkSerializableOperator.class */
    public static class JdkSerializableOperator extends BaseOperator implements Serializable {
        private static final long serialVersionUID = -4024202339520027097L;

        @InputPortFieldAnnotation(optional = true)
        public final Operator.InputPort<Object> inport1 = new SerializableInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanTest.JdkSerializableOperator.1
            private static final long serialVersionUID = 1;

            public final void put(Object obj) {
            }

            public int getCount(boolean z) {
                return 0;
            }
        };

        /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$JdkSerializableOperator$SerializableInputPort.class */
        public abstract class SerializableInputPort<T> implements Operator.InputPort<T>, Sink<T>, Serializable {
            private static final long serialVersionUID = 1;

            public SerializableInputPort() {
            }

            public Sink<T> getSink() {
                return this;
            }

            public void setConnected(boolean z) {
            }

            public void setup(Context.PortContext portContext) {
            }

            public void teardown() {
            }

            public StreamCodec<T> getStreamCodec() {
                return null;
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$NoInputPortOperator.class */
    class NoInputPortOperator extends BaseOperator {
        NoInputPortOperator() {
        }
    }

    @OperatorAnnotation(checkpointableWithinAppWindow = false)
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$NotCheckpointableWithinAppWindowOperator.class */
    class NotCheckpointableWithinAppWindowOperator extends GenericTestOperator {
        NotCheckpointableWithinAppWindowOperator() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$Object2String.class */
    private static class Object2String implements StringCodec<Object> {
        private Object2String() {
        }

        public Object fromString(String str) {
            return null;
        }

        public String toString(Object obj) {
            return null;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$Operator1.class */
    class Operator1 extends BaseOperator {
        public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanTest.Operator1.1
            public void process(Object obj) {
            }
        };

        Operator1() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$Operator2.class */
    class Operator2 extends Operator1 {
        public final transient DefaultInputPort<Object> input;

        Operator2() {
            super();
            this.input = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanTest.Operator2.1
                public void process(Object obj) {
                }
            };
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$Operator3.class */
    class Operator3 extends Operator1 {

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Object> input;

        Operator3() {
            super();
            this.input = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanTest.Operator3.1
                public void process(Object obj) {
                }
            };
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$Operator4.class */
    class Operator4 extends BaseOperator implements InputOperator {
        public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();

        Operator4() {
        }

        public void emitTuples() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$Operator5.class */
    class Operator5 extends Operator4 {
        public final transient DefaultOutputPort<Object> output;

        Operator5() {
            super();
            this.output = new DefaultOutputPort<>();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestAnnotationsOperator.class */
    private class TestAnnotationsOperator extends BaseOperator implements InputOperator {

        @OutputPortFieldAnnotation(optional = false)
        public final transient DefaultOutputPort<Object> outport2;

        private TestAnnotationsOperator() {
            this.outport2 = new DefaultOutputPort<>();
        }

        public void emitTuples() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestAnnotationsOperator2.class */
    private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator {
        public final transient DefaultOutputPort<Object> outport1;

        private TestAnnotationsOperator2() {
            this.outport1 = new DefaultOutputPort<>();
        }

        public void emitTuples() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestAnnotationsOperator3.class */
    private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator {

        @OutputPortFieldAnnotation(optional = true)
        public final transient DefaultOutputPort<Object> outport1;

        @OutputPortFieldAnnotation(optional = true)
        public final transient DefaultOutputPort<Object> outport2;

        private TestAnnotationsOperator3() {
            this.outport1 = new DefaultOutputPort<>();
            this.outport2 = new DefaultOutputPort<>();
        }

        public void emitTuples() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestAttributeValue.class */
    private static class TestAttributeValue {
        private TestAttributeValue() {
        }
    }

    @OperatorAnnotation(partitionable = false)
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestOperatorAnnotationOperator.class */
    public static class TestOperatorAnnotationOperator extends BaseOperator {

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanTest.TestOperatorAnnotationOperator.1
            public void process(Object obj) {
            }
        };
    }

    @OperatorAnnotation(partitionable = false)
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestOperatorAnnotationOperator2.class */
    public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> {
        public Collection<Partitioner.Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partitioner.Partition<TestOperatorAnnotationOperator2>> collection, Partitioner.PartitioningContext partitioningContext) {
            return null;
        }

        public void partitioned(Map<Integer, Partitioner.Partition<TestOperatorAnnotationOperator2>> map) {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestPortCodecOperator.class */
    public static class TestPortCodecOperator extends BaseOperator {
        public final transient DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.LogicalPlanTest.TestPortCodecOperator.1
            public void process(Object obj) {
            }

            public StreamCodec<Object> getStreamCodec() {
                return new TestStreamCodec();
            }
        };

        @OutputPortFieldAnnotation(optional = true)
        public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$TestStreamCodec.class */
    private static class TestStreamCodec implements StreamCodec<Object> {
        private TestStreamCodec() {
        }

        public Object fromByteArray(Slice slice) {
            return slice.stringValue();
        }

        public Slice toByteArray(Object obj) {
            byte[] bytes = obj.toString().getBytes();
            return new Slice(bytes, 0, bytes.length);
        }

        public int getPartition(Object obj) {
            return obj.hashCode() / 2;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$ValidationOperator.class */
    public static class ValidationOperator extends BaseOperator {
        public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<>();
        public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<>();
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$ValidationTestOperator.class */
    public static class ValidationTestOperator extends BaseOperator implements InputOperator {

        @NotNull
        @Pattern(regexp = ".*malhar.*", message = "Value has to contain 'malhar'!")
        private String stringField1;

        @Min(2)
        private int intField1;
        private String[] stringArrayField;
        private String stringProperty2;
        private String getterProperty2 = "";

        @Valid
        private final Nested nestedBean = new Nested();
        private Map<String, String> mapProperty = Maps.newHashMap();

        /* loaded from: input_file:com/datatorrent/stram/plan/logical/LogicalPlanTest$ValidationTestOperator$Nested.class */
        public class Nested {

            @NotNull
            private String property = "";

            public Nested() {
            }

            public String getProperty() {
                return this.property;
            }

            public void setProperty(String str) {
                this.property = str;
            }
        }

        @AssertTrue(message = "stringField1 should end with intField1")
        private boolean isValidConfiguration() {
            return this.stringField1.endsWith(String.valueOf(this.intField1));
        }

        @NotNull
        public String getProperty2() {
            return this.getterProperty2;
        }

        public void setProperty2(String str) {
            this.getterProperty2 = str;
        }

        public String[] getStringArrayField() {
            return this.stringArrayField;
        }

        public void setStringArrayField(String[] strArr) {
            this.stringArrayField = strArr;
        }

        public String getStringProperty2() {
            return this.stringProperty2;
        }

        public void setStringProperty2(String str) {
            this.stringProperty2 = str;
        }

        public Map<String, String> getMapProperty() {
            return this.mapProperty;
        }

        public void setMapProperty(Map<String, String> map) {
            this.mapProperty = map;
        }

        public void emitTuples() {
        }
    }

    @Test
    public void testCycleDetection() {
        LogicalPlan logicalPlan = new LogicalPlan();
        GenericTestOperator addOperator = logicalPlan.addOperator("operator2", GenericTestOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("operator3", GenericTestOperator.class);
        GenericTestOperator addOperator3 = logicalPlan.addOperator("operator4", GenericTestOperator.class);
        GenericTestOperator addOperator4 = logicalPlan.addOperator("operator7", GenericTestOperator.class);
        logicalPlan.addStream("n2n3", addOperator.outport1, addOperator2.inport1);
        logicalPlan.addStream("n3n4", addOperator2.outport1, addOperator3.inport1);
        logicalPlan.addStream("n4n2", addOperator3.outport1, addOperator.inport1);
        try {
            logicalPlan.addStream("n7n7", addOperator4.outport1, addOperator4.inport1).addSink(addOperator4.inport1);
            Assert.fail("cannot add to stream again");
        } catch (Exception e) {
        }
        LogicalPlan.ValidationContext validationContext = new LogicalPlan.ValidationContext();
        logicalPlan.findStronglyConnected(logicalPlan.getMeta(addOperator4), validationContext);
        Assert.assertEquals("operator self reference", 1L, validationContext.invalidCycles.size());
        Assert.assertEquals("operator self reference", 1L, ((Set) validationContext.invalidCycles.get(0)).size());
        Assert.assertEquals("operator self reference", logicalPlan.getMeta(addOperator4), ((Set) validationContext.invalidCycles.get(0)).iterator().next());
        LogicalPlan.ValidationContext validationContext2 = new LogicalPlan.ValidationContext();
        logicalPlan.findStronglyConnected(logicalPlan.getMeta(addOperator3), validationContext2);
        Assert.assertEquals("3 operator cycle", 1L, validationContext2.invalidCycles.size());
        Assert.assertEquals("3 operator cycle", 3L, ((Set) validationContext2.invalidCycles.get(0)).size());
        Assert.assertTrue("operator2", ((Set) validationContext2.invalidCycles.get(0)).contains(logicalPlan.getMeta(addOperator)));
        Assert.assertTrue("operator3", ((Set) validationContext2.invalidCycles.get(0)).contains(logicalPlan.getMeta(addOperator2)));
        Assert.assertTrue("operator4", ((Set) validationContext2.invalidCycles.get(0)).contains(logicalPlan.getMeta(addOperator3)));
        try {
            logicalPlan.validate();
            Assert.fail("validation should fail");
        } catch (ValidationException e2) {
        }
    }

    @Test
    public void testCycleDetectionWithDelay() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("A", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("B", GenericTestOperator.class);
        GenericTestOperator addOperator3 = logicalPlan.addOperator("C", GenericTestOperator.class);
        GenericTestOperator addOperator4 = logicalPlan.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator addOperator5 = logicalPlan.addOperator("opDelay", new DefaultDelayOperator());
        DefaultDelayOperator addOperator6 = logicalPlan.addOperator("opDelay2", new DefaultDelayOperator());
        logicalPlan.addStream("AtoB", addOperator.outport, addOperator2.inport1);
        logicalPlan.addStream("BtoC", addOperator2.outport1, addOperator3.inport1);
        logicalPlan.addStream("CtoD", addOperator3.outport1, addOperator4.inport1);
        logicalPlan.addStream("CtoDelay", addOperator3.outport2, addOperator5.input);
        logicalPlan.addStream("DtoDelay", addOperator4.outport1, addOperator6.input);
        logicalPlan.addStream("DelayToB", addOperator5.output, addOperator2.inport2);
        logicalPlan.addStream("Delay2ToC", addOperator6.output, addOperator3.inport2);
        LogicalPlan.ValidationContext validationContext = new LogicalPlan.ValidationContext();
        logicalPlan.findStronglyConnected(logicalPlan.getMeta(addOperator), validationContext);
        Assert.assertEquals("No invalid cycle", Collections.emptyList(), validationContext.invalidCycles);
        Assert.assertEquals("cycle", Sets.newHashSet(new LogicalPlan.OperatorMeta[]{logicalPlan.getMeta(addOperator6), logicalPlan.getMeta(addOperator5), logicalPlan.getMeta(addOperator3), logicalPlan.getMeta(addOperator2), logicalPlan.getMeta(addOperator4)}), validationContext.stronglyConnected.get(0));
    }

    @Test
    public void testLogicalPlanSerialization() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        ValidationOperator addOperator = logicalPlan.addOperator("validationNode", ValidationOperator.class);
        CounterOperator addOperator2 = logicalPlan.addOperator("countGoodNode", CounterOperator.class);
        CounterOperator addOperator3 = logicalPlan.addOperator("countBadNode", CounterOperator.class);
        logicalPlan.addStream("goodTuplesStream", addOperator.goodOutputPort, addOperator2.countInputPort);
        logicalPlan.addStream("badTuplesStream", addOperator.badOutputPort, addOperator3.countInputPort);
        Assert.assertEquals("number root operators", 1L, logicalPlan.getRootOperators().size());
        Assert.assertEquals("root operator id", "validationNode", ((LogicalPlan.OperatorMeta) logicalPlan.getRootOperators().get(0)).getName());
        logicalPlan.getContextAttributes(addOperator2).put(Context.OperatorContext.SPIN_MILLIS, 10);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        LogicalPlan.write(logicalPlan, byteArrayOutputStream);
        LogicalPlan read = LogicalPlan.read(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
        Assert.assertNotNull(read);
        Assert.assertEquals("number operators in clone", logicalPlan.getAllOperators().size(), read.getAllOperators().size());
        Assert.assertEquals("number root operators in clone", 1L, read.getRootOperators().size());
        Assert.assertTrue("root operator in operators", read.getAllOperators().contains(read.getRootOperators().get(0)));
        Assert.assertEquals("", new Integer(10), read.getContextAttributes(read.getOperatorMeta("countGoodNode").getOperator()).get(Context.OperatorContext.SPIN_MILLIS));
    }

    @Test
    public void testDeleteOperator() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator addOperator3 = logicalPlan.addOperator("o2", GenericTestOperator.class);
        logicalPlan.addStream("s0", addOperator.outport, addOperator2.inport1);
        LogicalPlan.StreamMeta addStream = logicalPlan.addStream("s1", addOperator2.outport1, addOperator3.inport1);
        logicalPlan.validate();
        Assert.assertEquals("", 3L, logicalPlan.getAllOperators().size());
        logicalPlan.removeOperator(addOperator3);
        addStream.remove();
        logicalPlan.validate();
        Assert.assertEquals("", 2L, logicalPlan.getAllOperators().size());
    }

    @Test
    public void testOperatorValidation() {
        ValidationTestOperator validationTestOperator = new ValidationTestOperator();
        validationTestOperator.stringField1 = "malhar1";
        validationTestOperator.intField1 = 1;
        Set validate = Validation.buildDefaultValidatorFactory().getValidator().validate(validationTestOperator, new Class[0]);
        Assert.assertEquals("" + validate, 1L, validate.size());
        ConstraintViolation constraintViolation = (ConstraintViolation) validate.iterator().next();
        Assert.assertEquals("", Integer.valueOf(validationTestOperator.intField1), constraintViolation.getInvalidValue());
        Assert.assertEquals("", "intField1", constraintViolation.getPropertyPath().toString());
        LogicalPlan logicalPlan = new LogicalPlan();
        ValidationTestOperator addOperator = logicalPlan.addOperator("testOperator", validationTestOperator);
        try {
            logicalPlan.validate();
            Assert.fail("should throw ConstraintViolationException");
        } catch (ConstraintViolationException e) {
            Assert.assertEquals("violation details", validate, e.getConstraintViolations());
            Assert.assertThat("exception message", e.getMessage(), StramTestSupport.RegexMatcher.matches(".*ValidationTestOperator\\{name=null}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]"));
        }
        try {
            addOperator.intField1 = 3;
            logicalPlan.validate();
            Assert.fail("should throw ConstraintViolationException");
        } catch (ConstraintViolationException e2) {
            ConstraintViolation constraintViolation2 = (ConstraintViolation) e2.getConstraintViolations().iterator().next();
            Assert.assertEquals("" + e2.getConstraintViolations(), 1L, validate.size());
            Assert.assertEquals("", false, constraintViolation2.getInvalidValue());
            Assert.assertEquals("", "validConfiguration", constraintViolation2.getPropertyPath().toString());
        }
        addOperator.stringField1 = "malhar3";
        try {
            addOperator.getterProperty2 = null;
            logicalPlan.validate();
            Assert.fail("should throw ConstraintViolationException");
        } catch (ConstraintViolationException e3) {
            ConstraintViolation constraintViolation3 = (ConstraintViolation) e3.getConstraintViolations().iterator().next();
            Assert.assertEquals("" + e3.getConstraintViolations(), 1L, validate.size());
            Assert.assertEquals("", (Object) null, constraintViolation3.getInvalidValue());
            Assert.assertEquals("", "property2", constraintViolation3.getPropertyPath().toString());
        }
        addOperator.getterProperty2 = "";
        try {
            addOperator.nestedBean.property = null;
            logicalPlan.validate();
            Assert.fail("should throw ConstraintViolationException");
        } catch (ConstraintViolationException e4) {
            ConstraintViolation constraintViolation4 = (ConstraintViolation) e4.getConstraintViolations().iterator().next();
            Assert.assertEquals("" + e4.getConstraintViolations(), 1L, validate.size());
            Assert.assertEquals("", (Object) null, constraintViolation4.getInvalidValue());
            Assert.assertEquals("", "nestedBean.property", constraintViolation4.getPropertyPath().toString());
        }
        addOperator.nestedBean.property = "";
        logicalPlan.validate();
    }

    @Test
    public void testValidationForNonInputRootOperator() {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.addOperator("x", new NoInputPortOperator());
        try {
            logicalPlan.validate();
            Assert.fail("should fail because root operator is not input operator");
        } catch (ValidationException e) {
        }
    }

    @Test
    public void testOperatorAnnotation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("input1", TestGeneratorInputOperator.class);
        TestOperatorAnnotationOperator addOperator2 = logicalPlan.addOperator("operator1", TestOperatorAnnotationOperator.class);
        logicalPlan.addStream("Connection", addOperator.outport, addOperator2.input1);
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        try {
            logicalPlan.validate();
            Assert.fail("should raise operator is not partitionable for operator1");
        } catch (ValidationException e) {
            Assert.assertEquals("", "Operator " + logicalPlan.getMeta(addOperator2).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
        }
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.PARTITIONER, (Object) null);
        logicalPlan.setInputPortAttribute(addOperator2.input1, Context.PortContext.PARTITION_PARALLEL, true);
        try {
            logicalPlan.validate();
            Assert.fail("should raise operator is not partitionable for operator1");
        } catch (ValidationException e2) {
            Assert.assertEquals("", "Operator " + logicalPlan.getMeta(addOperator2).getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e2.getMessage());
        }
        logicalPlan.setInputPortAttribute(addOperator2.input1, Context.PortContext.PARTITION_PARALLEL, false);
        logicalPlan.validate();
        logicalPlan.removeOperator(addOperator2);
        TestOperatorAnnotationOperator2 addOperator3 = logicalPlan.addOperator("operator2", TestOperatorAnnotationOperator2.class);
        try {
            logicalPlan.validate();
            Assert.fail("should raise operator is not partitionable for operator2");
        } catch (ValidationException e3) {
            Assert.assertEquals("Operator " + logicalPlan.getMeta(addOperator3).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e3.getMessage());
        }
    }

    @Test
    public void testPortConnectionValidation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestNonOptionalOutportInputOperator addOperator = logicalPlan.addOperator("input1", TestNonOptionalOutportInputOperator.class);
        try {
            logicalPlan.validate();
            Assert.fail("should raise port not connected for input1.outputPort1");
        } catch (ValidationException e) {
            Assert.assertEquals("", "Output port connection required: input1.outport1", e.getMessage());
        }
        logicalPlan.addStream("stream1", addOperator.outport1, logicalPlan.addOperator("o1", GenericTestOperator.class).inport1);
        logicalPlan.validate();
        logicalPlan.addOperator("counter", CounterOperator.class);
        try {
            logicalPlan.validate();
        } catch (ValidationException e2) {
            Assert.assertEquals("", "Input port connection required: counter.countInputPort", e2.getMessage());
        }
    }

    @Test
    public void testAtMostOnceProcessingModeValidation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("input1", TestGeneratorInputOperator.class);
        TestGeneratorInputOperator addOperator2 = logicalPlan.addOperator("input2", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator3 = logicalPlan.addOperator("amoOper", GenericTestOperator.class);
        logicalPlan.setAttribute(addOperator3, Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
        logicalPlan.addStream("input1.outport", addOperator.outport, addOperator3.inport1);
        logicalPlan.addStream("input2.outport", addOperator2.outport, addOperator3.inport2);
        GenericTestOperator addOperator4 = logicalPlan.addOperator("outputOper", GenericTestOperator.class);
        logicalPlan.setAttribute(addOperator4, Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
        logicalPlan.addStream("aloOper.outport1", addOperator3.outport1, addOperator4.inport1);
        try {
            logicalPlan.validate();
            Assert.fail("Exception expected for " + addOperator4);
        } catch (ValidationException e) {
            Assert.assertEquals("", e.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/AT_MOST_ONCE");
        }
        logicalPlan.setAttribute(addOperator4, Context.OperatorContext.PROCESSING_MODE, (Object) null);
        logicalPlan.validate();
        LogicalPlan.OperatorMeta meta = logicalPlan.getMeta(addOperator4);
        Assert.assertEquals("" + meta.getAttributes(), Operator.ProcessingMode.AT_MOST_ONCE, meta.getValue(Context.OperatorContext.PROCESSING_MODE));
    }

    @Test
    public void testExactlyOnceProcessingModeValidation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("input1", TestGeneratorInputOperator.class);
        TestGeneratorInputOperator addOperator2 = logicalPlan.addOperator("input2", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator3 = logicalPlan.addOperator("amoOper", GenericTestOperator.class);
        logicalPlan.setAttribute(addOperator3, Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.EXACTLY_ONCE);
        logicalPlan.addStream("input1.outport", addOperator.outport, addOperator3.inport1);
        logicalPlan.addStream("input2.outport", addOperator2.outport, addOperator3.inport2);
        GenericTestOperator addOperator4 = logicalPlan.addOperator("outputOper", GenericTestOperator.class);
        logicalPlan.addStream("aloOper.outport1", addOperator3.outport1, addOperator4.inport1);
        try {
            logicalPlan.validate();
            Assert.fail("Exception expected for " + addOperator4);
        } catch (ValidationException e) {
            Assert.assertEquals("", e.getMessage(), "Processing mode for outputOper should be AT_MOST_ONCE for source amoOper/EXACTLY_ONCE");
        }
        logicalPlan.setAttribute(addOperator4, Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
        try {
            logicalPlan.validate();
            Assert.fail("Exception expected for " + addOperator4);
        } catch (ValidationException e2) {
            Assert.assertEquals("", e2.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/EXACTLY_ONCE");
        }
        logicalPlan.setAttribute(addOperator4, Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
        logicalPlan.validate();
    }

    @Test
    public void testLocalityValidation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("o1", GenericTestOperator.class);
        LogicalPlan.StreamMeta locality = logicalPlan.addStream("input1.outport", addOperator.outport, addOperator2.inport1).setLocality(DAG.Locality.THREAD_LOCAL);
        logicalPlan.validate();
        logicalPlan.addStream("input2.outport", logicalPlan.addOperator("input2", TestGeneratorInputOperator.class).outport, addOperator2.inport2);
        try {
            logicalPlan.validate();
            Assert.fail("Exception expected for " + addOperator2);
        } catch (ValidationException e) {
            Assert.assertThat("", e.getMessage(), StramTestSupport.RegexMatcher.matches("Locality THREAD_LOCAL invalid for operator .* with multiple input streams .*"));
        }
        locality.setLocality((DAG.Locality) null);
        logicalPlan.validate();
    }

    @Test
    public void testOutputPortAnnotation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestAnnotationsOperator addOperator = logicalPlan.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
        try {
            logicalPlan.validate();
            Assert.fail("should raise: port connection required");
        } catch (ValidationException e) {
            Assert.assertEquals("", "Output port connection required: testAnnotationsOperator.outport2", e.getMessage());
        }
        logicalPlan.addStream("s1", addOperator.outport2, logicalPlan.addOperator("sink", new TestOutputOperator()).inport);
        logicalPlan.validate();
        TestAnnotationsOperator2 addOperator2 = logicalPlan.addOperator("multiOutputPorts1", new TestAnnotationsOperator2());
        try {
            logicalPlan.validate();
            Assert.fail("should raise: At least one output port must be connected");
        } catch (ValidationException e2) {
            Assert.assertEquals("", "At least one output port must be connected: multiOutputPorts1", e2.getMessage());
        }
        logicalPlan.addStream("s2", addOperator2.outport1, logicalPlan.addOperator("o3", new TestOutputOperator()).inport);
        logicalPlan.addOperator("multiOutputPorts3", new TestAnnotationsOperator3());
        logicalPlan.validate();
    }

    @Test
    public void testJdkSerializableOperator() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.addOperator("o1", new JdkSerializableOperator());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        LogicalPlan.write(logicalPlan, byteArrayOutputStream);
        byteArrayOutputStream.close();
        Assert.assertNotNull("port object null", LogicalPlan.read(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())).getOperatorMeta("o1").getOperator().inport1);
    }

    @Test
    public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
        LogicalPlan logicalPlan = new LogicalPlan();
        Attribute attribute = new Attribute(new TestAttributeValue(), new Object2String());
        Field declaredField = Attribute.class.getDeclaredField("name");
        declaredField.setAccessible(true);
        declaredField.set(attribute, "Test_Attribute");
        declaredField.setAccessible(false);
        Assert.assertNotNull(attribute);
        logicalPlan.setAttribute(attribute, new TestAttributeValue());
        try {
            logicalPlan.validate();
            Assert.fail("Setting not serializable attribute should throw exception");
        } catch (ValidationException e) {
            Assert.assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in com.datatorrent.api.DAG are not serializable", e.getMessage());
        }
        LogicalPlan logicalPlan2 = new LogicalPlan();
        logicalPlan2.setAttribute(logicalPlan2.addOperator("TestOperator", TestGeneratorInputOperator.class), attribute, new TestAttributeValue());
        try {
            logicalPlan2.validate();
            Assert.fail("Setting not serializable attribute should throw exception");
        } catch (ValidationException e2) {
            Assert.assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator are not serializable", e2.getMessage());
        }
        LogicalPlan logicalPlan3 = new LogicalPlan();
        logicalPlan3.setOutputPortAttribute(logicalPlan3.addOperator("TestOperator", TestGeneratorInputOperator.class).outport, attribute, new TestAttributeValue());
        try {
            logicalPlan3.validate();
            Assert.fail("Setting not serializable attribute should throw exception");
        } catch (ValidationException e3) {
            Assert.assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.outport are not serializable", e3.getMessage());
        }
        LogicalPlan logicalPlan4 = new LogicalPlan();
        logicalPlan4.setInputPortAttribute(logicalPlan4.addOperator("TestOperator", GenericTestOperator.class).inport1, attribute, new TestAttributeValue());
        try {
            logicalPlan4.validate();
            Assert.fail("Setting non serializable attribute should throw exception");
        } catch (ValidationException e4) {
            Assert.assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.inport1 are not serializable", e4.getMessage());
        }
    }

    @Test
    public void testCheckpointableWithinAppWindowAnnotation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("x", new GenericTestOperator());
        logicalPlan.addStream("Stream1", addOperator.outport, addOperator2.inport1);
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 30);
        logicalPlan.validate();
        TestGeneratorInputOperator addOperator3 = logicalPlan.addOperator("input2", TestGeneratorInputOperator.class);
        CheckpointableWithinAppWindowOperator addOperator4 = logicalPlan.addOperator("y", new CheckpointableWithinAppWindowOperator());
        logicalPlan.addStream("Stream2", addOperator3.outport, addOperator4.inport1);
        logicalPlan.setAttribute(addOperator4, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
        logicalPlan.setAttribute(addOperator4, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 30);
        logicalPlan.validate();
        TestGeneratorInputOperator addOperator5 = logicalPlan.addOperator("input3", TestGeneratorInputOperator.class);
        NotCheckpointableWithinAppWindowOperator addOperator6 = logicalPlan.addOperator("z", new NotCheckpointableWithinAppWindowOperator());
        logicalPlan.addStream("Stream3", addOperator5.outport, addOperator6.inport1);
        logicalPlan.setAttribute(addOperator6, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
        logicalPlan.setAttribute(addOperator6, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 30);
        try {
            logicalPlan.validate();
            Assert.fail("should fail because chekpoint window count is not a factor of application window count");
        } catch (ValidationException e) {
        }
        logicalPlan.setAttribute(addOperator6, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 30);
        logicalPlan.validate();
        logicalPlan.setAttribute(addOperator6, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 45);
        try {
            logicalPlan.validate();
            Assert.fail("should fail because chekpoint window count is not a factor of application window count");
        } catch (ValidationException e2) {
        }
    }

    @Test
    public void testInputPortHiding() {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.addStream("Stream1", logicalPlan.addOperator("input1", TestGeneratorInputOperator.class).outport, logicalPlan.addOperator("operator2", new Operator2()).input);
        logicalPlan.validate();
    }

    @Test
    public void testInvalidInputPortConnection() {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.addStream("Stream1", logicalPlan.addOperator("input1", TestGeneratorInputOperator.class).outport, logicalPlan.addOperator("operator3", new Operator3()).input);
        try {
            logicalPlan.validate();
            Assert.fail();
        } catch (ValidationException e) {
            Assert.assertTrue("validation message", e.getMessage().startsWith("Invalid port connected"));
        }
    }

    @Test
    public void testAffinityRulesDagValidation() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("O1", new TestGeneratorInputOperator());
        GenericTestOperator addOperator2 = logicalPlan.addOperator("O2", new GenericTestOperator());
        GenericTestOperator addOperator3 = logicalPlan.addOperator("O3", new GenericTestOperator());
        logicalPlan.addStream("stream1", addOperator.outport, addOperator2.inport1).setLocality(DAG.Locality.THREAD_LOCAL);
        LogicalPlan.StreamMeta locality = logicalPlan.addStream("stream2", addOperator2.outport1, addOperator3.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        AffinityRulesSet affinityRulesSet = new AffinityRulesSet();
        ArrayList arrayList = new ArrayList();
        affinityRulesSet.setAffinityRules(arrayList);
        AffinityRule affinityRule = new AffinityRule(AffinityRule.Type.AFFINITY, DAG.Locality.CONTAINER_LOCAL, false, "O1", new String[]{"O3"});
        arrayList.add(affinityRule);
        logicalPlan.setAttribute(Context.DAGContext.AFFINITY_RULES_SET, affinityRulesSet);
        logicalPlan.validate();
        AffinityRule affinityRule2 = new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "O2", new String[]{"O3"});
        arrayList.add(affinityRule2);
        try {
            logicalPlan.validate();
            Assert.fail("DAG validation should fail due to conflicting rules");
        } catch (ValidationException e) {
            Assert.assertEquals("Anti Affinity rule for operators O2 & O3 conflicts with affinity rules or Stream locality", e.getMessage());
        }
        locality.setLocality(DAG.Locality.RACK_LOCAL);
        logicalPlan.validate();
        AffinityRule affinityRule3 = new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "O1", new String[]{"O3"});
        arrayList.add(affinityRule3);
        try {
            logicalPlan.validate();
            Assert.fail("DAG validation should fail due to conflicting rules");
        } catch (ValidationException e2) {
            Assert.assertEquals("Anti Affinity rule for operators O1 & O3 conflicts with affinity rules or Stream locality", e2.getMessage());
        }
        arrayList.clear();
        affinityRule.setLocality(DAG.Locality.RACK_LOCAL);
        arrayList.add(affinityRule);
        arrayList.add(affinityRule2);
        arrayList.add(affinityRule3);
        logicalPlan.validate();
        arrayList.add(new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, true, "O1", new String[]{"O2"}));
        logicalPlan.validate();
        arrayList.clear();
        arrayList.add(new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "O2", new String[]{"O3"}));
        logicalPlan.getMeta(addOperator2).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host1");
        logicalPlan.getMeta(addOperator3).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host1");
        try {
            logicalPlan.validate();
            Assert.fail("DAG validation should fail due to conflicting host locality");
        } catch (ValidationException e3) {
            Assert.assertEquals("Host Locality for operators: O2(host: host1) & O3(host: host1) conflict with anti-affinity rules", e3.getMessage());
        }
        arrayList.clear();
        arrayList.add(new AffinityRule(AffinityRule.Type.AFFINITY, DAG.Locality.NODE_LOCAL, false, "O2", new String[]{"O3"}));
        logicalPlan.getMeta(addOperator2).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host1");
        logicalPlan.getMeta(addOperator3).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host2");
        try {
            logicalPlan.validate();
            Assert.fail("DAG validation should fail due to conflicting host locality");
        } catch (ValidationException e4) {
            Assert.assertEquals("Host Locality for operators: O2(host: host1) & O3(host: host2) conflicts with affinity rules", e4.getMessage());
        }
        ((AffinityRulesSet) logicalPlan.getAttributes().get(Context.DAGContext.AFFINITY_RULES_SET)).getAffinityRules().clear();
        arrayList.add(new AffinityRule(AffinityRule.Type.AFFINITY, DAG.Locality.THREAD_LOCAL, false, "O1", new String[]{"O3"}));
        try {
            logicalPlan.validate();
            Assert.fail("DAG validation should fail due to conflicting host locality");
        } catch (ValidationException e5) {
            Assert.assertEquals("Affinity rule specified THREAD_LOCAL affinity for operators O1 & O3 which are not connected by stream", e5.getMessage());
        }
        LogicalPlan logicalPlan2 = new LogicalPlan();
        TestGeneratorInputOperator addOperator4 = logicalPlan2.addOperator("O1", new TestGeneratorInputOperator());
        GenericTestOperator addOperator5 = logicalPlan2.addOperator("O2", new GenericTestOperator());
        GenericTestOperator addOperator6 = logicalPlan2.addOperator("O3", new GenericTestOperator());
        GenericTestOperator addOperator7 = logicalPlan2.addOperator("O4", new GenericTestOperator());
        GenericTestOperator addOperator8 = logicalPlan2.addOperator("O5", new GenericTestOperator());
        logicalPlan2.addStream("stream1", addOperator4.outport, addOperator5.inport1, addOperator6.inport1).setLocality(DAG.Locality.NODE_LOCAL);
        logicalPlan2.addStream("stream2", addOperator6.outport1, addOperator7.inport1);
        logicalPlan2.addStream("stream3", addOperator5.outport1, addOperator8.inport1);
        arrayList.clear();
        arrayList.add(new AffinityRule(AffinityRule.Type.AFFINITY, DAG.Locality.CONTAINER_LOCAL, false, "O1", new String[]{"O5"}));
        arrayList.add(new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "O3", new String[]{"O5"}));
        AffinityRulesSet affinityRulesSet2 = new AffinityRulesSet();
        affinityRulesSet2.setAffinityRules(arrayList);
        logicalPlan2.setAttribute(Context.DAGContext.AFFINITY_RULES_SET, affinityRulesSet2);
        try {
            logicalPlan2.validate();
            Assert.fail("dag validation should fail due to conflicting affinity rules");
        } catch (ValidationException e6) {
            Assert.assertEquals("Anti Affinity rule for operators O3 & O5 conflicts with affinity rules or Stream locality", e6.getMessage());
        }
    }

    @Test
    public void testOutputPortHiding() {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.addStream("Stream1", logicalPlan.addOperator("input", new Operator5()).output, logicalPlan.addOperator("operator2", new Operator2()).input);
        logicalPlan.validate();
    }

    @Test(expected = ValidationException.class)
    public void testInvalidOutputPortConnection() {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.addStream("Stream1", logicalPlan.addOperator("input", new Operator5()).output, logicalPlan.addOperator("operator3", new Operator3()).input);
        logicalPlan.validate();
    }
}
