/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerExecService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Manager loop on the worker side: callers enqueue {@link TaskExecuteThread}
 * instances through {@link #offer(TaskExecuteThread)}, and a single daemon
 * thread (see {@link #start()}) takes them off a {@link DelayQueue} and
 * submits each one to the {@link WorkerExecService}.
 */
@Component
public class WorkerManagerThread
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);

    /** Tasks waiting to be handed to the execution service; an element only becomes takeable once its delay has expired. */
    private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();

    /** Optional storage backend; stays {@code null} when no such bean is available (injection is not required). */
    @Autowired(required=false)
    private StorageOperate storageOperate;

    private final WorkerExecService workerExecService;

    @Autowired
    private TaskCallbackService taskCallbackService;

    /**
     * Task threads keyed by task instance id. The same map instance is handed to
     * {@link WorkerExecService}; this class only reads from it.
     * NOTE(review): presumably WorkerExecService adds/removes entries - confirm there.
     */
    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();

    /**
     * Builds the execution service on top of an executor obtained from
     * {@code ThreadUtils.newDaemonFixedThreadExecutor}, named "Worker-Execute-Thread"
     * and sized by {@code workerConfig.getExecThreads()}.
     *
     * @param workerConfig worker configuration supplying the executor thread count
     */
    public WorkerManagerThread(WorkerConfig workerConfig) {
        this.workerExecService = new WorkerExecService(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()), this.taskExecuteThreadMap);
    }

    /**
     * Looks up the task thread registered for a task instance.
     *
     * @param taskInstanceId task instance id used as the map key
     * @return the registered task thread, or {@code null} if the map has no entry for that id
     */
    public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
        return this.taskExecuteThreadMap.get(taskInstanceId);
    }

    /**
     * @return number of tasks currently held in the delay queue (expired or not)
     */
    public int getDelayQueueSize() {
        return this.workerExecuteQueue.size();
    }

    /**
     * @return the queue size reported by the underlying {@link WorkerExecService}
     */
    public int getThreadPoolQueueSize() {
        return this.workerExecService.getThreadPoolQueueSize();
    }

    /**
     * Removes every still-queued task whose execution context carries the given
     * task instance id, then sends a kill response for that id. The response is
     * attempted whether or not anything was actually removed from the queue.
     *
     * @param taskInstanceId id of the task instance to kill before it starts executing
     */
    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
        // removeIf deletes each match through the queue's own iterator, so exactly the matched elements are dropped.
        this.workerExecuteQueue.removeIf(queuedTask -> queuedTask.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId.intValue());
        this.sendTaskKillResponse(taskInstanceId);
    }

    /**
     * Marks the cached execution context of the task instance as
     * {@link ExecutionStatus#KILL} and sends it through the callback service.
     * Does nothing when the cache holds no context for the id.
     *
     * @param taskInstanceId id of the task instance whose kill result is reported
     */
    private void sendTaskKillResponse(Integer taskInstanceId) {
        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
        if (taskExecutionContext == null) {
            return;
        }
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
        this.taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
    }

    /**
     * Enqueues a task for later submission by the manager loop.
     *
     * @param taskExecuteThread task to enqueue
     * @return the result of {@link DelayQueue#offer(java.util.concurrent.Delayed)}; the queue is unbounded, so this is {@code true}
     */
    public boolean offer(TaskExecuteThread taskExecuteThread) {
        return this.workerExecuteQueue.offer(taskExecuteThread);
    }

    /**
     * Starts the manager loop on a new daemon thread. The thread is created with
     * this class's name; {@link #run()} renames it once it is running.
     */
    public void start() {
        Thread thread = new Thread(this, this.getClass().getName());
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Manager loop: while {@code Stopper.isRunning()} holds, blocks until a queued
     * task's delay has expired, gives it the (possibly {@code null}) storage
     * backend and submits it to the execution service.
     */
    @Override
    public void run() {
        Thread.currentThread().setName("Worker-Execute-Manager-Thread");
        while (Stopper.isRunning()) {
            try {
                TaskExecuteThread taskExecuteThread = this.workerExecuteQueue.take();
                taskExecuteThread.setStorageOperate(this.storageOperate);
                this.workerExecService.submit(taskExecuteThread);
            }
            catch (Exception e) {
                // Deliberate catch-all (including InterruptedException from take()):
                // a single failure must not terminate the loop, so it is logged and the loop continues.
                logger.error("An unexpected interrupt is happened, the exception will be ignored and this thread will continue to run", e);
            }
        }
    }
}

