package com.datatorrent.stram;

import com.datatorrent.api.AffinityRule;
import com.datatorrent.api.AffinityRulesSet;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/AffinityRulesTest.class */
public class AffinityRulesTest {
    private static final Logger LOG = LoggerFactory.getLogger(AffinityRulesTest.class);

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    @Test
    public void testOperatorPartitionsAntiAffinity() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("O1", new TestGeneratorInputOperator());
        GenericTestOperator addOperator2 = logicalPlan.addOperator("O2", new GenericTestOperator());
        GenericTestOperator addOperator3 = logicalPlan.addOperator("O3", new GenericTestOperator());
        logicalPlan.addStream("stream1", addOperator.outport, addOperator2.inport1);
        logicalPlan.addStream("stream2", addOperator2.outport1, addOperator3.inport1);
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(5));
        AffinityRulesSet affinityRulesSet = new AffinityRulesSet();
        ArrayList arrayList = new ArrayList();
        affinityRulesSet.setAffinityRules(arrayList);
        arrayList.add(new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "O2", new String[]{"O2"}));
        logicalPlan.setAttribute(Context.DAGContext.AFFINITY_RULES_SET, affinityRulesSet);
        logicalPlan.validate();
        logicalPlan.getAttributes().put(Context.DAGContext.APPLICATION_PATH, this.testMeta.getAbsolutePath());
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(logicalPlan);
        Iterator it = streamingContainerManager.containerStartRequests.iterator();
        while (it.hasNext()) {
            PTContainer pTContainer = ((StreamingContainerAgent.ContainerStartRequest) it.next()).container;
            if (((PTOperator) pTContainer.getOperators().get(0)).getName().equals("O2")) {
                Assert.assertEquals("Anti-affinity containers set should have 4 containers for other partitions ", 4L, pTContainer.getStrictAntiPrefs().size());
                Iterator it2 = pTContainer.getStrictAntiPrefs().iterator();
                while (it2.hasNext()) {
                    Iterator it3 = ((PTContainer) it2.next()).getOperators().iterator();
                    while (it3.hasNext()) {
                        Assert.assertEquals("Partion for O2 should be Anti Prefs", "O2", ((PTOperator) it3.next()).getName());
                    }
                }
            }
        }
        ResourceRequestHandler resourceRequestHandler = new ResourceRequestHandler();
        HashMap newHashMap = Maps.newHashMap();
        for (int i = 0; i < 10; i++) {
            NodeReport newNodeReport = BuilderUtils.newNodeReport(BuilderUtils.newNodeId("host" + i, 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
            newHashMap.put(newNodeReport.getNodeId().getHost(), newNodeReport);
        }
        resourceRequestHandler.updateNodeReports(Lists.newArrayList(newHashMap.values()));
        HashSet hashSet = new HashSet();
        Iterator it4 = streamingContainerManager.containerStartRequests.iterator();
        while (it4.hasNext()) {
            StreamingContainerAgent.ContainerStartRequest containerStartRequest = (StreamingContainerAgent.ContainerStartRequest) it4.next();
            String host = resourceRequestHandler.getHost(containerStartRequest, true);
            containerStartRequest.container.host = host;
            if (((PTOperator) containerStartRequest.container.getOperators().get(0)).getName().equals("O2")) {
                Assert.assertNotNull("Host name should not be null", host);
                LOG.info("Partition {} for operator O2 has host = {} ", Integer.valueOf(containerStartRequest.container.getId()), host);
                Assert.assertTrue("Each Partition should have a different host", !hashSet.contains(host));
                hashSet.add(host);
            }
        }
    }

    @Test
    public void testAntiAffinityInOperators() {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.getAttributes().put(Context.DAGContext.APPLICATION_PATH, this.testMeta.getAbsolutePath());
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        GenericTestOperator addOperator = logicalPlan.addOperator("O1", GenericTestOperator.class);
        logicalPlan.setAttribute(addOperator, Context.OperatorContext.MEMORY_MB, 256);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("O2", GenericTestOperator.class);
        logicalPlan.setAttribute(addOperator2, Context.OperatorContext.MEMORY_MB, 256);
        logicalPlan.getMeta(addOperator).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host1");
        AffinityRulesSet affinityRulesSet = new AffinityRulesSet();
        ArrayList arrayList = new ArrayList();
        affinityRulesSet.setAffinityRules(arrayList);
        arrayList.add(new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "O1", new String[]{"O2"}));
        logicalPlan.setAttribute(Context.DAGContext.AFFINITY_RULES_SET, affinityRulesSet);
        logicalPlan.addStream("o1_outport1", addOperator.outport1, addOperator2.inport1);
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(logicalPlan);
        ResourceRequestHandler resourceRequestHandler = new ResourceRequestHandler();
        HashMap newHashMap = Maps.newHashMap();
        NodeReport newNodeReport = BuilderUtils.newNodeReport(BuilderUtils.newNodeId("host1", 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
        newHashMap.put(newNodeReport.getNodeId().getHost(), newNodeReport);
        NodeReport newNodeReport2 = BuilderUtils.newNodeReport(BuilderUtils.newNodeId("host2", 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
        newHashMap.put(newNodeReport2.getNodeId().getHost(), newNodeReport2);
        resourceRequestHandler.updateNodeReports(Lists.newArrayList(newHashMap.values()));
        Iterator it = streamingContainerManager.containerStartRequests.iterator();
        while (it.hasNext()) {
            StreamingContainerAgent.ContainerStartRequest containerStartRequest = (StreamingContainerAgent.ContainerStartRequest) it.next();
            String host = resourceRequestHandler.getHost(containerStartRequest, true);
            containerStartRequest.container.host = host;
            if (((PTOperator) containerStartRequest.container.getOperators().get(0)).getName().equals("O1")) {
                Assert.assertEquals("Hosts set to host1 for Operator O1", "host1", host);
            }
            if (((PTOperator) containerStartRequest.container.getOperators().get(0)).getName().equals("O2")) {
                Assert.assertEquals("Hosts set to host2 for Operator O2", "host2", host);
            }
        }
    }
}
