package com.datatorrent.stram;

import com.datatorrent.api.AffinityRule;
import com.datatorrent.api.AffinityRulesSet;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PlanModifier;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.FutureTask;
import javax.validation.ValidationException;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/LogicalPlanModificationTest.class */
/**
 * Tests for modifying a logical plan after its physical plan has been built.
 *
 * <p>Most tests build a small DAG of {@link GenericTestOperator}s, create a
 * {@link PhysicalPlan} backed by a {@link TestPlanContext}, apply changes through a
 * {@link PlanModifier} and then assert on the deploy/undeploy sets recorded by the context
 * and on the resulting containers, operators and streams.
 */
public class LogicalPlanModificationTest {
    private LogicalPlan dag;

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    /** Creates a fresh DAG for every test. */
    @Before
    public void setup() {
        this.dag = StramTestSupport.createDAG(this.testMeta);
    }

    /**
     * Registers {@code ctx} as the DAG's storage agent and builds the physical plan from the
     * current DAG.
     *
     * @param ctx plan context that also serves as storage agent
     * @return the newly created physical plan
     */
    private PhysicalPlan createPhysicalPlan(TestPlanContext ctx) {
        dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
        return new PhysicalPlan(dag, ctx);
    }

    /** Clears the deploy/undeploy collections of {@code ctx} so later assertions only see new changes. */
    private static void clearDeployTracking(TestPlanContext ctx) {
        ctx.deploy.clear();
        ctx.undeploy.clear();
    }

    /**
     * Adds an operator plus a stream into an existing operator and checks that nothing is
     * (un)deployed before {@code applyChanges} and that the expected counts are seen afterwards.
     */
    @Test
    public void testAddOperator() {
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.addStream("o2.outport1", o2.outport1, o3.inport1);

        TestPlanContext ctx = new TestPlanContext();
        PhysicalPlan plan = createPhysicalPlan(ctx);
        clearDeployTracking(ctx);
        Assert.assertEquals("containers", 3, plan.getContainers().size());

        PlanModifier pm = new PlanModifier(plan);
        GenericTestOperator added1 = new GenericTestOperator();
        pm.addOperator("added1", added1);
        pm.addStream("added1.outport1", added1.outport1, new Operator.InputPort[]{o3.inport2});

        // Pending changes must not touch the physical plan until they are applied.
        Assert.assertEquals("undeploy " + ctx.undeploy, 0, ctx.undeploy.size());
        Assert.assertEquals("deploy " + ctx.deploy, 0, ctx.deploy.size());

        pm.applyChanges(ctx);
        Assert.assertEquals("containers post change", 4, plan.getContainers().size());
        Assert.assertEquals("undeploy " + ctx.undeploy, 1, ctx.undeploy.size());
        Assert.assertEquals("deploy " + ctx.deploy, 2, ctx.deploy.size());
    }

    /**
     * Same scenario as {@link #testAddOperator()}, but with an affinity rule (o1/added1) and an
     * anti-affinity rule (o3/added1) set on the DAG before the operator is added.
     */
    @Test
    public void testAddOperatorWithAffinityRules() {
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2.inport1);
        dag.addStream("o2.outport1", o2.outport1, o3.inport1);

        TestPlanContext ctx = new TestPlanContext();
        PhysicalPlan plan = createPhysicalPlan(ctx);
        clearDeployTracking(ctx);
        Assert.assertEquals("containers", 3, plan.getContainers().size());

        AffinityRulesSet ruleSet = new AffinityRulesSet();
        List<AffinityRule> rules = new ArrayList<AffinityRule>();
        ruleSet.setAffinityRules(rules);
        rules.add(new AffinityRule(AffinityRule.Type.AFFINITY, DAG.Locality.CONTAINER_LOCAL, false, "o1", new String[]{"added1"}));
        rules.add(new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "o3", new String[]{"added1"}));
        dag.setAttribute(Context.DAGContext.AFFINITY_RULES_SET, ruleSet);

        PlanModifier pm = new PlanModifier(plan);
        GenericTestOperator added1 = new GenericTestOperator();
        pm.addOperator("added1", added1);
        pm.addStream("added1.outport1", added1.outport1, new Operator.InputPort[]{o3.inport2});

        // Pending changes must not touch the physical plan until they are applied.
        Assert.assertEquals("undeploy " + ctx.undeploy, 0, ctx.undeploy.size());
        Assert.assertEquals("deploy " + ctx.deploy, 0, ctx.deploy.size());

        pm.applyChanges(ctx);
        Assert.assertEquals("containers post change", 4, plan.getContainers().size());
        Assert.assertEquals("undeploy " + ctx.undeploy, 1, ctx.undeploy.size());
        Assert.assertEquals("deploy " + ctx.deploy, 2, ctx.deploy.size());

        for (PTContainer container : plan.getContainers()) {
            // NOTE(review): the elements of getOperators() are cast to PTOperator below, so
            // contains(String) presumably never matches and the assertions in this branch may
            // never run. They also appear to conflict with the 4-container expectation above;
            // confirm the intended behavior before changing the condition.
            if (container.getOperators().contains("added1")) {
                Assert.assertEquals("Operators O1 and added1 should be in the same container as per affinity rule", 2, container.getOperators().size());
                Assert.assertEquals("Operators O1 and added1 should be in the same container as per affinity rule", "o1", ((PTOperator) container.getOperators().get(0)).getOperatorMeta().getName());
                Assert.assertEquals("Operators O1 and added1 should be in the same container as per affinity rule", "added1", ((PTOperator) container.getOperators().get(1)).getOperatorMeta().getName());
                Set<?> antiPrefs = container.getStrictAntiPrefs();
                Assert.assertEquals("There should be one container in antiaffinity list", 1, antiPrefs.size());
                PTContainer antiContainer = (PTContainer) antiPrefs.iterator().next();
                PTOperator antiOperator = (PTOperator) antiContainer.getOperators().iterator().next();
                // expected value first, so a failure reports expected/actual the right way round
                Assert.assertEquals("AntiAffinity operators should containn operator O3", "o3", antiOperator.getOperatorMeta().getName());
            }
        }
    }

    /**
     * Setting a property on an operator that was part of the DAG when the physical plan was
     * built must raise a {@link ValidationException} mentioning that operator; setting it on an
     * operator added through the modifier must succeed.
     */
    @Test
    public void testSetOperatorProperty() {
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(dag.addOperator("o1", GenericTestOperator.class));

        TestPlanContext ctx = new TestPlanContext();
        PhysicalPlan plan = createPhysicalPlan(ctx);
        clearDeployTracking(ctx);

        PlanModifier pm = new PlanModifier(plan);
        try {
            pm.setOperatorProperty(o1Meta.getName(), "myStringProperty", "propertyValue");
            Assert.fail("validation error exepected");
        } catch (ValidationException expected) {
            Assert.assertTrue(expected.getMessage().contains(o1Meta.toString()));
        }

        GenericTestOperator newOperator = new GenericTestOperator();
        pm.addOperator("newOperator", newOperator);
        pm.setOperatorProperty("newOperator", "myStringProperty", "propertyValue");
        Assert.assertEquals("", "propertyValue", newOperator.getMyStringProperty());
    }

    /**
     * Removes operator o2 from a plan without container-local streams and verifies that removal
     * is rejected while streams are still connected, and that the logical and physical plans
     * shrink as asserted once the streams are removed first.
     */
    @Test
    public void testRemoveOperator() {
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);
        GenericTestOperator o12 = dag.addOperator("o12", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o12Meta = dag.getMeta(o12);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o3Meta = dag.getMeta(o3);
        LogicalPlan.StreamMeta s1 = dag.addStream("o1.outport1", o1.outport1, o2.inport1, o12.inport1);
        LogicalPlan.StreamMeta s2 = dag.addStream("o2.outport1", o2.outport1, o3.inport1);

        TestPlanContext ctx = new TestPlanContext();
        PhysicalPlan plan = createPhysicalPlan(ctx);
        clearDeployTracking(ctx);

        Assert.assertEquals("containers " + plan.getContainers(), 4, plan.getContainers().size());
        Assert.assertEquals("physical operators " + plan.getAllOperators(), 4, plan.getAllOperators().size());
        Assert.assertEquals("sinks s1 " + s1.getSinks(), 2, s1.getSinks().size());
        List<?> o2PhysicalOpers = plan.getOperators(o2Meta);
        Assert.assertEquals("instances " + o2Meta, 1, o2PhysicalOpers.size());

        PlanModifier pm = new PlanModifier(plan);
        try {
            pm.removeOperator(o2Meta.getName());
            Assert.fail("validation error (connected output stream) expected");
        } catch (ValidationException expected) {
            // expected: o2 still has a connected output stream
        }

        // Removing the output stream first makes the operator removable.
        pm.removeStream(s2.getName());
        pm.removeOperator(o2Meta.getName());
        pm.applyChanges(ctx);

        Assert.assertEquals("sinks s1 " + s1.getSinks(), 1, s1.getSinks().size());
        Assert.assertTrue("undeploy " + ctx.undeploy, ctx.undeploy.containsAll(o2PhysicalOpers));
        Assert.assertFalse("deploy " + ctx.deploy, ctx.deploy.containsAll(o2PhysicalOpers));
        Assert.assertEquals("streams " + dag.getAllStreams(), 1, dag.getAllStreams().size());
        Assert.assertEquals("operators " + dag.getAllOperators(), 3, dag.getAllOperators().size());
        Assert.assertTrue("operators " + dag.getAllOperators(), dag.getAllOperators().containsAll(Sets.newHashSet(o1Meta, o3Meta)));

        try {
            plan.getOperators(o2Meta);
            Assert.fail("removed from physical plan: " + o2Meta);
        } catch (Exception expected) {
            // expected: lookup of a removed operator fails; Assert.fail throws an Error and is not caught here
        }

        Assert.assertEquals("containers " + plan.getContainers(), 3, plan.getContainers().size());
        Assert.assertEquals("physical operators " + plan.getAllOperators(), 3, plan.getAllOperators().size());
        Assert.assertEquals("removed containers " + ctx.releaseContainers, 1, ctx.releaseContainers.size());

        try {
            pm.removeOperator(o12Meta.getName());
            Assert.fail("cannot remove operator prior to removing input stream");
        } catch (ValidationException expected) {
            Assert.assertTrue("" + expected.getMessage(), expected.getMessage().matches(".*Operator o12 connected to input streams.*"));
        }
        pm.removeStream(s1.getName());
        pm.removeOperator(o12Meta.getName());
    }

    /**
     * Removes one of two sinks of a container-local stream and verifies that the operator is
     * undeployed, nothing is deployed, and the stream keeps its remaining sink.
     */
    @Test
    public void testRemoveOperator2() {
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o2Meta = dag.getMeta(o2);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        LogicalPlan.OperatorMeta o3Meta = dag.getMeta(o3);
        LogicalPlan.StreamMeta s1 = dag.addStream("o1.outport1", o1.outport1, o2.inport1, o3.inport1)
                .setLocality(DAG.Locality.CONTAINER_LOCAL);

        TestPlanContext ctx = new TestPlanContext();
        PhysicalPlan plan = createPhysicalPlan(ctx);
        clearDeployTracking(ctx);

        Assert.assertEquals("containers " + plan.getContainers(), 1, plan.getContainers().size());
        Assert.assertEquals("physical operators " + plan.getAllOperators(), 3, plan.getAllOperators().size());
        Assert.assertEquals("sinks s1 " + s1.getSinks(), 2, s1.getSinks().size());
        List<?> o2PhysicalOpers = plan.getOperators(o2Meta);
        Assert.assertEquals("instances " + o2Meta, 1, o2PhysicalOpers.size());

        PlanModifier pm = new PlanModifier(plan);
        pm.removeOperator(o2Meta.getName());
        pm.applyChanges(ctx);

        Assert.assertEquals("sinks s1 " + s1.getSinks(), 1, s1.getSinks().size());
        Assert.assertTrue("undeploy " + ctx.undeploy, ctx.undeploy.containsAll(o2PhysicalOpers));
        // expected value first, so a failure reports expected/actual the right way round
        Assert.assertEquals("deploy " + ctx.deploy, Sets.newHashSet(), ctx.deploy);
        Assert.assertEquals("streams " + dag.getAllStreams(), 1, dag.getAllStreams().size());
        Assert.assertEquals("operators " + dag.getAllOperators(), 2, dag.getAllOperators().size());
        Assert.assertTrue("operators " + dag.getAllOperators(), dag.getAllOperators().containsAll(Sets.newHashSet(o1Meta, o3Meta)));
    }

    /** Removes the only stream between two operators and expects both to be undeployed and redeployed. */
    @Test
    public void testRemoveStream() {
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2.inport1);

        TestPlanContext ctx = new TestPlanContext();
        PlanModifier pm = new PlanModifier(createPhysicalPlan(ctx));
        pm.removeStream("o1.outport1");
        pm.applyChanges(ctx);

        Assert.assertEquals("undeploy " + ctx.undeploy, 2, ctx.undeploy.size());
        Assert.assertEquals("deploy " + ctx.deploy, 2, ctx.deploy.size());
    }

    /**
     * Adds the same named stream twice with different sink ports and verifies that the source
     * ends up with one output and the sink with both input ports connected.
     */
    @Test
    public void testAddStream() {
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);

        TestPlanContext ctx = new TestPlanContext();
        PhysicalPlan plan = createPhysicalPlan(ctx);

        List<?> o1Instances = plan.getOperators(dag.getMeta(o1));
        Assert.assertEquals("o1Instances " + o1Instances, 1, o1Instances.size());
        PTOperator o1p1 = (PTOperator) o1Instances.get(0);
        List<?> o2Instances = plan.getOperators(dag.getMeta(o2));
        Assert.assertEquals("o2Instances " + o2Instances, 1, o2Instances.size());
        PTOperator o2p1 = (PTOperator) o2Instances.get(0);
        Assert.assertEquals("outputs " + o1p1, 0, o1p1.getOutputs().size());
        Assert.assertEquals("inputs " + o2p1, 0, o2p1.getInputs().size());

        PlanModifier pm = new PlanModifier(plan);
        pm.addStream("o1.outport1", o1.outport1, new Operator.InputPort[]{o2.inport1});
        // second call reuses the stream name and attaches another sink port
        pm.addStream("o1.outport1", o1.outport1, new Operator.InputPort[]{o2.inport2});
        pm.applyChanges(ctx);

        Assert.assertEquals("undeploy " + ctx.undeploy, 2, ctx.undeploy.size());
        Assert.assertEquals("deploy " + ctx.deploy, 2, ctx.deploy.size());
        Assert.assertEquals("outputs " + o1p1, 1, o1p1.getOutputs().size());
        Assert.assertEquals("inputs " + o2p1, 2, o2p1.getInputs().size());

        Set<String> portNames = Sets.newHashSet();
        for (Object input : o2p1.getInputs()) {
            portNames.add(((PTOperator.PTInput) input).portName);
        }
        Assert.assertEquals("input port names " + o2p1.getInputs(), Sets.newHashSet(GenericTestOperator.IPORT1, GenericTestOperator.IPORT2), portNames);
    }

    /**
     * Submits a create-operator request to a {@link StreamingContainerManager} that uses the
     * given storage agent and verifies that exactly one container start request with a single
     * pending-deploy operator named "o1" results.
     *
     * @param storageAgent storage agent to register on the DAG
     * @throws Exception if the plan modification task fails
     */
    private void testExecutionManager(StorageAgent storageAgent) throws Exception {
        dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
        StreamingContainerManager scm = new StreamingContainerManager(dag);
        // expected value first, so a failure reports expected/actual the right way round
        Assert.assertEquals("" + scm.containerStartRequests, 0, scm.containerStartRequests.size());

        CreateOperatorRequest request = new CreateOperatorRequest();
        request.setOperatorFQCN(TestGeneratorInputOperator.class.getName());
        request.setOperatorName("o1");

        FutureTask<?> modification = scm.logicalPlanModification(Collections.singletonList(request));
        // drive the manager's event processing until the modification task has completed
        while (!modification.isDone()) {
            scm.processEvents();
        }
        // surfaces any exception thrown while applying the modification
        modification.get();

        Assert.assertEquals("" + scm.containerStartRequests, 1, scm.containerStartRequests.size());
        PTContainer container = ((StreamingContainerAgent.ContainerStartRequest) scm.containerStartRequests.poll()).container;
        Assert.assertEquals("operators " + container, 1, container.getOperators().size());

        int pendingDeploy = 0;
        for (Object oper : container.getOperators()) {
            if (((PTOperator) oper).getState() == PTOperator.State.PENDING_DEPLOY) {
                pendingDeploy++;
            }
        }
        Assert.assertEquals("deploy requests " + container, 1, pendingDeploy);

        PTOperator deployed = (PTOperator) container.getOperators().get(0);
        Assert.assertEquals("operator name", "o1", deployed.getOperatorMeta().getName());
        Assert.assertEquals("operator class", TestGeneratorInputOperator.class, deployed.getOperatorMeta().getOperator().getClass());
    }

    /** Runs the execution manager scenario with an {@link FSStorageAgent}. */
    @Test
    public void testExecutionManagerWithSyncStorageAgent() throws Exception {
        testExecutionManager(new FSStorageAgent(this.testMeta.getPath(), (Configuration) null));
    }

    /** Runs the execution manager scenario with an {@link AsyncFSStorageAgent}. */
    @Test
    public void testExecutionManagerWithAsyncStorageAgent() throws Exception {
        testExecutionManager(new AsyncFSStorageAgent(this.testMeta.getPath(), (Configuration) null));
    }
}
