package org.apache.tajo.worker;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.tajo.ResourceProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.worker.MockNodeStatusUpdater;
import org.apache.tajo.worker.TajoWorker;
import org.apache.tajo.worker.event.NodeStatusEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/tajo/worker/TestNodeStatusUpdater.class */
public class TestNodeStatusUpdater {
    private NodeResourceManager resourceManager;
    private MockNodeStatusUpdater statusUpdater;
    private MockTaskManager taskManager;
    private AsyncDispatcher dispatcher;
    private AsyncDispatcher taskDispatcher;
    private CompositeService service;
    private TajoConf conf;
    private TajoWorker.WorkerContext workerContext;

    @Before
    public void setup() {
        this.conf = new TajoConf();
        this.conf.setBoolVar(TajoConf.ConfVars.$TEST_MODE, true);
        this.conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 2);
        this.conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
        this.workerContext = new MockWorkerContext() { // from class: org.apache.tajo.worker.TestNodeStatusUpdater.1
            WorkerConnectionInfo workerConnectionInfo;

            @Override // org.apache.tajo.worker.MockWorkerContext
            public TajoConf getConf() {
                return TestNodeStatusUpdater.this.conf;
            }

            public TaskManager getTaskManager() {
                return TestNodeStatusUpdater.this.taskManager;
            }

            public TaskExecutor getTaskExecuor() {
                return null;
            }

            public NodeResourceManager getNodeResourceManager() {
                return TestNodeStatusUpdater.this.resourceManager;
            }

            @Override // org.apache.tajo.worker.MockWorkerContext
            public WorkerConnectionInfo getConnectionInfo() {
                if (this.workerConnectionInfo == null) {
                    this.workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
                }
                return this.workerConnectionInfo;
            }
        };
        this.conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL, 1000);
        this.dispatcher = new AsyncDispatcher();
        this.resourceManager = new NodeResourceManager(this.dispatcher, this.workerContext);
        this.taskDispatcher = new AsyncDispatcher();
        this.taskManager = new MockTaskManager(new Semaphore(0), this.taskDispatcher, this.workerContext) { // from class: org.apache.tajo.worker.TestNodeStatusUpdater.2
            public int getRunningTasks() {
                return 0;
            }
        };
        this.service = new CompositeService("MockService") { // from class: org.apache.tajo.worker.TestNodeStatusUpdater.3
            protected void serviceInit(Configuration configuration) throws Exception {
                addIfService(TestNodeStatusUpdater.this.dispatcher);
                addIfService(TestNodeStatusUpdater.this.taskDispatcher);
                addIfService(TestNodeStatusUpdater.this.taskManager);
                addIfService(TestNodeStatusUpdater.this.resourceManager);
                addIfService(TestNodeStatusUpdater.this.statusUpdater);
                super.serviceInit(configuration);
            }

            protected void serviceStop() throws Exception {
                TestNodeStatusUpdater.this.workerContext.getMetrics().stop();
                super.serviceStop();
            }
        };
        this.service.init(this.conf);
        this.service.start();
    }

    @After
    public void tearDown() {
        this.service.stop();
    }

    @Test(timeout = 20000)
    public void testNodeMembership() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.statusUpdater = new MockNodeStatusUpdater(countDownLatch, this.workerContext);
        this.statusUpdater.init(this.conf);
        this.statusUpdater.start();
        MockNodeStatusUpdater.MockResourceTracker resourceTracker = this.statusUpdater.getResourceTracker();
        countDownLatch.await();
        Assert.assertTrue(resourceTracker.getTotalResource().containsKey(Integer.valueOf(this.workerContext.getConnectionInfo().getId())));
        Assert.assertEquals(this.resourceManager.getTotalResource(), resourceTracker.getTotalResource().get(Integer.valueOf(this.workerContext.getConnectionInfo().getId())));
        Assert.assertEquals(this.resourceManager.getAvailableResource(), resourceTracker.getAvailableResource().get(Integer.valueOf(this.workerContext.getConnectionInfo().getId())));
    }

    @Test(timeout = 20000)
    public void testPing() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.statusUpdater = new MockNodeStatusUpdater(countDownLatch, this.workerContext);
        this.statusUpdater.init(this.conf);
        this.statusUpdater.start();
        MockNodeStatusUpdater.MockResourceTracker resourceTracker = this.statusUpdater.getResourceTracker();
        countDownLatch.await();
        ResourceProtos.NodeHeartbeatRequest lastRequest = resourceTracker.getLastRequest();
        Assert.assertTrue(lastRequest.hasWorkerId());
        Assert.assertTrue(lastRequest.hasAvailableResource());
        Assert.assertTrue(lastRequest.hasRunningTasks());
        Assert.assertTrue(lastRequest.hasRunningQueryMasters());
        Assert.assertFalse(lastRequest.hasTotalResource());
        Assert.assertFalse(lastRequest.hasConnectionInfo());
    }

    @Test(timeout = 20000)
    public void testResourceReport() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.statusUpdater = new MockNodeStatusUpdater(countDownLatch, this.workerContext);
        this.statusUpdater.init(this.conf);
        this.statusUpdater.start();
        Assert.assertEquals(0L, this.statusUpdater.getQueueSize());
        for (int i = 0; i < this.statusUpdater.getQueueingThreshold(); i++) {
            this.dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
        }
        countDownLatch.await();
        Assert.assertEquals(0L, this.statusUpdater.getQueueSize());
    }

    @Test(timeout = 20000)
    public void testFlushResourceReport() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.statusUpdater = new MockNodeStatusUpdater(countDownLatch, this.workerContext);
        this.statusUpdater.init(this.conf);
        this.statusUpdater.start();
        Assert.assertEquals(0L, this.statusUpdater.getQueueSize());
        this.dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
        countDownLatch.await();
        Assert.assertEquals(0L, this.statusUpdater.getQueueSize());
    }
}
