package org.apache.dolphinscheduler.server.worker.runner;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Date;
import java.util.List;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.service.utils.CommonUtils;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.apache.dolphinscheduler.service.utils.ProcessUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.class */
/**
 * Base {@link Runnable} that drives a single task instance on the worker.
 *
 * <p>{@link #run()} initializes the execution context, then either short-circuits (dry run) or
 * walks through {@link #beforeExecute()}, {@link #executeTask(TaskCallBack)} and
 * {@link #afterExecute()}. Any throwable raised on the way is routed to
 * {@link #afterThrowing(Throwable)}, which reports the task as failed to the master.
 *
 * <p>Subclasses supply the actual execution strategy by implementing
 * {@link #executeTask(TaskCallBack)}.
 */
public abstract class WorkerTaskExecuteRunnable implements Runnable {

    /** Value of {@code TaskExecutionContext#getDryRun()} for which {@link #run()} skips the real execution. */
    private static final int DRY_RUN_FLAG_YES = 1;

    protected final Logger logger = LoggerFactory.getLogger(String.format("TaskLogLogger-%s", WorkerTaskExecuteRunnable.class));
    protected final TaskExecutionContext taskExecutionContext;
    protected final WorkerConfig workerConfig;
    protected final String masterAddress;
    protected final WorkerMessageSender workerMessageSender;
    protected final AlertClientService alertClientService;
    protected final TaskPluginManager taskPluginManager;

    /** Storage backend handed to the resource download step; may be {@code null}. */
    @Nullable
    protected final StorageOperate storageOperate;

    /** The plugin task instance; {@code null} until {@link #beforeExecute()} has created it. */
    @Nullable
    protected AbstractTask task;

    /**
     * Creates the runnable and assigns the task log name to the execution context.
     *
     * @param taskExecutionContext context of the task instance to run, must not be {@code null}
     * @param workerConfig         worker configuration, must not be {@code null}
     * @param masterAddress        address of the master that receives status messages, must not be {@code null}
     * @param workerMessageSender  sender used to report task status, must not be {@code null}
     * @param alertClientService   client used to send task alerts, must not be {@code null}
     * @param taskPluginManager    registry used to look up the task channel by task type, must not be {@code null}
     * @param storageOperate       storage backend used when downloading resources, may be {@code null}
     * @throws NullPointerException if any argument marked non-null is {@code null}
     */
    public WorkerTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, @NonNull WorkerConfig workerConfig, @NonNull String masterAddress, @NonNull WorkerMessageSender workerMessageSender, @NonNull AlertClientService alertClientService, @NonNull TaskPluginManager taskPluginManager, @Nullable StorageOperate storageOperate) {
        if (taskExecutionContext == null) {
            throw new NullPointerException("taskExecutionContext is marked non-null but is null");
        }
        if (workerConfig == null) {
            throw new NullPointerException("workerConfig is marked non-null but is null");
        }
        if (masterAddress == null) {
            throw new NullPointerException("masterAddress is marked non-null but is null");
        }
        if (workerMessageSender == null) {
            throw new NullPointerException("workerMessageSender is marked non-null but is null");
        }
        if (alertClientService == null) {
            throw new NullPointerException("alertClientService is marked non-null but is null");
        }
        if (taskPluginManager == null) {
            throw new NullPointerException("taskPluginManager is marked non-null but is null");
        }
        this.taskExecutionContext = taskExecutionContext;
        this.workerConfig = workerConfig;
        this.masterAddress = masterAddress;
        this.workerMessageSender = workerMessageSender;
        this.alertClientService = alertClientService;
        this.taskPluginManager = taskPluginManager;
        this.storageOperate = storageOperate;

        String taskLogName = LoggerUtils.buildTaskId(
                taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);
        logger.info("Set task logger name: {}", taskLogName);
    }

    /**
     * Runs the already created and initialized {@link #task}.
     *
     * @param taskCallBack callback handed to the task implementation
     */
    protected abstract void executeTask(TaskCallBack taskCallBack);

    /**
     * Post-processing after {@link #executeTask(TaskCallBack)} returned normally: sends an alert if
     * the task asks for one, reports the result to the master, removes the context from the worker
     * cache and finally clears the execute path.
     *
     * @throws TaskException if no task instance has been created
     */
    public void afterExecute() throws TaskException {
        if (task == null) {
            throw new TaskException("The current task instance is null");
        }
        sendAlertIfNeeded();
        sendTaskResult();
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        logger.info("Remove the current task execute context from worker cache");
        clearTaskExecPathIfNeeded();
    }

    /**
     * Failure path of {@link #run()}: cancels the task, removes the context from the worker cache
     * and reports {@link TaskExecutionStatus#FAILURE} to the master.
     *
     * @param th the throwable that aborted the execution (not inspected here)
     * @throws TaskException declared for subclasses; this implementation does not throw it itself
     */
    public void afterThrowing(Throwable th) throws TaskException {
        cancelTask();
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(new Date());
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE);
    }

    /**
     * Best-effort cancellation: cancels the plugin task and then asks {@link ProcessUtils} to cancel
     * every application id that {@link LogUtils} extracts from the task log file. Does nothing when
     * no task has been created yet; exceptions are logged and not propagated.
     */
    public void cancelTask() {
        if (task == null) {
            return;
        }
        try {
            task.cancel();
            List<String> appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath());
            if (CollectionUtils.isNotEmpty(appIds)) {
                ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
            }
        } catch (Exception e) {
            logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
        }
    }

    /**
     * Entry point: names the current thread after the task log name, sets the workflow/task MDC,
     * initializes the context and then either finishes immediately (dry run) or executes the task.
     * The MDC is always removed again before returning.
     */
    @Override
    public void run() {
        try {
            Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
            logger.info("Begin to pulling task");
            initializeTask();

            if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
                completeDryRun();
                return;
            }

            beforeExecute();
            TaskCallBack taskCallBack = TaskCallbackImpl.builder()
                    .workerMessageSender(workerMessageSender)
                    .masterAddress(masterAddress)
                    .build();
            executeTask(taskCallBack);
            afterExecute();
        } catch (Throwable ex) {
            logger.error("Task execute failed, due to meet an exception", ex);
            reportFailure(ex);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }

    /** Marks a dry-run task as successful and reports that result to the master without executing anything. */
    private void completeDryRun() {
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
        taskExecutionContext.setEndTime(new Date());
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
    }

    /**
     * Invokes {@link #afterThrowing(Throwable)} and makes sure a failure of the failure handling
     * itself is written to the log before it propagates out of {@link #run()}.
     */
    private void reportFailure(Throwable cause) {
        try {
            afterThrowing(cause);
        } catch (RuntimeException | Error e) {
            // Without this the exception would leave run() without any trace in the log.
            logger.error("Failed to handle the task execute exception, the task execute result may not have been sent to master", e);
            throw e;
        }
    }

    /** Stamps start time, system env file and task app id ({@code <processInstanceId>_<taskInstanceId>}) on the context. */
    protected void initializeTask() {
        logger.info("Begin to initialize task");

        Date startTime = new Date();
        taskExecutionContext.setStartTime(startTime);
        logger.info("Set task startTime: {}", startTime);

        String envFile = CommonUtils.getSystemEnvPath();
        taskExecutionContext.setEnvFile(envFile);
        logger.info("Set task envFile: {}", envFile);

        String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskAppId(taskAppId);
        logger.info("Set task appId: {}", taskAppId);

        logger.info("End initialize task");
    }

    /**
     * Prepares the execution: reports the running status to the master, runs the tenant, local
     * path and resource checks of {@link TaskExecutionCheckerUtils}, creates the task through the
     * channel registered for the task type, initializes it and hands over the var pool.
     *
     * @throws TaskPluginException if no channel is registered for the task type or the channel
     *                             returns a {@code null} task
     */
    protected void beforeExecute() {
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);
        logger.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION);

        TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext);
        logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());

        TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
        logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());

        TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
        logger.info("Resources:{} check success", taskExecutionContext.getResources());

        TaskChannel taskChannel = (TaskChannel) taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (taskChannel == null) {
            throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType()));
        }
        task = taskChannel.createTask(taskExecutionContext);
        if (task == null) {
            throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType()));
        }
        logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());

        task.init();
        logger.info("Success initialized task plugin instance success");

        task.getParameters().setVarPool(taskExecutionContext.getVarPool());
        logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool());
    }

    /**
     * Sends the task's alert through the alert client when the task requests one. The warning type
     * is SUCCESS when the exit status is {@link TaskExecutionStatus#SUCCESS}, FAILURE otherwise.
     */
    protected void sendAlertIfNeeded() {
        if (!task.getNeedAlert()) {
            return;
        }
        logger.info("The current task need to send alert, begin to send alert");
        TaskExecutionStatus exitStatus = task.getExitStatus();
        TaskAlertInfo alertInfo = task.getTaskAlertInfo();
        int warningType = exitStatus == TaskExecutionStatus.SUCCESS
                ? WarningType.SUCCESS.getCode()
                : WarningType.FAILURE.getCode();
        alertClientService.sendAlert(alertInfo.getAlertGroupId().intValue(), alertInfo.getTitle(), alertInfo.getContent(), warningType);
        logger.info("Success send alert");
    }

    /** Copies exit status, end time, process id, app ids and var pool from the task onto the context and sends it to the master. */
    protected void sendTaskResult() {
        taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
        taskExecutionContext.setEndTime(new Date());
        taskExecutionContext.setProcessId(task.getProcessId());
        taskExecutionContext.setAppIds(task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus());
    }

    /**
     * Deletes the task's execute directory unless develop mode is on, the path is empty, or the
     * path is {@code "/"}. This is a best-effort cleanup: it runs after the result has already been
     * sent, so no failure in here is allowed to propagate and change the reported task status.
     */
    protected void clearTaskExecPathIfNeeded() {
        String executePath = taskExecutionContext.getExecutePath();
        if (CommonUtils.isDevelopMode()) {
            logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", executePath);
            return;
        }
        logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", executePath);
        if (Strings.isNullOrEmpty(executePath)) {
            logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName());
            return;
        }
        if ("/".equals(executePath)) {
            logger.warn("The task execute file is '/', direct deletion is not allowed");
            return;
        }
        try {
            FileUtils.deleteDirectory(new File(executePath));
            logger.info("Success clear the task execute file: {}", executePath);
        } catch (NoSuchFileException ignored) {
            // The directory is already gone, which is the state we wanted anyway.
        } catch (IOException | RuntimeException e) {
            // Unchecked failures (e.g. the path is not a directory) must not escape either:
            // they would reach run()'s catch block and overwrite the result with FAILURE.
            logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", executePath, e);
        }
    }

    /**
     * @return the execution context this runnable operates on, never {@code null}
     */
    @NonNull
    public TaskExecutionContext getTaskExecutionContext() {
        return taskExecutionContext;
    }

    /**
     * @return the plugin task instance, or {@code null} if it has not been created yet
     */
    @Nullable
    public AbstractTask getTask() {
        return task;
    }
}
