package org.apache.dolphinscheduler.server.worker.runner;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Stream;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.class */
public class WorkerManagerThread implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
    private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();

    @Autowired(required = false)
    private StorageOperate storageOperate;
    private final ExecutorService workerExecService;

    @Autowired
    private TaskCallbackService taskCallbackService;

    public WorkerManagerThread(WorkerConfig workerConfig) {
        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads());
    }

    public int getDelayQueueSize() {
        return this.workerExecuteQueue.size();
    }

    public int getThreadPoolQueueSize() {
        return ((ThreadPoolExecutor) this.workerExecService).getQueue().size();
    }

    public void killTaskBeforeExecuteByInstanceId(Integer num) {
        Stream filter = this.workerExecuteQueue.stream().filter(taskExecuteThread -> {
            return taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == num.intValue();
        });
        DelayQueue<TaskExecuteThread> delayQueue = this.workerExecuteQueue;
        delayQueue.getClass();
        filter.forEach((v1) -> {
            r1.remove(v1);
        });
        sendTaskKillResponse(num);
    }

    private void sendTaskKillResponse(Integer num) {
        TaskExecutionContext byTaskInstanceId = TaskExecutionContextCacheManager.getByTaskInstanceId(num);
        if (byTaskInstanceId == null) {
            return;
        }
        TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand(byTaskInstanceId.getTaskInstanceId(), byTaskInstanceId.getProcessInstanceId());
        taskExecuteResponseCommand.setStatus(ExecutionStatus.KILL.getCode());
        ResponseCache.get().cache(Integer.valueOf(byTaskInstanceId.getTaskInstanceId()), taskExecuteResponseCommand.convert2Command(), Event.RESULT);
        this.taskCallbackService.sendTaskExecuteResponseCommand(byTaskInstanceId);
    }

    public boolean offer(TaskExecuteThread taskExecuteThread) {
        return this.workerExecuteQueue.offer((DelayQueue<TaskExecuteThread>) taskExecuteThread);
    }

    public void start() {
        Thread thread = new Thread(this, getClass().getName());
        thread.setDaemon(true);
        thread.start();
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setName("Worker-Execute-Manager-Thread");
        while (Stopper.isRunning()) {
            try {
                TaskExecuteThread take = this.workerExecuteQueue.take();
                take.setStorageOperate(this.storageOperate);
                this.workerExecService.submit(take);
            } catch (Exception e) {
                this.logger.error("An unexpected interrupt is happened, the exception will be ignored and this thread will continue to run", e);
            }
        }
    }
}
