package org.apache.tajo.worker;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ResourceProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TajoWorker;
import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
import org.apache.tajo.worker.event.NodeResourceEvent;
import org.apache.tajo.worker.event.TaskStartEvent;

/* loaded from: input_file:org/apache/tajo/worker/TaskExecutor.class */
public class TaskExecutor extends AbstractService implements EventHandler<TaskStartEvent> {
    private static final Log LOG = LogFactory.getLog(TaskExecutor.class);
    private final TajoWorker.WorkerContext workerContext;
    private final Map<TaskAttemptId, NodeResource> allocatedResourceMap;
    private final BlockingQueue<Task> taskQueue;
    private final AtomicInteger runningTasks;
    private List<ExecutorService> fetcherThreadPoolList;
    private ExecutorService threadPool;
    private TajoConf tajoConf;
    private volatile boolean isStopped;

    public TaskExecutor(TajoWorker.WorkerContext workerContext) {
        super(TaskExecutor.class.getName());
        this.workerContext = workerContext;
        this.allocatedResourceMap = Maps.newConcurrentMap();
        this.runningTasks = new AtomicInteger();
        this.taskQueue = new LinkedBlockingQueue();
        this.fetcherThreadPoolList = Lists.newArrayList();
    }

    protected void serviceInit(Configuration configuration) throws Exception {
        this.tajoConf = (TajoConf) TUtil.checkTypeAndGet(configuration, TajoConf.class);
        super.serviceInit(configuration);
    }

    protected void serviceStart() throws Exception {
        int intVar = this.tajoConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
        this.threadPool = Executors.newFixedThreadPool(intVar, new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build());
        int intVar2 = this.tajoConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
        for (int i = 0; i < intVar; i++) {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(intVar2, new ThreadFactoryBuilder().setNameFormat("TaskContainer[" + i + "] fetcher executor #%d").build());
            this.threadPool.submit(new TaskContainer(i, this, newFixedThreadPool));
            this.fetcherThreadPoolList.add(newFixedThreadPool);
        }
        super.serviceStart();
        LOG.info("Started TaskExecutor[" + intVar + "], Fetcher executor[" + intVar2 + "]");
    }

    protected void serviceStop() throws Exception {
        this.isStopped = true;
        this.threadPool.shutdown();
        Iterator<ExecutorService> it = this.fetcherThreadPoolList.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        super.serviceStop();
    }

    public boolean isStopped() {
        return this.isStopped;
    }

    public int getRunningTasks() {
        return this.runningTasks.get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Task getNextTask() {
        Task task = null;
        try {
            task = this.taskQueue.take();
        } catch (InterruptedException e) {
            LOG.fatal(e);
        }
        return task;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stopTask(TaskAttemptId taskAttemptId) {
        this.runningTasks.decrementAndGet();
        releaseResource(taskAttemptId);
    }

    protected void releaseResource(TaskAttemptId taskAttemptId) {
        NodeResource remove = this.allocatedResourceMap.remove(taskAttemptId);
        if (remove != null) {
            releaseResource(remove);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Task resource " + taskAttemptId + " is released. (" + remove + ")");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void releaseResource(NodeResource nodeResource) {
        this.workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle(new NodeResourceDeallocateEvent(nodeResource, NodeResourceEvent.ResourceType.TASK));
    }

    protected Task createTask(ExecutionBlockContext executionBlockContext, ResourceProtos.TaskRequestProto taskRequestProto) throws IOException {
        TaskImpl taskImpl = null;
        TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequestProto.getId());
        if (executionBlockContext.getTasks().containsKey(taskAttemptId)) {
            String str = "Duplicate Task Attempt: " + taskAttemptId;
            LOG.error(str);
            executionBlockContext.fatalError(taskAttemptId, str);
        } else {
            taskImpl = new TaskImpl(new TaskRequestImpl(taskRequestProto), executionBlockContext);
            executionBlockContext.getTasks().put(taskImpl.getTaskContext().getTaskId(), taskImpl);
        }
        return taskImpl;
    }

    public void handle(TaskStartEvent taskStartEvent) {
        this.allocatedResourceMap.put(taskStartEvent.getTaskAttemptId(), taskStartEvent.getAllocatedResource());
        try {
            Task createTask = createTask(this.workerContext.getTaskManager().getExecutionBlockContext(taskStartEvent.getTaskAttemptId().getTaskId().getExecutionBlockId()), taskStartEvent.getTaskRequest());
            if (createTask != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Arrival task: " + createTask.getTaskContext().getTaskId() + ", allocated resource: " + taskStartEvent.getAllocatedResource());
                }
                this.taskQueue.put(createTask);
                this.runningTasks.incrementAndGet();
            } else {
                LOG.warn("Release duplicate task resource: " + taskStartEvent.getAllocatedResource());
                stopTask(taskStartEvent.getTaskAttemptId());
            }
        } catch (InterruptedException e) {
            if (this.isStopped) {
                return;
            }
            LOG.fatal(e.getMessage(), e);
        } catch (Exception e2) {
            stopTask(taskStartEvent.getTaskAttemptId());
        }
    }
}
