package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Partitioner;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/HostLocalTest.class */
public class HostLocalTest {

    /* loaded from: input_file:com/datatorrent/stram/HostLocalTest$LocalityPartitioner.class */
    /**
     * Test partitioner that pins every partition it defines to its own host.
     *
     * <p>Partition creation is delegated to {@link StatelessPartitioner}; afterwards
     * the partitions are tagged, in iteration order, with the {@code LOCALITY_HOST}
     * values {@code "host1"}, {@code "host2"}, and so on.
     */
    private static class LocalityPartitioner extends StatelessPartitioner<GenericTestOperator> {
        private static final long serialVersionUID = 1;

        private LocalityPartitioner() {
        }

        /**
         * Defines the partitions via the superclass and assigns each one a distinct host name.
         *
         * @param collection          the partitions handed to the superclass implementation
         * @param partitioningContext the partitioning context handed to the superclass implementation
         * @return the partitions produced by the superclass, each carrying a
         *         {@code LOCALITY_HOST} attribute of the form {@code "host<n>"} with n starting at 1
         */
        public Collection<Partitioner.Partition<GenericTestOperator>> definePartitions(Collection<Partitioner.Partition<GenericTestOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
            Collection<Partitioner.Partition<GenericTestOperator>> partitions = super.definePartitions(collection, partitioningContext);
            // Host numbering is 1-based and follows the collection's iteration order.
            int hostNumber = 1;
            for (Partitioner.Partition<GenericTestOperator> partition : partitions) {
                partition.getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host" + hostNumber);
                hostNumber++;
            }
            return partitions;
        }
    }

    /**
     * Checks that host locality set per partition is honoured when hosts are resolved.
     *
     * <p>The "partitioned" operator is split into three partitions by
     * {@link LocalityPartitioner}, which tags them with host1..host3. Three matching
     * node reports are registered, and every host returned for a pending container
     * start request is struck from the expected set; the set must end up empty.
     */
    @Test
    public void testPartitionLocality() {
        final int partitionCount = 3;

        LogicalPlan dag = new LogicalPlan();
        String appPath = new File("target", HostLocalTest.class.getName()).getAbsolutePath();
        dag.getAttributes().put(Context.DAGContext.APPLICATION_PATH, appPath);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator upstream = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class);
        LocalityPartitioner partitioner = new LocalityPartitioner();
        partitioner.setPartitionCount(partitionCount);
        dag.getMeta(partitioned).getAttributes().put(Context.OperatorContext.PARTITIONER, partitioner);
        dag.addStream("o1_outport1", upstream.outport1, partitioned.inport1);

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        ResourceRequestHandler requestHandler = new ResourceRequestHandler();

        // One running node per expected host; the same names seed the set of hosts
        // that still have to show up in an allocation.
        HashMap<String, NodeReport> nodeReports = Maps.newHashMap();
        HashSet<String> unallocatedHosts = Sets.newHashSet();
        for (int hostNumber = 1; hostNumber <= partitionCount; hostNumber++) {
            String hostName = "host" + hostNumber;
            NodeReport report = BuilderUtils.newNodeReport(BuilderUtils.newNodeId(hostName, 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
            nodeReports.put(report.getNodeId().getHost(), report);
            unallocatedHosts.add(hostName);
        }
        requestHandler.updateNodeReports(Lists.newArrayList(nodeReports.values()));

        for (Iterator<?> pending = scm.containerStartRequests.iterator(); pending.hasNext(); ) {
            StreamingContainerAgent.ContainerStartRequest request = (StreamingContainerAgent.ContainerStartRequest) pending.next();
            String host = requestHandler.getHost(request, true);
            if (host != null) {
                unallocatedHosts.remove(host);
            }
        }
        Assert.assertTrue("All the allocated hosts removed", unallocatedHosts.isEmpty());
    }

    /**
     * Checks host resolution for two operators joined by a {@code NODE_LOCAL} stream.
     *
     * <p>Only the downstream operator carries {@code LOCALITY_HOST = "host1"}. With
     * host1 and host2 both reported as running, every pending container start
     * request is expected to resolve to host1.
     */
    @Test
    public void testNodeLocal() {
        LogicalPlan dag = new LogicalPlan();
        String appPath = new File("target", HostLocalTest.class.getName()).getAbsolutePath();
        dag.getAttributes().put(Context.DAGContext.APPLICATION_PATH, appPath);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator upstream = dag.addOperator("o1", GenericTestOperator.class);
        dag.setAttribute(upstream, Context.OperatorContext.MEMORY_MB, 256);
        GenericTestOperator downstream = dag.addOperator("partitioned", GenericTestOperator.class);
        dag.setAttribute(downstream, Context.OperatorContext.MEMORY_MB, 256);
        dag.getMeta(downstream).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host1");
        dag.addStream("o1_outport1", upstream.outport1, downstream.inport1).setLocality(DAG.Locality.NODE_LOCAL);

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        ResourceRequestHandler requestHandler = new ResourceRequestHandler();

        // Register host1 and host2 as running nodes with identical resources.
        HashMap<String, NodeReport> nodeReports = Maps.newHashMap();
        for (String hostName : new String[] {"host1", "host2"}) {
            NodeReport report = BuilderUtils.newNodeReport(BuilderUtils.newNodeId(hostName, 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
            nodeReports.put(report.getNodeId().getHost(), report);
        }
        requestHandler.updateNodeReports(Lists.newArrayList(nodeReports.values()));

        for (Iterator<?> pending = scm.containerStartRequests.iterator(); pending.hasNext(); ) {
            StreamingContainerAgent.ContainerStartRequest request = (StreamingContainerAgent.ContainerStartRequest) pending.next();
            String assignedHost = requestHandler.getHost(request, true);
            // Record the decision on the container before checking it.
            request.container.host = assignedHost;
            Assert.assertEquals("Hosts set to host1", "host1", assignedHost);
        }
    }

    /**
     * Checks host resolution for two operators joined by a {@code THREAD_LOCAL} stream.
     *
     * <p>The upstream operator carries {@code LOCALITY_HOST = "host2"}. Exactly one
     * container start request is expected, and it must resolve to host2.
     */
    @Test
    public void testThreadLocal() {
        LogicalPlan dag = new LogicalPlan();
        String appPath = new File("target", HostLocalTest.class.getName()).getAbsolutePath();
        dag.getAttributes().put(Context.DAGContext.APPLICATION_PATH, appPath);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator upstream = dag.addOperator("o1", GenericTestOperator.class);
        dag.getMeta(upstream).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host2");
        GenericTestOperator downstream = dag.addOperator("partitioned", GenericTestOperator.class);
        dag.addStream("o1_outport1", upstream.outport1, downstream.inport1).setLocality(DAG.Locality.THREAD_LOCAL);
        dag.setAttribute(upstream, Context.OperatorContext.MEMORY_MB, 256);
        dag.setAttribute(downstream, Context.OperatorContext.MEMORY_MB, 256);

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        ResourceRequestHandler requestHandler = new ResourceRequestHandler();

        // Register host1 and host2 as running nodes with identical resources.
        HashMap<String, NodeReport> nodeReports = Maps.newHashMap();
        for (String hostName : new String[] {"host1", "host2"}) {
            NodeReport report = BuilderUtils.newNodeReport(BuilderUtils.newNodeId(hostName, 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
            nodeReports.put(report.getNodeId().getHost(), report);
        }
        requestHandler.updateNodeReports(Lists.newArrayList(nodeReports.values()));

        Assert.assertEquals("number of containers is 1", 1L, scm.containerStartRequests.size());
        for (Iterator<?> pending = scm.containerStartRequests.iterator(); pending.hasNext(); ) {
            StreamingContainerAgent.ContainerStartRequest request = (StreamingContainerAgent.ContainerStartRequest) pending.next();
            String assignedHost = requestHandler.getHost(request, true);
            // Record the decision on the container before checking it.
            request.container.host = assignedHost;
            Assert.assertEquals("Hosts set to host2", "host2", assignedHost);
        }
    }

    /**
     * Checks host resolution for two operators joined by a {@code CONTAINER_LOCAL} stream.
     *
     * <p>The upstream operator carries {@code LOCALITY_HOST = "host2"}. Exactly one
     * container start request is expected, and it must resolve to host2.
     */
    @Test
    public void testContainerLocal() {
        LogicalPlan dag = new LogicalPlan();
        String appPath = new File("target", HostLocalTest.class.getName()).getAbsolutePath();
        dag.getAttributes().put(Context.DAGContext.APPLICATION_PATH, appPath);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator upstream = dag.addOperator("o1", GenericTestOperator.class);
        dag.getMeta(upstream).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host2");
        GenericTestOperator downstream = dag.addOperator("partitioned", GenericTestOperator.class);
        dag.addStream("o1_outport1", upstream.outport1, downstream.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        dag.setAttribute(upstream, Context.OperatorContext.MEMORY_MB, 256);
        dag.setAttribute(downstream, Context.OperatorContext.MEMORY_MB, 256);

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        ResourceRequestHandler requestHandler = new ResourceRequestHandler();

        // Register host1 and host2 as running nodes with identical resources.
        HashMap<String, NodeReport> nodeReports = Maps.newHashMap();
        for (String hostName : new String[] {"host1", "host2"}) {
            NodeReport report = BuilderUtils.newNodeReport(BuilderUtils.newNodeId(hostName, 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
            nodeReports.put(report.getNodeId().getHost(), report);
        }
        requestHandler.updateNodeReports(Lists.newArrayList(nodeReports.values()));

        Assert.assertEquals("number of containers is 1", 1L, scm.containerStartRequests.size());
        for (Iterator<?> pending = scm.containerStartRequests.iterator(); pending.hasNext(); ) {
            StreamingContainerAgent.ContainerStartRequest request = (StreamingContainerAgent.ContainerStartRequest) pending.next();
            String assignedHost = requestHandler.getHost(request, true);
            // Record the decision on the container before checking it.
            request.container.host = assignedHost;
            Assert.assertEquals("Hosts set to host2", "host2", assignedHost);
        }
    }

    /**
     * Checks vcore accounting and host resolution for a {@code CONTAINER_LOCAL} pair.
     *
     * <p>Both operators request one vcore each and the upstream operator carries
     * {@code LOCALITY_HOST = "host2"}. Exactly one container start request is
     * expected; its container must require 2 vcores and resolve to host2.
     */
    @Test
    public void testContainerLocalWithVCores() {
        LogicalPlan dag = new LogicalPlan();
        String appPath = new File("target", HostLocalTest.class.getName()).getAbsolutePath();
        dag.getAttributes().put(Context.DAGContext.APPLICATION_PATH, appPath);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator upstream = dag.addOperator("o1", GenericTestOperator.class);
        dag.getMeta(upstream).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host2");
        GenericTestOperator downstream = dag.addOperator("partitioned", GenericTestOperator.class);
        dag.addStream("o1_outport1", upstream.outport1, downstream.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        dag.setAttribute(upstream, Context.OperatorContext.MEMORY_MB, 256);
        dag.setAttribute(upstream, Context.OperatorContext.VCORES, 1);
        dag.setAttribute(downstream, Context.OperatorContext.VCORES, 1);

        StreamingContainerManager scm = new StreamingContainerManager(dag);
        ResourceRequestHandler requestHandler = new ResourceRequestHandler();

        // Register host1 and host2 as running nodes with identical resources.
        HashMap<String, NodeReport> nodeReports = Maps.newHashMap();
        for (String hostName : new String[] {"host1", "host2"}) {
            NodeReport report = BuilderUtils.newNodeReport(BuilderUtils.newNodeId(hostName, 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
            nodeReports.put(report.getNodeId().getHost(), report);
        }
        requestHandler.updateNodeReports(Lists.newArrayList(nodeReports.values()));

        Assert.assertEquals("number of containers is 1", 1L, scm.containerStartRequests.size());
        for (Iterator<?> pending = scm.containerStartRequests.iterator(); pending.hasNext(); ) {
            StreamingContainerAgent.ContainerStartRequest request = (StreamingContainerAgent.ContainerStartRequest) pending.next();
            String assignedHost = requestHandler.getHost(request, true);
            // Record the decision on the container before checking it.
            request.container.host = assignedHost;
            Assert.assertEquals("number of vcores", 2L, request.container.getRequiredVCores());
            Assert.assertEquals("Hosts set to host2", "host2", assignedHost);
        }
    }

    /**
     * Checks that no host is returned when the container needs more vcores than the nodes report.
     *
     * <p>The two operators are joined by a {@code CONTAINER_LOCAL} stream and request
     * 2 and 1 vcores respectively, so the single expected container must require
     * 3 vcores. Both nodes are built with {@code BuilderUtils.newResource(1000 * 2, 2)}
     * (presumably 2 vcores each, assuming the {@code (memory, vCores)} argument
     * order), so host resolution is expected to yield {@code null}.
     */
    @Test
    public void testUnavailableResources() {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.getAttributes().put(Context.DAGContext.APPLICATION_PATH, new File("target", HostLocalTest.class.getName()).getAbsolutePath());
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator upstream = logicalPlan.addOperator("o1", GenericTestOperator.class);
        logicalPlan.getMeta(upstream).getAttributes().put(Context.OperatorContext.LOCALITY_HOST, "host2");
        GenericTestOperator downstream = logicalPlan.addOperator("partitioned", GenericTestOperator.class);
        logicalPlan.addStream("o1_outport1", upstream.outport1, downstream.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        logicalPlan.setAttribute(upstream, Context.OperatorContext.MEMORY_MB, 256);
        logicalPlan.setAttribute(upstream, Context.OperatorContext.VCORES, 2);
        logicalPlan.setAttribute(downstream, Context.OperatorContext.VCORES, 1);

        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(logicalPlan);
        ResourceRequestHandler resourceRequestHandler = new ResourceRequestHandler();

        // Register host1 and host2 as running nodes with identical resources.
        HashMap<String, NodeReport> nodeReports = Maps.newHashMap();
        NodeReport host1Report = BuilderUtils.newNodeReport(BuilderUtils.newNodeId("host1", 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
        nodeReports.put(host1Report.getNodeId().getHost(), host1Report);
        NodeReport host2Report = BuilderUtils.newNodeReport(BuilderUtils.newNodeId("host2", 0), NodeState.RUNNING, "httpAddress", "rackName", BuilderUtils.newResource(0, 0), BuilderUtils.newResource(1000 * 2, 2), 0, (String) null, 0L);
        nodeReports.put(host2Report.getNodeId().getHost(), host2Report);
        resourceRequestHandler.updateNodeReports(Lists.newArrayList(nodeReports.values()));

        Assert.assertEquals("number of containers is 1", 1L, streamingContainerManager.containerStartRequests.size());
        Iterator<?> it = streamingContainerManager.containerStartRequests.iterator();
        while (it.hasNext()) {
            // Keep the request in a local so the vcore assertion can reach its container;
            // the previous code referenced an undeclared identifier here.
            StreamingContainerAgent.ContainerStartRequest containerStartRequest = (StreamingContainerAgent.ContainerStartRequest) it.next();
            String host = resourceRequestHandler.getHost(containerStartRequest, true);
            Assert.assertEquals("number of vcores", 3L, containerStartRequest.container.getRequiredVCores());
            Assert.assertNull("Host is null", host);
        }
    }
}
