/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.processor;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import java.util.Date;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskExecuteProcessor
implements NettyRequestProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
    @Autowired
    private WorkerConfig workerConfig;
    @Autowired
    private TaskCallbackService taskCallbackService;
    @Autowired
    private AlertClientService alertClientService;
    @Autowired
    private TaskPluginManager taskPluginManager;
    @Autowired
    private WorkerManagerThread workerManager;

    public void process(Channel channel, Command command) {
        boolean offer;
        Preconditions.checkArgument((CommandType.TASK_EXECUTE_REQUEST == command.getType() ? 1 : 0) != 0, (Object)String.format("invalid command type : %s", command.getType()));
        TaskExecuteRequestCommand taskRequestCommand = (TaskExecuteRequestCommand)JSONUtils.parseObject((byte[])command.getBody(), TaskExecuteRequestCommand.class);
        if (taskRequestCommand == null) {
            logger.error("task execute request command is null");
            return;
        }
        logger.info("task execute request command : {}", (Object)taskRequestCommand);
        String contextJson = taskRequestCommand.getTaskExecutionContext();
        TaskExecutionContext taskExecutionContext = (TaskExecutionContext)JSONUtils.parseObject((String)contextJson, TaskExecutionContext.class);
        if (taskExecutionContext == null) {
            logger.error("task execution context is null");
            return;
        }
        TaskExecutionContextCacheManager.cacheTaskExecutionContext((TaskExecutionContext)taskExecutionContext);
        taskExecutionContext.setHost(NetUtils.getAddr((int)this.workerConfig.getListenPort()));
        taskExecutionContext.setLogPath(LogUtils.getTaskLogPath((TaskExecutionContext)taskExecutionContext));
        if (0 == taskExecutionContext.getDryRun()) {
            if (CommonUtils.isSudoEnable() && this.workerConfig.isTenantAutoCreate()) {
                OSUtils.createUserIfAbsent((String)taskExecutionContext.getTenantCode());
            }
            if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
                logger.error("tenantCode: {} does not exist, taskInstanceId: {}", (Object)taskExecutionContext.getTenantCode(), (Object)taskExecutionContext.getTaskInstanceId());
                TaskExecutionContextCacheManager.removeByTaskInstanceId((Integer)taskExecutionContext.getTaskInstanceId());
                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                taskExecutionContext.setEndTime(new Date());
                this.taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
                return;
            }
            String execLocalPath = this.getExecLocalPath(taskExecutionContext);
            logger.info("task instance local execute path : {}", (Object)execLocalPath);
            taskExecutionContext.setExecutePath(execLocalPath);
            try {
                FileUtils.createWorkDirIfAbsent((String)execLocalPath);
            }
            catch (Throwable ex) {
                logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", (Object)execLocalPath, (Object)taskExecutionContext.getTaskInstanceId());
                logger.error("create executeLocalPath fail", ex);
                TaskExecutionContextCacheManager.removeByTaskInstanceId((Integer)taskExecutionContext.getTaskInstanceId());
                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                this.taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
                return;
            }
        }
        this.taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
        long remainTime = DateUtils.getRemainTime((Date)taskExecutionContext.getFirstSubmitTime(), (long)((long)taskExecutionContext.getDelayTime() * 60L));
        if (remainTime > 0L) {
            logger.info("delay the execution of task instance {}, delay time: {} s", (Object)taskExecutionContext.getTaskInstanceId(), (Object)remainTime);
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
            taskExecutionContext.setStartTime(null);
            this.taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
        }
        if (!(offer = this.workerManager.offer(new TaskExecuteThread(taskExecutionContext, this.taskCallbackService, this.alertClientService, this.taskPluginManager)))) {
            logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", (Object)this.workerManager.getDelayQueueSize(), (Object)taskExecutionContext.getTaskInstanceId());
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
            this.taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        }
    }

    private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
        return FileUtils.getProcessExecDir((long)taskExecutionContext.getProjectCode(), (long)taskExecutionContext.getProcessDefineCode(), (int)taskExecutionContext.getProcessDefineVersion(), (int)taskExecutionContext.getProcessInstanceId(), (int)taskExecutionContext.getTaskInstanceId());
    }
}

