package org.apache.tajo.master.scheduler;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.ResourceProtos;
import org.apache.tajo.annotation.NotThreadSafe;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.rm.NodeEvent;
import org.apache.tajo.master.rm.NodeEventType;
import org.apache.tajo.master.rm.NodeStatus;
import org.apache.tajo.master.rm.TajoRMContext;
import org.apache.tajo.master.rm.TajoResourceManager;
import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent;
import org.apache.tajo.master.scheduler.event.SchedulerEvent;
import org.apache.tajo.master.scheduler.event.SchedulerEventType;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.resource.NodeResources;
import org.apache.tajo.rpc.CallFuture;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

@NotThreadSafe
/* loaded from: input_file:org/apache/tajo/master/scheduler/TestSimpleScheduler.class */
public class TestSimpleScheduler {
    private CompositeService service;
    private SimpleScheduler scheduler;
    private TajoRMContext rmContext;
    private AsyncDispatcher dispatcher;
    private TajoConf conf;
    private NodeResource nodeResource;
    private NodeResource totalResource;
    private Semaphore barrier;
    private static ScheduledExecutorService executorService;
    private int workerNum = 3;
    private int testDelay = 50;

    /* loaded from: input_file:org/apache/tajo/master/scheduler/TestSimpleScheduler$MySimpleScheduler.class */
    class MySimpleScheduler extends SimpleScheduler {
        Semaphore barrier;
        Map<QueryId, QueryInfo> queryInfoMap;
        Map<QueryId, ResourceProtos.AllocationResourceProto> qmAllocationMap;

        public MySimpleScheduler(TajoRMContext tajoRMContext, Semaphore semaphore) {
            super((TajoMaster.MasterContext) null, tajoRMContext);
            this.queryInfoMap = Maps.newHashMap();
            this.qmAllocationMap = Maps.newHashMap();
            this.barrier = semaphore;
        }

        public void submitQuery(QuerySchedulingInfo querySchedulingInfo) {
            this.queryInfoMap.put(querySchedulingInfo.getQueryId(), new QueryInfo(querySchedulingInfo.getQueryId()) { // from class: org.apache.tajo.master.scheduler.TestSimpleScheduler.MySimpleScheduler.1
                QueryContext context;

                public QueryContext getQueryContext() {
                    if (this.context == null) {
                        this.context = new QueryContext(TestSimpleScheduler.this.conf);
                        this.context.setUser("user");
                    }
                    return this.context;
                }
            });
            super.submitQuery(querySchedulingInfo);
        }

        protected boolean startQuery(final QueryId queryId, final ResourceProtos.AllocationResourceProto allocationResourceProto) {
            TestSimpleScheduler.executorService.schedule(new Runnable() { // from class: org.apache.tajo.master.scheduler.TestSimpleScheduler.MySimpleScheduler.2
                @Override // java.lang.Runnable
                public void run() {
                    MySimpleScheduler.this.barrier.release();
                    MySimpleScheduler.this.qmAllocationMap.put(queryId, allocationResourceProto);
                    TestSimpleScheduler.this.rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
                }
            }, TestSimpleScheduler.this.testDelay, TimeUnit.MILLISECONDS);
            return true;
        }

        public void handle(SchedulerEvent schedulerEvent) {
            super.handle(schedulerEvent);
            this.barrier.release();
        }

        protected QueryInfo getQueryInfo(QueryId queryId) {
            return this.queryInfoMap.get(queryId);
        }

        public void stopQuery(QueryId queryId) {
            this.queryInfoMap.remove(queryId);
            ResourceProtos.AllocationResourceProto remove = this.qmAllocationMap.remove(queryId);
            NodeResources.addTo(((NodeStatus) TestSimpleScheduler.this.rmContext.getNodes().get(Integer.valueOf(remove.getWorkerId()))).getAvailableResource(), new NodeResource(remove.getResource()));
            super.stopQuery(queryId);
        }
    }

    @BeforeClass
    public static void setupClass() {
        executorService = Executors.newScheduledThreadPool(10);
    }

    @AfterClass
    public static void tearDownClass() {
        executorService.shutdown();
    }

    @Before
    public void setup() {
        this.conf = new TajoConf();
        this.nodeResource = NodeResource.createResource(1500, 2, 3);
        this.service = new CompositeService(TestSimpleScheduler.class.getSimpleName()) { // from class: org.apache.tajo.master.scheduler.TestSimpleScheduler.1
            protected void serviceInit(Configuration configuration) throws Exception {
                TestSimpleScheduler.this.dispatcher = new AsyncDispatcher();
                addService(TestSimpleScheduler.this.dispatcher);
                TestSimpleScheduler.this.rmContext = new TajoRMContext(TestSimpleScheduler.this.dispatcher);
                TestSimpleScheduler.this.rmContext.getDispatcher().register(NodeEventType.class, new TajoResourceManager.WorkerEventDispatcher(TestSimpleScheduler.this.rmContext));
                TestSimpleScheduler.this.barrier = new Semaphore(0);
                TestSimpleScheduler.this.scheduler = new MySimpleScheduler(TestSimpleScheduler.this.rmContext, TestSimpleScheduler.this.barrier);
                addService(TestSimpleScheduler.this.scheduler);
                TestSimpleScheduler.this.rmContext.getDispatcher().register(SchedulerEventType.class, TestSimpleScheduler.this.scheduler);
                for (int i = 0; i < TestSimpleScheduler.this.workerNum; i++) {
                    WorkerConnectionInfo workerConnectionInfo = new WorkerConnectionInfo("host" + i, 28091 + i, 28092, 21000, 28093, 28080);
                    TestSimpleScheduler.this.rmContext.getNodes().putIfAbsent(Integer.valueOf(workerConnectionInfo.getId()), new NodeStatus(TestSimpleScheduler.this.rmContext, NodeResources.clone(TestSimpleScheduler.this.nodeResource), workerConnectionInfo));
                    TestSimpleScheduler.this.rmContext.getDispatcher().getEventHandler().handle(new NodeEvent(workerConnectionInfo.getId(), NodeEventType.STARTED));
                }
                super.serviceInit(configuration);
            }
        };
        this.service.init(this.conf);
        this.service.start();
        Assert.assertEquals(this.workerNum, this.rmContext.getNodes().size());
        this.totalResource = NodeResources.createResource(0);
        Iterator it = this.rmContext.getNodes().values().iterator();
        while (it.hasNext()) {
            NodeResources.addTo(this.totalResource, ((NodeStatus) it.next()).getTotalResourceCapability());
        }
    }

    @After
    public void tearDown() {
        this.service.stop();
    }

    @Test
    public void testInitialCapacity() throws InterruptedException {
        Assert.assertEquals(this.workerNum, this.scheduler.getNumClusterNodes());
        Assert.assertEquals(0L, this.scheduler.getRunningQuery());
        Assert.assertEquals(this.totalResource, this.scheduler.getMaximumResourceCapability());
        Assert.assertEquals(this.totalResource, this.scheduler.getClusterResource());
        Assert.assertEquals(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY.defaultIntVal, this.scheduler.getQMMinimumResourceCapability().getMemory());
        Assert.assertEquals(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY.defaultIntVal, this.scheduler.getMinimumResourceCapability().getMemory());
    }

    @Test(timeout = 10000)
    public void testSubmitOneQuery() throws InterruptedException {
        QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo("default", "user", QueryIdFactory.newQueryId(System.nanoTime(), 0), 1, System.currentTimeMillis());
        Assert.assertEquals(0L, this.scheduler.getRunningQuery());
        this.scheduler.submitQuery(querySchedulingInfo);
        this.barrier.acquire();
        Assert.assertEquals(1L, this.scheduler.getRunningQuery());
        Assert.assertEquals(this.totalResource, this.scheduler.getMaximumResourceCapability());
        Assert.assertEquals(this.totalResource, NodeResources.add(this.scheduler.getQMMinimumResourceCapability(), this.scheduler.getClusterResource()));
    }

    @Test(timeout = 10000)
    public void testMaximumSubmitQuery() throws InterruptedException {
        Assert.assertEquals(0L, this.scheduler.getRunningQuery());
        int computeAvailableContainers = this.scheduler.getResourceCalculator().computeAvailableContainers(this.scheduler.getMaximumResourceCapability(), this.scheduler.getQMMinimumResourceCapability());
        for (int i = 0; i < 10; i++) {
            this.scheduler.submitQuery(new QuerySchedulingInfo("default", "user", QueryIdFactory.newQueryId(System.nanoTime(), 0), 1, System.currentTimeMillis()));
        }
        this.barrier.acquire();
        Assert.assertEquals(Math.floor(computeAvailableContainers * 0.5f), this.scheduler.getRunningQuery(), 1.0d);
        Assert.assertEquals(10, this.scheduler.getRunningQuery() + this.scheduler.getQueryQueue().size());
    }

    @Test(timeout = 10000)
    public void testReserveResource() throws InterruptedException, ExecutionException {
        Assert.assertEquals(this.totalResource, this.scheduler.getMaximumResourceCapability());
        Assert.assertEquals(this.totalResource, this.scheduler.getClusterResource());
        QueryId newQueryId = QueryIdFactory.newQueryId(System.nanoTime(), 0);
        CallFuture callFuture = new CallFuture();
        this.rmContext.getDispatcher().getEventHandler().handle(new ResourceReserveSchedulerEvent(createResourceRequest(newQueryId, 3, new ArrayList()), callFuture));
        ResourceProtos.NodeResourceResponse nodeResourceResponse = (ResourceProtos.NodeResourceResponse) callFuture.get();
        Assert.assertEquals(newQueryId, new QueryId(nodeResourceResponse.getQueryId()));
        Assert.assertEquals(3, nodeResourceResponse.getResourceCount());
        NodeResource createResource = NodeResources.createResource(0);
        Iterator it = nodeResourceResponse.getResourceList().iterator();
        while (it.hasNext()) {
            NodeResources.addTo(createResource, new NodeResource(((ResourceProtos.AllocationResourceProto) it.next()).getResource()));
        }
        Assert.assertEquals(NodeResources.subtract(this.totalResource, createResource), this.scheduler.getClusterResource());
    }

    @Test(timeout = 10000)
    public void testReserveResourceWithWorkerPriority() throws InterruptedException, ExecutionException {
        Assert.assertEquals(this.totalResource, this.scheduler.getMaximumResourceCapability());
        Assert.assertEquals(this.totalResource, this.scheduler.getClusterResource());
        ArrayList newArrayList = Lists.newArrayList();
        Map.Entry entry = (Map.Entry) this.rmContext.getNodes().entrySet().iterator().next();
        newArrayList.add(entry.getKey());
        Assert.assertTrue(NodeResources.fitsIn(NodeResources.multiply(this.scheduler.getMinimumResourceCapability(), 2), ((NodeStatus) entry.getValue()).getAvailableResource()));
        QueryId newQueryId = QueryIdFactory.newQueryId(System.nanoTime(), 0);
        ResourceProtos.NodeResourceRequest createResourceRequest = createResourceRequest(newQueryId, 2, newArrayList);
        CallFuture callFuture = new CallFuture();
        this.rmContext.getDispatcher().getEventHandler().handle(new ResourceReserveSchedulerEvent(createResourceRequest, callFuture));
        ResourceProtos.NodeResourceResponse nodeResourceResponse = (ResourceProtos.NodeResourceResponse) callFuture.get();
        Assert.assertEquals(newQueryId, new QueryId(nodeResourceResponse.getQueryId()));
        Assert.assertEquals(2, nodeResourceResponse.getResourceCount());
        Iterator it = nodeResourceResponse.getResourceList().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(((Integer) entry.getKey()).intValue(), ((ResourceProtos.AllocationResourceProto) it.next()).getWorkerId());
        }
    }

    private ResourceProtos.NodeResourceRequest createResourceRequest(QueryId queryId, int i, List<Integer> list) {
        ResourceProtos.NodeResourceRequest.Builder newBuilder = ResourceProtos.NodeResourceRequest.newBuilder();
        newBuilder.setCapacity(this.scheduler.getMinimumResourceCapability().getProto()).setNumContainers(i).setPriority(1).setQueryId(queryId.getProto()).setType(ResourceProtos.ResourceType.LEAF).setUserId("test user").setRunningTasks(0).addAllCandidateNodes(list).setQueue("default");
        return newBuilder.build();
    }
}
