package org.apache.tajo.worker;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.ResourceProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.worker.TajoWorker;
import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
import org.apache.tajo.worker.event.NodeResourceEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/tajo/worker/TestNodeResourceManager.class */
public class TestNodeResourceManager {

    @Rule
    public TestName name = new TestName();
    private MockNodeResourceManager resourceManager;
    private NodeStatusUpdater statusUpdater;
    private TaskManager taskManager;
    private TaskExecutor taskExecutor;
    private AsyncDispatcher dispatcher;
    private AsyncDispatcher taskDispatcher;
    private TajoWorker.WorkerContext workerContext;
    private CompositeService service;
    private int taskMemory;
    private TajoConf conf;

    private static boolean enableEbCreateFailure(String str) {
        return str.equals("testResourceDeallocateWithEbCreateFailure");
    }

    @Before
    public void setup() {
        this.conf = new TajoConf();
        this.conf.setBoolVar(TajoConf.ConfVars.$TEST_MODE, true);
        this.taskMemory = 512;
        this.conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
        this.conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, this.taskMemory * this.conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
        this.conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
        this.conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
        this.conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
        this.dispatcher = new AsyncDispatcher();
        this.taskDispatcher = new AsyncDispatcher();
        this.workerContext = new MockWorkerContext() { // from class: org.apache.tajo.worker.TestNodeResourceManager.1
            WorkerConnectionInfo workerConnectionInfo;

            @Override // org.apache.tajo.worker.MockWorkerContext
            public TajoConf getConf() {
                return TestNodeResourceManager.this.conf;
            }

            public TaskManager getTaskManager() {
                return TestNodeResourceManager.this.taskManager;
            }

            public TaskExecutor getTaskExecuor() {
                return TestNodeResourceManager.this.taskExecutor;
            }

            public NodeResourceManager getNodeResourceManager() {
                return TestNodeResourceManager.this.resourceManager;
            }

            @Override // org.apache.tajo.worker.MockWorkerContext
            public WorkerConnectionInfo getConnectionInfo() {
                if (this.workerConnectionInfo == null) {
                    this.workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
                }
                return this.workerConnectionInfo;
            }
        };
        this.taskManager = new MockTaskManager(new Semaphore(0), this.taskDispatcher, this.workerContext);
        this.taskExecutor = new MockTaskExecutor(new Semaphore(0), this.workerContext);
        this.resourceManager = new MockNodeResourceManager(new Semaphore(0), this.dispatcher, this.workerContext);
        this.statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), this.workerContext);
        if (enableEbCreateFailure(this.name.getMethodName())) {
            ((MockTaskManager) this.taskManager).enableEbCreateFailure();
        }
        this.service = new CompositeService("MockService") { // from class: org.apache.tajo.worker.TestNodeResourceManager.2
            protected void serviceInit(Configuration configuration) throws Exception {
                addIfService(TestNodeResourceManager.this.dispatcher);
                addIfService(TestNodeResourceManager.this.taskDispatcher);
                addIfService(TestNodeResourceManager.this.taskManager);
                addIfService(TestNodeResourceManager.this.taskExecutor);
                addIfService(TestNodeResourceManager.this.resourceManager);
                addIfService(TestNodeResourceManager.this.statusUpdater);
                super.serviceInit(configuration);
            }

            protected void serviceStop() throws Exception {
                TestNodeResourceManager.this.workerContext.getMetrics().stop();
                super.serviceStop();
            }
        };
        this.service.init(this.conf);
        this.service.start();
    }

    @After
    public void tearDown() {
        this.service.stop();
    }

    @Test
    public void testNodeResourceAllocateEvent() throws Exception {
        this.resourceManager.setTaskHandlerEvent(false);
        CallFuture callFuture = new CallFuture();
        ResourceProtos.BatchAllocationRequest.Builder newBuilder = ResourceProtos.BatchAllocationRequest.newBuilder();
        ExecutionBlockId executionBlockId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
        newBuilder.setExecutionBlockId(executionBlockId.getProto());
        Assert.assertEquals(this.resourceManager.getTotalResource(), this.resourceManager.getAvailableResource());
        newBuilder.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(executionBlockId, this.taskMemory, 4));
        this.dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(newBuilder.build(), callFuture));
        ResourceProtos.BatchAllocationResponse batchAllocationResponse = (ResourceProtos.BatchAllocationResponse) callFuture.get();
        Assert.assertNotEquals(this.resourceManager.getTotalResource(), this.resourceManager.getAvailableResource());
        Assert.assertEquals(0L, batchAllocationResponse.getCancellationTaskCount());
    }

    @Test
    public void testNodeResourceCancellation() throws Exception {
        int intVar = this.conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
        this.resourceManager.setTaskHandlerEvent(false);
        CallFuture callFuture = new CallFuture();
        ResourceProtos.BatchAllocationRequest.Builder newBuilder = ResourceProtos.BatchAllocationRequest.newBuilder();
        ExecutionBlockId executionBlockId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
        newBuilder.setExecutionBlockId(executionBlockId.getProto());
        Assert.assertEquals(this.resourceManager.getTotalResource(), this.resourceManager.getAvailableResource());
        newBuilder.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(executionBlockId, this.taskMemory, intVar + 10));
        this.dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(newBuilder.build(), callFuture));
        Assert.assertEquals(10, ((ResourceProtos.BatchAllocationResponse) callFuture.get()).getCancellationTaskCount());
    }

    @Test
    public void testNodeResourceDeallocateEvent() throws Exception {
        this.resourceManager.setTaskHandlerEvent(false);
        CallFuture callFuture = new CallFuture();
        ResourceProtos.BatchAllocationRequest.Builder newBuilder = ResourceProtos.BatchAllocationRequest.newBuilder();
        ExecutionBlockId executionBlockId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
        newBuilder.setExecutionBlockId(executionBlockId.getProto());
        Assert.assertEquals(this.resourceManager.getTotalResource(), this.resourceManager.getAvailableResource());
        newBuilder.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(executionBlockId, this.taskMemory, 4));
        this.dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(newBuilder.build(), callFuture));
        ResourceProtos.BatchAllocationResponse batchAllocationResponse = (ResourceProtos.BatchAllocationResponse) callFuture.get();
        Assert.assertNotEquals(this.resourceManager.getTotalResource(), this.resourceManager.getAvailableResource());
        Assert.assertEquals(0L, batchAllocationResponse.getCancellationTaskCount());
        Iterator it = newBuilder.getTaskRequestList().iterator();
        while (it.hasNext()) {
            this.resourceManager.handle((NodeResourceEvent) new NodeResourceDeallocateEvent(((ResourceProtos.TaskAllocationProto) it.next()).getResource(), NodeResourceEvent.ResourceType.TASK));
        }
        Assert.assertEquals(this.resourceManager.getTotalResource(), this.resourceManager.getAvailableResource());
    }

    @Test(timeout = 30000)
    public void testParallelRequest() throws Exception {
        int intVar = this.conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
        this.resourceManager.setTaskHandlerEvent(true);
        final AtomicInteger atomicInteger = new AtomicInteger();
        final AtomicInteger atomicInteger2 = new AtomicInteger();
        final ExecutionBlockId executionBlockId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
        final Queue<ResourceProtos.TaskAllocationProto> createTaskRequests = MockNodeResourceManager.createTaskRequests(executionBlockId, this.taskMemory, 100000);
        ResourceProtos.TaskAllocationProto poll = createTaskRequests.poll();
        ResourceProtos.BatchAllocationRequest.Builder newBuilder = ResourceProtos.BatchAllocationRequest.newBuilder();
        newBuilder.addTaskRequest(poll);
        newBuilder.setExecutionBlockId(executionBlockId.getProto());
        CallFuture callFuture = new CallFuture();
        this.dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(newBuilder.build(), callFuture));
        Assert.assertTrue(((ResourceProtos.BatchAllocationResponse) callFuture.get()).getCancellationTaskCount() == 0);
        atomicInteger.incrementAndGet();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(intVar);
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < intVar; i++) {
            newArrayList.add(newFixedThreadPool.submit(new Runnable() { // from class: org.apache.tajo.worker.TestNodeResourceManager.3
                @Override // java.lang.Runnable
                public void run() {
                    int i2 = 0;
                    while (true) {
                        ResourceProtos.TaskAllocationProto taskAllocationProto = (ResourceProtos.TaskAllocationProto) createTaskRequests.poll();
                        if (taskAllocationProto == null) {
                            atomicInteger.addAndGet(i2);
                            return;
                        }
                        ResourceProtos.BatchAllocationRequest.Builder newBuilder2 = ResourceProtos.BatchAllocationRequest.newBuilder();
                        newBuilder2.addTaskRequest(taskAllocationProto);
                        newBuilder2.setExecutionBlockId(executionBlockId.getProto());
                        CallFuture callFuture2 = new CallFuture();
                        TestNodeResourceManager.this.dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(newBuilder2.build(), callFuture2));
                        try {
                            ResourceProtos.BatchAllocationResponse batchAllocationResponse = (ResourceProtos.BatchAllocationResponse) callFuture2.get();
                            if (batchAllocationResponse.getCancellationTaskCount() > 0) {
                                createTaskRequests.addAll(batchAllocationResponse.getCancellationTaskList());
                                atomicInteger2.addAndGet(batchAllocationResponse.getCancellationTaskCount());
                            } else {
                                i2++;
                            }
                        } catch (Exception e) {
                            Assert.fail(e.getMessage());
                        }
                    }
                }
            }));
        }
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        newFixedThreadPool.shutdown();
        Assert.assertEquals(100000L, atomicInteger.get());
    }

    @Test
    public void testResourceDeallocateWithEbCreateFailure() throws Exception {
        this.resourceManager.setTaskHandlerEvent(true);
        ExecutionBlockId executionBlockId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
        ResourceProtos.TaskAllocationProto poll = MockNodeResourceManager.createTaskRequests(executionBlockId, this.taskMemory, 10).poll();
        ResourceProtos.BatchAllocationRequest.Builder newBuilder = ResourceProtos.BatchAllocationRequest.newBuilder();
        newBuilder.addTaskRequest(poll);
        newBuilder.setExecutionBlockId(executionBlockId.getProto());
        CallFuture callFuture = new CallFuture();
        this.dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(newBuilder.build(), callFuture));
        Assert.assertTrue(((ResourceProtos.BatchAllocationResponse) callFuture.get()).getCancellationTaskCount() == 0);
        Thread.sleep(2000L);
        Assert.assertEquals(this.resourceManager.getTotalResource(), this.resourceManager.getAvailableResource());
    }
}
