package com.datatorrent.stram.stream;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.engine.GenericNodeTest;
import com.datatorrent.stram.engine.ProcessingModeTests;
import com.datatorrent.stram.engine.RecoverableInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import javax.validation.ConstraintViolationException;
import javax.validation.ValidationException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/stream/OiOStreamTest.class */
public class OiOStreamTest {
    // Shared SLF4J logger; several validation tests in this class log an informational message through it before building their plans.
    private static final Logger logger = LoggerFactory.getLogger(OiOStreamTest.class);

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOStreamTest$ThreadIdValidatingGenericIntermediateOperator.class */
    /**
     * Operator with one input port and one output port whose callbacks assert that they run on
     * a thread that has previously executed {@link #setup} for an instance of this class.
     *
     * <p>The recorded thread ids live in static fields so that tests can clear and inspect them
     * from outside the running operators. The checks are plain {@code assert} statements, so
     * they are only active when assertions are enabled ({@code -ea}).
     */
    public static class ThreadIdValidatingGenericIntermediateOperator implements Operator {
        /** Id of the thread that most recently ran {@link #setup}. */
        public static long threadId;

        /** Ids of all threads that ran {@link #setup} since the list was last cleared. */
        public static List<Long> threadList = Collections.synchronizedList(new ArrayList<Long>());

        /** Input port; each tuple only triggers the thread check and is otherwise ignored. */
        public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>() {
            public void process(Number number) {
                assert isRecordedThread();
            }
        };

        /** Output port; this class itself never emits on it. */
        public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();

        /** Checks the calling thread at the start of a window; the window id is not used. */
        public void beginWindow(long j) {
            assert isRecordedThread();
        }

        /** Checks the calling thread at the end of a window. */
        public void endWindow() {
            assert isRecordedThread();
        }

        /** Records the calling thread's id in both {@link #threadId} and {@link #threadList}. */
        public void setup(Context.OperatorContext operatorContext) {
            long currentId = Thread.currentThread().getId();
            threadId = currentId;
            threadList.add(currentId);
        }

        /** Checks the calling thread during teardown. */
        public void teardown() {
            assert isRecordedThread();
        }

        /** Returns whether the calling thread's id is present in {@link #threadList}. */
        private static boolean isRecordedThread() {
            return threadList.contains(Thread.currentThread().getId());
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOStreamTest$ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.class */
    /**
     * Variant of {@link ThreadIdValidatingGenericIntermediateOperator} that declares a second
     * output port, {@code output2}, in addition to the inherited {@code input} and
     * {@code output} ports. All callbacks are inherited unchanged.
     */
    public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts extends ThreadIdValidatingGenericIntermediateOperator {
        /** Additional output port; nothing in this class emits on it. */
        public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<Long>();
    }

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOStreamTest$ThreadIdValidatingGenericOperatorWithTwoInputPorts.class */
    /**
     * Operator with two input ports whose callbacks assert that they run on the same thread
     * that most recently executed {@link #setup} for an instance of this class.
     *
     * <p>The thread id is kept in a static field, so it is shared by all instances. The checks
     * are plain {@code assert} statements and are only active when assertions are enabled.
     */
    public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator {
        /** Id of the thread that most recently ran {@link #setup}. */
        public static long threadId;

        /** First input port; each tuple only triggers the thread check. */
        public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>() {
            public void process(Number number) {
                assert isSetupThread();
            }
        };

        /** Second input port; each tuple only triggers the thread check. */
        public final transient DefaultInputPort<Number> input2 = new DefaultInputPort<Number>() {
            public void process(Number number) {
                assert isSetupThread();
            }
        };

        /** Checks the calling thread at the start of a window; the window id is not used. */
        public void beginWindow(long j) {
            assert isSetupThread();
        }

        /** Checks the calling thread at the end of a window. */
        public void endWindow() {
            assert isSetupThread();
        }

        /** Records the calling thread's id in {@link #threadId}. */
        public void setup(Context.OperatorContext operatorContext) {
            threadId = Thread.currentThread().getId();
        }

        /** Checks the calling thread during teardown. */
        public void teardown() {
            assert isSetupThread();
        }

        /** Returns whether the calling thread is the one recorded in {@link #threadId}. */
        private static boolean isSetupThread() {
            return threadId == Thread.currentThread().getId();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOStreamTest$ThreadIdValidatingInputOperator.class */
    /**
     * Input operator that emits nothing; its callbacks assert that they run on the thread that
     * most recently executed {@link #setup}, and {@link #endWindow} requests shutdown via
     * {@code BaseOperator.shutdown()}.
     *
     * <p>The thread id is kept in a static field so tests can compare it with the ids recorded
     * by other operators. The checks are plain {@code assert} statements and are only active
     * when assertions are enabled.
     */
    public static class ThreadIdValidatingInputOperator implements InputOperator {
        /** Output port; this class never emits on it. */
        public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();

        /** Id of the thread that most recently ran {@link #setup}. */
        public static long threadId;

        /** Only checks the calling thread; no tuples are produced. */
        public void emitTuples() {
            assert isSetupThread();
        }

        /** Checks the calling thread at the start of a window; the window id is not used. */
        public void beginWindow(long j) {
            assert isSetupThread();
        }

        /** Checks the calling thread, then calls {@code BaseOperator.shutdown()}. */
        public void endWindow() {
            assert isSetupThread();
            BaseOperator.shutdown();
        }

        /** Records the calling thread's id in {@link #threadId}. */
        public void setup(Context.OperatorContext operatorContext) {
            threadId = Thread.currentThread().getId();
        }

        /** Checks the calling thread during teardown. */
        public void teardown() {
            assert isSetupThread();
        }

        /** Returns whether the calling thread is the one recorded in {@link #threadId}. */
        private static boolean isSetupThread() {
            return threadId == Thread.currentThread().getId();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/stream/OiOStreamTest$ThreadIdValidatingOutputOperator.class */
    /**
     * Operator with a single input port and no output port whose callbacks assert that they
     * run on a thread that has previously executed {@link #setup} for an instance of this class.
     *
     * <p>The recorded thread ids live in static fields so that tests can clear and inspect them.
     * The checks are plain {@code assert} statements and are only active when assertions are
     * enabled ({@code -ea}).
     */
    public static class ThreadIdValidatingOutputOperator implements Operator {
        /** Id of the thread that most recently ran {@link #setup}. */
        public static long threadId;

        /** Ids of all threads that ran {@link #setup} since the list was last cleared. */
        public static List<Long> threadList = Collections.synchronizedList(new ArrayList<Long>());

        /** Input port; each tuple only triggers the thread check and is otherwise ignored. */
        public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>() {
            public void process(Number number) {
                assert isRecordedThread();
            }
        };

        /** Checks the calling thread at the start of a window; the window id is not used. */
        public void beginWindow(long j) {
            assert isRecordedThread();
        }

        /** Checks the calling thread at the end of a window. */
        public void endWindow() {
            assert isRecordedThread();
        }

        /** Records the calling thread's id in both {@link #threadId} and {@link #threadList}. */
        public void setup(Context.OperatorContext operatorContext) {
            long currentId = Thread.currentThread().getId();
            threadId = currentId;
            threadList.add(currentId);
        }

        /** Checks the calling thread during teardown. */
        public void teardown() {
            assert isRecordedThread();
        }

        /** Returns whether the calling thread's id is present in {@link #threadList}. */
        private static boolean isRecordedThread() {
            return threadList.contains(Thread.currentThread().getId());
        }
    }

    /**
     * Builds a two-operator plan joined by a single THREAD_LOCAL stream and expects
     * {@code validate()} not to throw a {@link ConstraintViolationException}.
     */
    @Test
    public void validatePositiveOiO() {
        logger.info("Checking the logic for sanity checking of OiO");
        LogicalPlan plan = new LogicalPlan();
        RecoverableInputOperator generator = plan.addOperator("IntegerGenerator", new RecoverableInputOperator());
        ProcessingModeTests.CollectorOperator collector = plan.addOperator("IntegerCollector", new ProcessingModeTests.CollectorOperator());
        plan.addStream("PossibleOiO", generator.output, collector.input).setLocality(DAG.Locality.THREAD_LOCAL);

        try {
            plan.validate();
        } catch (ConstraintViolationException violation) {
            Assert.fail("OIO Single InputPort");
        }
        // Only reached when validation succeeded, because Assert.fail throws.
        Assert.assertTrue("OiO validation", true);
    }

    /**
     * Builds a linear input -> intermediate -> output chain in which both streams are
     * THREAD_LOCAL and expects {@code validate()} not to throw a
     * {@link ConstraintViolationException}.
     */
    @Test
    public void validatePositiveOiOiO() {
        logger.info("Checking the logic for sanity checking of OiO");
        final DAG.Locality threadLocal = DAG.Locality.THREAD_LOCAL;
        LogicalPlan plan = new LogicalPlan();
        ThreadIdValidatingInputOperator source = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator middle = plan.addOperator("intermediateOperator", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingOutputOperator sink = plan.addOperator("outputOperator", new ThreadIdValidatingOutputOperator());

        plan.addStream("OiO1", source.output, middle.input).setLocality(threadLocal);
        plan.addStream("OiO2", middle.output, sink.input).setLocality(threadLocal);

        try {
            plan.validate();
        } catch (ConstraintViolationException violation) {
            Assert.fail("OiOiO validation");
        }
        // Only reached when validation succeeded, because Assert.fail throws.
        Assert.assertTrue("OiOiO validation", true);
    }

    /**
     * Connects only the {@code ip1} port of a {@code GenericOperator} through a THREAD_LOCAL
     * stream and expects {@code validate()} not to throw a
     * {@link ConstraintViolationException}.
     */
    @Test
    public void validatePositiveOiOOptionalInput() {
        LogicalPlan plan = new LogicalPlan();
        RecoverableInputOperator source = plan.addOperator("InputOperator1", new RecoverableInputOperator());
        GenericNodeTest.GenericOperator generic = plan.addOperator("GenericOperator", new GenericNodeTest.GenericOperator());
        plan.addStream("OiO1", source.output, generic.ip1).setLocality(DAG.Locality.THREAD_LOCAL);

        try {
            plan.validate();
        } catch (ConstraintViolationException violation) {
            Assert.fail("OiO Single Connected InputPort");
        }
        // Only reached when validation succeeded, because Assert.fail throws.
        Assert.assertTrue("OiO validation", true);
    }

    /**
     * Feeds both input ports of a {@code GenericOperator} from two different input operators
     * and checks that validation is rejected in three locality configurations: both streams
     * THREAD_LOCAL, only the second THREAD_LOCAL, and only the first THREAD_LOCAL.
     */
    @Test
    public void validateNegativeOiO() {
        LogicalPlan plan = new LogicalPlan();
        RecoverableInputOperator firstSource = plan.addOperator("InputOperator1", new RecoverableInputOperator());
        RecoverableInputOperator secondSource = plan.addOperator("InputOperator2", new RecoverableInputOperator());
        GenericNodeTest.GenericOperator generic = plan.addOperator("GenericOperator", new GenericNodeTest.GenericOperator());
        LogicalPlan.StreamMeta firstStream = plan.addStream("OiO1", firstSource.output, generic.ip1).setLocality(DAG.Locality.THREAD_LOCAL);
        LogicalPlan.StreamMeta secondStream = plan.addStream("OiO2", secondSource.output, generic.ip2).setLocality(DAG.Locality.THREAD_LOCAL);

        assertOiOValidationRejected(plan, "OIO Both InputPorts");

        // Clear the locality of the first stream; the second stays THREAD_LOCAL.
        firstStream.setLocality((DAG.Locality) null);
        assertOiOValidationRejected(plan, "OIO First InputPort");

        // Swap: the first stream is THREAD_LOCAL again, the second has no locality.
        firstStream.setLocality(DAG.Locality.THREAD_LOCAL);
        secondStream.setLocality((DAG.Locality) null);
        assertOiOValidationRejected(plan, "OIO Second InputPort");
    }

    /**
     * Fails the current test with {@code failureMessage} unless {@code plan.validate()} throws
     * a {@link ValidationException}.
     *
     * <p>{@link ConstraintViolationException} is a subclass of {@link ValidationException}, so
     * one handler covers both. A second handler for the subclass placed after the superclass
     * handler is unreachable and does not compile.
     */
    private static void assertOiOValidationRejected(LogicalPlan plan, String failureMessage) {
        try {
            plan.validate();
            // Assert.fail throws AssertionError, which the handler below does not catch.
            Assert.fail(failureMessage);
        } catch (ValidationException expected) {
            // Validation was rejected, which is the outcome under test.
        }
    }

    /**
     * Builds a diamond (one input operator fanning out to two intermediate operators that both
     * feed a two-input operator) in which every stream is THREAD_LOCAL, and expects
     * {@code validate()} not to throw a {@link ConstraintViolationException}.
     */
    @Test
    public void validatePositiveOiOiOdiamond() {
        logger.info("Checking the logic for sanity checking of OiO");
        final DAG.Locality threadLocal = DAG.Locality.THREAD_LOCAL;
        LogicalPlan plan = new LogicalPlan();
        ThreadIdValidatingInputOperator source = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranch = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranch = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericOperatorWithTwoInputPorts join = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

        plan.addStream("OiOin", source.output, firstBranch.input, secondBranch.input).setLocality(threadLocal);
        plan.addStream("OiOout1", firstBranch.output, join.input).setLocality(threadLocal);
        plan.addStream("OiOout2", secondBranch.output, join.input2).setLocality(threadLocal);

        try {
            plan.validate();
        } catch (ConstraintViolationException violation) {
            Assert.fail("OIOIO diamond validation");
        }
        // Only reached when validation succeeded, because Assert.fail throws.
        Assert.assertTrue("OiOiO diamond validation", true);
    }

    /**
     * Builds an extended diamond (two branches of two intermediate operators each) with all
     * streams THREAD_LOCAL, assigns VCORES values of 1, 1, 2, 3 and 5 to the input and
     * intermediate operators, and then checks that the physical plan has exactly one container
     * whose required vcores value is 5.
     */
    @Test
    public void validatePositiveOiOiOdiamondWithCores() {
        logger.info("Checking the logic for sanity checking of OiO");
        final DAG.Locality threadLocal = DAG.Locality.THREAD_LOCAL;
        LogicalPlan plan = new LogicalPlan();
        ThreadIdValidatingInputOperator source = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranchHead = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranchTail = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranchHead = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranchTail = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericOperatorWithTwoInputPorts join = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

        plan.addStream("OiOin", source.output, firstBranchHead.input, secondBranchHead.input).setLocality(threadLocal);
        plan.addStream("OiOIntermediate1", firstBranchHead.output, firstBranchTail.input).setLocality(threadLocal);
        plan.addStream("OiOIntermediate2", secondBranchHead.output, secondBranchTail.input).setLocality(threadLocal);
        plan.addStream("OiOout1", firstBranchTail.output, join.input).setLocality(threadLocal);
        plan.addStream("OiOout2", secondBranchTail.output, join.input2).setLocality(threadLocal);

        plan.setOperatorAttribute(source, Context.OperatorContext.VCORES, 1);
        plan.setOperatorAttribute(firstBranchHead, Context.OperatorContext.VCORES, 1);
        plan.setOperatorAttribute(firstBranchTail, Context.OperatorContext.VCORES, 2);
        plan.setOperatorAttribute(secondBranchHead, Context.OperatorContext.VCORES, 3);
        plan.setOperatorAttribute(secondBranchTail, Context.OperatorContext.VCORES, 5);
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        try {
            plan.validate();
        } catch (ConstraintViolationException violation) {
            Assert.fail("OIOIO extended diamond validation");
        }
        // Only reached when validation succeeded, because Assert.fail throws.
        Assert.assertTrue("OiOiO extended diamond validation", true);

        PhysicalPlan physicalPlan = new PhysicalPlan(plan, new TestPlanContext());
        int containerCount = physicalPlan.getContainers().size();
        Assert.assertTrue("number of containers", 1 == containerCount);

        PTContainer onlyContainer = (PTContainer) physicalPlan.getContainers().get(0);
        Assert.assertTrue("number of vcores " + onlyContainer.getRequiredVCores(), 5 == onlyContainer.getRequiredVCores());
    }

    /**
     * Builds a diamond in which one of the two streams into the two-input operator has no
     * locality while the other streams are THREAD_LOCAL, and expects {@code validate()} to
     * reject the plan with a {@link ValidationException}.
     */
    @Test
    public void validateNegativeOiOiOdiamond() {
        logger.info("Checking the logic for sanity checking of OiO");
        LogicalPlan plan = new LogicalPlan();
        ThreadIdValidatingInputOperator source = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranch = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranch = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericOperatorWithTwoInputPorts join = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

        plan.addStream("OiOin", source.output, firstBranch.input, secondBranch.input).setLocality(DAG.Locality.THREAD_LOCAL);
        plan.addStream("OiOout1", firstBranch.output, join.input).setLocality(DAG.Locality.THREAD_LOCAL);
        // The second edge into the join deliberately has no locality.
        plan.addStream("nonOiOout2", secondBranch.output, join.input2).setLocality((DAG.Locality) null);

        try {
            plan.validate();
            // Assert.fail throws AssertionError, which the handler below does not catch.
            Assert.fail("OIOIO negative diamond");
        } catch (ValidationException expected) {
            // ConstraintViolationException is a subclass of ValidationException, so this one
            // handler covers both; a later handler for the subclass would be unreachable.
        }
    }

    /**
     * Builds an extended diamond (two branches of two intermediate operators each, joined by a
     * two-input operator) with every stream THREAD_LOCAL, and expects {@code validate()} not
     * to throw a {@link ConstraintViolationException}.
     */
    @Test
    public void validatePositiveOiOiOExtendeddiamond() {
        logger.info("Checking the logic for sanity checking of OiO");
        final DAG.Locality threadLocal = DAG.Locality.THREAD_LOCAL;
        LogicalPlan plan = new LogicalPlan();
        ThreadIdValidatingInputOperator source = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranchHead = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranchTail = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranchHead = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranchTail = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericOperatorWithTwoInputPorts join = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

        plan.addStream("OiOin", source.output, firstBranchHead.input, secondBranchHead.input).setLocality(threadLocal);
        plan.addStream("OiOIntermediate1", firstBranchHead.output, firstBranchTail.input).setLocality(threadLocal);
        plan.addStream("OiOIntermediate2", secondBranchHead.output, secondBranchTail.input).setLocality(threadLocal);
        plan.addStream("OiOout1", firstBranchTail.output, join.input).setLocality(threadLocal);
        plan.addStream("OiOout2", secondBranchTail.output, join.input2).setLocality(threadLocal);

        try {
            plan.validate();
        } catch (ConstraintViolationException violation) {
            Assert.fail("OIOIO extended diamond validation");
        }
        // Only reached when validation succeeded, because Assert.fail throws.
        Assert.assertTrue("OiOiO extended diamond validation", true);
    }

    /**
     * Builds an extended diamond in which the stream between the two intermediate operators of
     * the second branch has no locality while every other stream is THREAD_LOCAL, and expects
     * {@code validate()} to reject the plan with a {@link ValidationException}.
     */
    @Test
    public void validateNegativeOiOiOExtendeddiamond() {
        logger.info("Checking the logic for sanity checking of OiO");
        LogicalPlan plan = new LogicalPlan();
        ThreadIdValidatingInputOperator source = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranchHead = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator firstBranchTail = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranchHead = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator secondBranchTail = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericOperatorWithTwoInputPorts join = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

        plan.addStream("OiOin", source.output, firstBranchHead.input, secondBranchHead.input).setLocality(DAG.Locality.THREAD_LOCAL);
        plan.addStream("OiOIntermediate1", firstBranchHead.output, firstBranchTail.input).setLocality(DAG.Locality.THREAD_LOCAL);
        // The middle edge of the second branch deliberately has no locality.
        plan.addStream("nonOiOIntermediate2", secondBranchHead.output, secondBranchTail.input).setLocality((DAG.Locality) null);
        plan.addStream("OiOout1", firstBranchTail.output, join.input).setLocality(DAG.Locality.THREAD_LOCAL);
        plan.addStream("OiOout2", secondBranchTail.output, join.input2).setLocality(DAG.Locality.THREAD_LOCAL);

        try {
            plan.validate();
            // Assert.fail throws AssertionError, which the handler below does not catch.
            Assert.fail("OiOiO extended diamond validation");
        } catch (ValidationException expected) {
            // ConstraintViolationException is a subclass of ValidationException, so this one
            // handler covers both; a later handler for the subclass would be unreachable.
        }
    }

    /**
     * Runs a two-operator plan twice on a {@link StramLocalCluster}: first with no locality on
     * the stream, asserting that the input and output operators recorded different setup
     * thread ids, then with THREAD_LOCAL locality, asserting that the recorded ids are equal.
     *
     * @throws Exception if running the local cluster fails
     */
    @Test
    public void validateOiOImplementation() throws Exception {
        LogicalPlan plan = new LogicalPlan();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        ThreadIdValidatingInputOperator source = plan.addOperator("Input Operator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingOutputOperator sink = plan.addOperator("Output Operator", new ThreadIdValidatingOutputOperator());
        LogicalPlan.StreamMeta stream = plan.addStream("Stream", source.output, sink.input);

        // First run: the stream has no locality set.
        ThreadIdValidatingOutputOperator.threadList.clear();
        plan.validate();
        StramLocalCluster firstRun = new StramLocalCluster(plan);
        firstRun.run();
        boolean sameThreadWithoutOiO = ThreadIdValidatingInputOperator.threadId == ThreadIdValidatingOutputOperator.threadId;
        Assert.assertFalse("Thread Id", sameThreadWithoutOiO);

        // Second run: same plan with the stream switched to THREAD_LOCAL.
        ThreadIdValidatingOutputOperator.threadList.clear();
        stream.setLocality(DAG.Locality.THREAD_LOCAL);
        plan.validate();
        StramLocalCluster secondRun = new StramLocalCluster(plan);
        secondRun.run();
        Assert.assertEquals("Thread Id", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingOutputOperator.threadId);
    }

    /**
     * Runs an input -> intermediate -> output chain twice on a {@link StramLocalCluster}:
     * first with no locality on either stream, asserting that adjacent operators recorded
     * different setup thread ids, then with both streams THREAD_LOCAL, asserting that all
     * three recorded ids are equal.
     *
     * @throws Exception if running the local cluster fails
     */
    @Test
    public void validateOiOiOImplementation() throws Exception {
        LogicalPlan plan = new LogicalPlan();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        ThreadIdValidatingInputOperator source = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator middle = plan.addOperator("intermediateOperator", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingOutputOperator sink = plan.addOperator("outputOperator", new ThreadIdValidatingOutputOperator());
        LogicalPlan.StreamMeta sourceToMiddle = plan.addStream("OiO1", source.output, middle.input);
        LogicalPlan.StreamMeta middleToSink = plan.addStream("OiO2", middle.output, sink.input);

        // First run: neither stream has a locality set.
        ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
        plan.validate();
        StramLocalCluster firstRun = new StramLocalCluster(plan);
        firstRun.run();
        boolean sourceSharesMiddleThread = ThreadIdValidatingInputOperator.threadId == ThreadIdValidatingGenericIntermediateOperator.threadId;
        Assert.assertFalse("Thread Id 1", sourceSharesMiddleThread);
        boolean middleSharesSinkThread = ThreadIdValidatingGenericIntermediateOperator.threadId == ThreadIdValidatingOutputOperator.threadId;
        Assert.assertFalse("Thread Id 2", middleSharesSinkThread);

        // Second run: both streams switched to THREAD_LOCAL.
        ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
        sourceToMiddle.setLocality(DAG.Locality.THREAD_LOCAL);
        middleToSink.setLocality(DAG.Locality.THREAD_LOCAL);
        plan.validate();
        StramLocalCluster secondRun = new StramLocalCluster(plan);
        secondRun.run();
        Assert.assertEquals("Thread Id 3", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperator.threadId);
        Assert.assertEquals("Thread Id 4", ThreadIdValidatingGenericIntermediateOperator.threadId, ThreadIdValidatingOutputOperator.threadId);
    }

    /**
     * Runs a diamond-shaped plan twice and checks the thread ids recorded by its operators.
     *
     * <p>Topology: one input operator fans out to two intermediate operators, and both
     * intermediate operators feed the two input ports of a single downstream operator.
     *
     * <p>First run (stream locality left untouched): the test expects two recorded intermediate
     * thread ids that differ from each other and from the input operator's thread id, and expects
     * the downstream operator's thread id to differ from the input operator's.
     *
     * <p>Second run (all three streams set to {@link DAG.Locality#THREAD_LOCAL}): the test expects
     * every recorded thread id to equal the input operator's thread id.
     *
     * @throws Exception propagated from plan validation or the local cluster run
     */
    @Test
    public void validateOiOiODiamondImplementation() throws Exception {
        LogicalPlan plan = new LogicalPlan();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        // Operators of the diamond: input -> {intermediate1, intermediate2} -> two-port output.
        ThreadIdValidatingInputOperator inputOp = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator midOp1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator midOp2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericOperatorWithTwoInputPorts joinOp = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

        LogicalPlan.StreamMeta fanOutStream = plan.addStream("OiOinput", inputOp.output, midOp1.input, midOp2.input);
        LogicalPlan.StreamMeta upperStream = plan.addStream("OiOintermediateToOutput1", midOp1.output, joinOp.input);
        LogicalPlan.StreamMeta lowerStream = plan.addStream("OiOintermediateToOutput2", midOp2.output, joinOp.input2);

        // --- Run 1: locality not set on any stream ---
        // The thread-id lists are static, so they are emptied before each run.
        ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
        ThreadIdValidatingOutputOperator.threadList.clear();
        plan.validate();
        new StramLocalCluster(plan).run();

        Assert.assertEquals("nonOIO: Number of threads", 2L, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
        // assertNotEquals (rather than assertFalse(a == b)) so a failure reports the offending id.
        Assert.assertNotEquals("nonOIO: Thread Ids of input operator and intermediate operator1", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperator.threadList.get(0).longValue());
        Assert.assertNotEquals("nonOIO: Thread Ids of input operator and intermediate operator2", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperator.threadList.get(1).longValue());
        Assert.assertNotEquals("nonOIO: Thread Ids of two intermediate operators", ThreadIdValidatingGenericIntermediateOperator.threadList.get(0), ThreadIdValidatingGenericIntermediateOperator.threadList.get(1));
        Assert.assertNotEquals("nonOIO: Thread Ids of input and output operators", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);

        // --- Run 2: every stream of the diamond is THREAD_LOCAL ---
        ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
        ThreadIdValidatingOutputOperator.threadList.clear();
        fanOutStream.setLocality(DAG.Locality.THREAD_LOCAL);
        upperStream.setLocality(DAG.Locality.THREAD_LOCAL);
        lowerStream.setLocality(DAG.Locality.THREAD_LOCAL);
        plan.validate();
        new StramLocalCluster(plan).run();

        Assert.assertEquals("OIO: Number of threads", 2L, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
        Assert.assertEquals("OIO: Thread Ids of input operator and intermediate operator1", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperator.threadList.get(0).longValue());
        Assert.assertEquals("OIO: Thread Ids of input operator and intermediate operator2", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperator.threadList.get(1).longValue());
        Assert.assertEquals("OIO: Thread Ids of two intermediate operators", ThreadIdValidatingGenericIntermediateOperator.threadList.get(0), ThreadIdValidatingGenericIntermediateOperator.threadList.get(1));
        Assert.assertEquals("OIO: Thread Ids of input and output operators", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);
    }

    /**
     * Runs a tree-shaped plan twice and checks how many distinct threads its operators recorded.
     *
     * <p>Topology: the input operator feeds one output operator and a first intermediate operator;
     * that intermediate operator feeds two further intermediate operators; one of those feeds a
     * single output operator (stream "OiO3") and the other feeds two output operators (stream
     * "nonOiO1").
     *
     * <p>First run (stream locality left untouched): the test expects 3 intermediate and 4 output
     * thread-id entries, all unique, none equal to the input operator's thread id.
     *
     * <p>Second run (streams "OiO1", "OiO2" and "OiO3" set to {@link DAG.Locality#THREAD_LOCAL},
     * "nonOiO1" unchanged): the test expects the 3 intermediate entries to collapse to 1 unique
     * thread id, the 4 output entries to collapse to 3 unique thread ids, and the input operator's
     * thread id to appear in both lists.
     *
     * @throws Exception propagated from plan validation or the local cluster run
     */
    @Test
    public void validateOiOiOTreeImplementation() throws Exception {
        LogicalPlan plan = new LogicalPlan();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        // Operators, registered in the same order as before.
        ThreadIdValidatingInputOperator rootInput = plan.addOperator("inputOperator1", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperator midFromInput = plan.addOperator("intermediateOperatorfromInputOper1", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator midBranch1 = plan.addOperator("intermediateOperatorfromInterOper11", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingGenericIntermediateOperator midBranch2 = plan.addOperator("intermediateOperatorfromInterOper12", new ThreadIdValidatingGenericIntermediateOperator());
        ThreadIdValidatingOutputOperator sinkFromInput = plan.addOperator("outputOperatorFromInputOper", new ThreadIdValidatingOutputOperator());
        ThreadIdValidatingOutputOperator sinkFromBranch1 = plan.addOperator("outputOperatorFromInterOper11", new ThreadIdValidatingOutputOperator());
        ThreadIdValidatingOutputOperator sinkFromBranch2a = plan.addOperator("outputOperatorFromInterOper21", new ThreadIdValidatingOutputOperator());
        ThreadIdValidatingOutputOperator sinkFromBranch2b = plan.addOperator("outputOperatorFromInterOper22", new ThreadIdValidatingOutputOperator());

        LogicalPlan.StreamMeta rootStream = plan.addStream("OiO1", rootInput.output, sinkFromInput.input, midFromInput.input);
        LogicalPlan.StreamMeta splitStream = plan.addStream("OiO2", midFromInput.output, midBranch1.input, midBranch2.input);
        LogicalPlan.StreamMeta branch1Stream = plan.addStream("OiO3", midBranch1.output, sinkFromBranch1.input);
        // This stream's locality is never changed, so its StreamMeta is not kept.
        plan.addStream("nonOiO1", midBranch2.output, sinkFromBranch2a.input, sinkFromBranch2b.input);

        // --- Run 1: locality not set on any stream ---
        ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
        ThreadIdValidatingOutputOperator.threadList.clear();
        plan.validate();
        new StramLocalCluster(plan).run();

        Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3L, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
        // Copying into a HashSet<Long> drops duplicates, leaving the count of distinct thread ids.
        Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3L, new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList).size());
        Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4L, ThreadIdValidatingOutputOperator.threadList.size());
        Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4L, new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList).size());
        Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(Long.valueOf(ThreadIdValidatingInputOperator.threadId)));
        Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(Long.valueOf(ThreadIdValidatingInputOperator.threadId)));

        // --- Run 2: OiO1, OiO2 and OiO3 are THREAD_LOCAL; nonOiO1 keeps its locality ---
        ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
        ThreadIdValidatingOutputOperator.threadList.clear();
        rootStream.setLocality(DAG.Locality.THREAD_LOCAL);
        splitStream.setLocality(DAG.Locality.THREAD_LOCAL);
        branch1Stream.setLocality(DAG.Locality.THREAD_LOCAL);
        plan.validate();
        new StramLocalCluster(plan).run();

        Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3L, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
        Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1L, new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList).size());
        Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4L, ThreadIdValidatingOutputOperator.threadList.size());
        Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3L, new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList).size());
        Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(Long.valueOf(ThreadIdValidatingInputOperator.threadId)));
        Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(Long.valueOf(ThreadIdValidatingInputOperator.threadId)));
    }

    /**
     * Runs a three-operator chain in which the middle operator is connected to the last operator
     * through two separate streams (output -> input, output2 -> input2), and checks the recorded
     * thread ids.
     *
     * <p>First run (stream locality left untouched): the test expects one recorded intermediate
     * thread id and pairwise different thread ids for the input, intermediate and output operators.
     *
     * <p>Second run (only the two intermediate-to-output streams set to
     * {@link DAG.Locality#THREAD_LOCAL}; the "OiOinput" stream is unchanged): the test expects the
     * intermediate and output operators to report the same thread id, and that id to differ from
     * the input operator's.
     *
     * @throws Exception propagated from plan validation or the local cluster run
     */
    @Test
    public void validateOiOTwoPortBetweenOperatorsImplementation() throws Exception {
        LogicalPlan plan = new LogicalPlan();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        ThreadIdValidatingInputOperator feeder = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
        ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts splitter = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts());
        ThreadIdValidatingGenericOperatorWithTwoInputPorts merger = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

        // The input stream's locality is never modified, so its StreamMeta is discarded.
        plan.addStream("OiOinput", feeder.output, splitter.input);
        LogicalPlan.StreamMeta firstLink = plan.addStream("OiOintermediateOutput1", splitter.output, merger.input);
        LogicalPlan.StreamMeta secondLink = plan.addStream("OiOintermediateOutput2", splitter.output2, merger.input2);

        // --- Run 1: locality not set on any stream ---
        ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.clear();
        // No ThreadIdValidatingOutputOperator is part of this plan; its list is cleared regardless.
        ThreadIdValidatingOutputOperator.threadList.clear();
        plan.validate();
        StramLocalCluster defaultLocalityCluster = new StramLocalCluster(plan);
        defaultLocalityCluster.run();

        Assert.assertEquals("nonOIO: Number of threads", 1L, ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.size());
        Assert.assertNotEquals("nonOIO: Thread Ids of input operator and intermediate operator", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadId);
        Assert.assertNotEquals("nonOIO: Thread Ids of intermediate and output operators", ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);
        Assert.assertNotEquals("nonOIO: Thread Ids of input and output operators", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);

        // --- Run 2: both intermediate-to-output streams are THREAD_LOCAL ---
        ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.clear();
        ThreadIdValidatingOutputOperator.threadList.clear();
        firstLink.setLocality(DAG.Locality.THREAD_LOCAL);
        secondLink.setLocality(DAG.Locality.THREAD_LOCAL);
        plan.validate();
        StramLocalCluster threadLocalCluster = new StramLocalCluster(plan);
        threadLocalCluster.run();

        Assert.assertEquals("OIO: Number of threads", 1L, ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.size());
        Assert.assertNotEquals("OIO: Thread Ids of input operator and intermediate operator", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadId);
        Assert.assertEquals("OIO: Thread Ids of intermediate and output operators", ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);
        Assert.assertNotEquals("OIO: Thread Ids of input and output operators", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);
    }
}
