package org.apache.tajo.querymaster;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.ResourceProtos;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.TpchTestBase;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
import org.apache.tajo.master.event.StageEvent;
import org.apache.tajo.master.event.StageEventType;
import org.apache.tajo.parser.sql.SQLAnalyzer;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.resource.NodeResources;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.worker.MockExecutionBlock;
import org.apache.tajo.worker.MockWorkerContext;
import org.apache.tajo.worker.NodeResourceManager;
import org.apache.tajo.worker.TajoWorker;
import org.apache.tajo.worker.TaskExecutor;
import org.apache.tajo.worker.TaskImpl;
import org.apache.tajo.worker.TaskManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/tajo/querymaster/TestKillQuery.class */
public class TestKillQuery {
    private static TajoTestingCluster cluster;
    private static TajoConf conf;
    private static TajoClient client;
    private static String queryStr = "select t1.l_orderkey, t1.l_partkey, t2.c_custkey from lineitem t1 join customer t2 on t1.l_orderkey = t2.c_custkey order by t1.l_orderkey";

    /* loaded from: input_file:org/apache/tajo/querymaster/TestKillQuery$MockAsyncDispatch.class */
    static class MockAsyncDispatch extends AsyncDispatcher {
        private CountDownLatch latch;
        private Enum eventType;

        MockAsyncDispatch(CountDownLatch countDownLatch, Enum r5) {
            this.latch = countDownLatch;
            this.eventType = r5;
        }

        protected void dispatch(Event event) {
            if (event.getType() == this.eventType) {
                this.latch.countDown();
            }
            super.dispatch(event);
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        cluster = new TajoTestingCluster();
        cluster.startMiniClusterInLocal(1);
        conf = cluster.getConfiguration();
        client = cluster.newTajoClient();
        client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) using text location '" + TpchTestBase.getInstance().getPath("lineitem") + "'");
        Assert.assertTrue(client.existTable("default.lineitem"));
        client.executeQueryAndGetResult("create external table default.customer (c_custkey int, c_name text) using text location '" + TpchTestBase.getInstance().getPath("customer") + "'");
        Assert.assertTrue(client.existTable("default.customer"));
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (client != null) {
            client.close();
        }
        if (cluster != null) {
            cluster.shutdownMiniCluster();
        }
    }

    @Test
    public final void testKillQueryFromInitState() throws Exception {
        SQLAnalyzer sQLAnalyzer = new SQLAnalyzer();
        QueryContext createDummyContext = LocalTajoTestingUtility.createDummyContext(conf);
        Session createDummySession = LocalTajoTestingUtility.createDummySession();
        CatalogService catalog = cluster.getMaster().getCatalog();
        LogicalPlanner logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
        LogicalOptimizer logicalOptimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
        Expr parse = sQLAnalyzer.parse(queryStr);
        LogicalPlan createPlan = logicalPlanner.createPlan(createDummyContext, parse);
        logicalOptimizer.optimize(createPlan);
        QueryId newQueryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
        QueryContext queryContext = new QueryContext(conf);
        new GlobalPlanner(conf, catalog).build(queryContext, new MasterPlan(newQueryId, queryContext, createPlan));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        QueryMasterTask queryMasterTask = new QueryMasterTask(((TajoWorker) cluster.getTajoWorkers().get(0)).getWorkerContext().getQueryMaster().getContext(), newQueryId, createDummySession, createDummyContext, parse.toJson(), NodeResources.createResource(512), new MockAsyncDispatch(countDownLatch, StageEventType.SQ_INIT));
        queryMasterTask.init(conf);
        queryMasterTask.getQueryTaskContext().getDispatcher().start();
        queryMasterTask.startQuery();
        try {
            countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Assert.fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
        }
        Stage stage = (Stage) queryMasterTask.getQuery().getStages().iterator().next();
        Assert.assertNotNull(stage);
        queryMasterTask.getEventHandler().handle(new QueryEvent(newQueryId, QueryEventType.KILL));
        try {
            try {
                cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
                Assert.assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
                queryMasterTask.stop();
            } catch (Throwable th) {
                queryMasterTask.stop();
                throw th;
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            if (stage != null) {
                System.err.println(String.format("Stage: [%s] (Total: %d, Complete: %d, Success: %d, Killed: %d, Failed: %d)", stage.getId().toString(), Integer.valueOf(stage.getTotalScheduledObjectsCount()), Integer.valueOf(stage.getCompletedTaskCount()), Integer.valueOf(stage.getSucceededObjectCount()), Integer.valueOf(stage.getKilledObjectCount()), Integer.valueOf(stage.getFailedObjectCount())));
            }
            throw e2;
        }
    }

    @Test
    public final void testIgnoreStageStateFromKilled() throws Exception {
        SQLAnalyzer sQLAnalyzer = new SQLAnalyzer();
        QueryContext createDummyContext = LocalTajoTestingUtility.createDummyContext(conf);
        Session createDummySession = LocalTajoTestingUtility.createDummySession();
        CatalogService catalog = cluster.getMaster().getCatalog();
        LogicalPlanner logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
        LogicalOptimizer logicalOptimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
        Expr parse = sQLAnalyzer.parse(queryStr);
        LogicalPlan createPlan = logicalPlanner.createPlan(createDummyContext, parse);
        logicalOptimizer.optimize(createPlan);
        QueryId newQueryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
        QueryContext queryContext = new QueryContext(conf);
        new GlobalPlanner(conf, catalog).build(queryContext, new MasterPlan(newQueryId, queryContext, createPlan));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        QueryMasterTask queryMasterTask = new QueryMasterTask(((TajoWorker) cluster.getTajoWorkers().get(0)).getWorkerContext().getQueryMaster().getContext(), newQueryId, createDummySession, createDummyContext, parse.toJson(), NodeResources.createResource(512), new MockAsyncDispatch(countDownLatch, TajoProtos.QueryState.QUERY_RUNNING));
        queryMasterTask.init(conf);
        queryMasterTask.getQueryTaskContext().getDispatcher().start();
        queryMasterTask.startQuery();
        try {
            countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Assert.fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
        }
        Assert.assertNotNull((Stage) queryMasterTask.getQuery().getStages().iterator().next());
        queryMasterTask.getEventHandler().handle(new QueryEvent(newQueryId, QueryEventType.KILL));
        try {
            cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
            Assert.assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
            queryMasterTask.stop();
            ArrayList newArrayList = Lists.newArrayList(queryMasterTask.getQuery().getStages());
            Stage stage = (Stage) newArrayList.get(newArrayList.size() - 1);
            Assert.assertEquals(StageState.KILLED, stage.getSynchronizedState());
            stage.getStateMachine().doTransition(StageEventType.SQ_START, new StageEvent(stage.getId(), StageEventType.SQ_START));
            stage.getStateMachine().doTransition(StageEventType.SQ_KILL, new StageEvent(stage.getId(), StageEventType.SQ_KILL));
            stage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT, new StageEvent(stage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
            stage.getStateMachine().doTransition(StageEventType.SQ_STAGE_COMPLETED, new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
            stage.getStateMachine().doTransition(StageEventType.SQ_FAILED, new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
        } catch (Throwable th) {
            queryMasterTask.stop();
            throw th;
        }
    }

    @Test
    public void testKillTask() throws Throwable {
        ExecutionBlockId newExecutionBlockId = QueryIdFactory.newExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 1);
        TaskId newTaskId = QueryIdFactory.newTaskId(newExecutionBlockId);
        final TajoConf tajoConf = new TajoConf();
        TaskRequestImpl taskRequestImpl = new TaskRequestImpl();
        taskRequestImpl.set(new TaskAttemptId(newTaskId, 1), new ArrayList(), (String) null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(tajoConf), (DataChannel) null, (Enforcer) null, new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080).getHostAndQMPort());
        taskRequestImpl.setInterQuery();
        ResourceProtos.ExecutionBlockContextResponse.Builder newBuilder = ResourceProtos.ExecutionBlockContextResponse.newBuilder();
        newBuilder.setExecutionBlockId(newExecutionBlockId.getProto()).setPlanJson("test").setQueryContext(new QueryContext(tajoConf).getProto()).setQueryOutputPath("testpath").setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
        TaskImpl taskImpl = new TaskImpl(taskRequestImpl, new MockExecutionBlock(new MockWorkerContext() { // from class: org.apache.tajo.querymaster.TestKillQuery.1
            @Override // org.apache.tajo.worker.MockWorkerContext
            public TajoConf getConf() {
                return tajoConf;
            }

            public TaskManager getTaskManager() {
                return null;
            }

            public TaskExecutor getTaskExecuor() {
                return null;
            }

            public NodeResourceManager getNodeResourceManager() {
                return null;
            }
        }, newBuilder.build()) { // from class: org.apache.tajo.querymaster.TestKillQuery.2
            public Path createBaseDir() throws IOException {
                return new Path("test");
            }
        });
        taskImpl.kill();
        Assert.assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, taskImpl.getTaskContext().getState());
        try {
            taskImpl.run();
            Assert.assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, taskImpl.getTaskContext().getState());
        } catch (Exception e) {
            Assert.assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, taskImpl.getTaskContext().getState());
        }
    }
}
