package org.apache.dolphinscheduler.server.worker.runner;

import ch.qos.logback.classic.ClassicConstants;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.log.SensitiveDataConverter;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.class */
/**
 * Base {@link Runnable} that drives one task instance through its worker-side lifecycle.
 *
 * <p>{@link #run()} initializes the execution context, prepares the task plugin
 * ({@link #beforeExecute()}), delegates the actual execution to
 * {@link #executeTask(TaskCallBack)}, and finally reports the result and cleans up
 * ({@link #afterExecute()}). Any {@link Throwable} raised along the way is routed to
 * {@link #afterThrowing(Throwable)}, which reports the task as failed.
 *
 * <p>Subclasses provide the execution strategy by implementing {@link #executeTask(TaskCallBack)}.
 */
public abstract class WorkerTaskExecuteRunnable implements Runnable {
    protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecuteRunnable.class);

    /**
     * Pattern handed to {@link SensitiveDataConverter#addMaskPattern(String)}. It matches,
     * case-insensitively on the key, the text that follows {@code configYaml" : "} up to the next
     * {@code ",} followed by a line break.
     */
    private static final String CONFIG_YAML_MASK_PATTERN = "(?<=((?i)configYaml(\" : \"))).*?(?=(\",\\n))";

    /** Value of {@code TaskExecutionContext#getDryRun()} that makes {@link #run()} skip real execution. */
    private static final int DRY_RUN_ENABLED = 1;

    protected final TaskExecutionContext taskExecutionContext;
    protected final WorkerConfig workerConfig;
    protected final WorkerMessageSender workerMessageSender;
    protected final TaskPluginManager taskPluginManager;

    @Nullable
    protected final StorageOperate storageOperate;
    protected final WorkerRpcClient workerRpcClient;
    protected final WorkerRegistryClient workerRegistryClient;

    /** The task plugin instance; stays {@code null} until {@link #beforeExecute()} has created it. */
    @Nullable
    protected AbstractTask task;

    /**
     * Stores the collaborators and registers the {@code configYaml} mask pattern.
     *
     * @param taskExecutionContext context of the task instance to run, must not be {@code null}
     * @param workerConfig worker configuration, must not be {@code null}
     * @param workerMessageSender sender used to report task status, must not be {@code null}
     * @param workerRpcClient RPC client used to send alerts, must not be {@code null}
     * @param taskPluginManager source of the task channels, must not be {@code null}
     * @param storageOperate storage used for resource/file transfer, may be {@code null}
     * @param workerRegistryClient registry client used to look up the alert server, must not be {@code null}
     * @throws NullPointerException if any argument other than {@code storageOperate} is {@code null}
     */
    public WorkerTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, @NonNull WorkerConfig workerConfig, @NonNull WorkerMessageSender workerMessageSender, @NonNull WorkerRpcClient workerRpcClient, @NonNull TaskPluginManager taskPluginManager, @Nullable StorageOperate storageOperate, @NonNull WorkerRegistryClient workerRegistryClient) {
        if (taskExecutionContext == null) {
            throw new NullPointerException("taskExecutionContext is marked non-null but is null");
        }
        if (workerConfig == null) {
            throw new NullPointerException("workerConfig is marked non-null but is null");
        }
        if (workerMessageSender == null) {
            throw new NullPointerException("workerMessageSender is marked non-null but is null");
        }
        if (workerRpcClient == null) {
            throw new NullPointerException("workerRpcClient is marked non-null but is null");
        }
        if (taskPluginManager == null) {
            throw new NullPointerException("taskPluginManager is marked non-null but is null");
        }
        if (workerRegistryClient == null) {
            throw new NullPointerException("workerRegistryClient is marked non-null but is null");
        }
        this.taskExecutionContext = taskExecutionContext;
        this.workerConfig = workerConfig;
        this.workerMessageSender = workerMessageSender;
        this.workerRpcClient = workerRpcClient;
        this.taskPluginManager = taskPluginManager;
        this.storageOperate = storageOperate;
        this.workerRegistryClient = workerRegistryClient;
        SensitiveDataConverter.addMaskPattern(CONFIG_YAML_MASK_PATTERN);
    }

    /**
     * Runs the task plugin. Called by {@link #run()} after {@link #beforeExecute()} has completed.
     *
     * @param taskCallBack callback built by {@link #run()} from the message sender and the execution context
     */
    protected abstract void executeTask(TaskCallBack taskCallBack);

    /**
     * Finishes a task whose execution returned normally: sends an alert if the task asks for one,
     * reports the result to the master, removes the context from the worker cache and clears the
     * execution directory.
     *
     * @throws TaskException if no task instance has been created
     */
    public void afterExecute() throws TaskException {
        if (this.task == null) {
            throw new TaskException("The current task instance is null");
        }
        sendAlertIfNeeded();
        sendTaskResult();
        TaskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(this.taskExecutionContext.getTaskInstanceId()));
        log.info("Remove the current task execute context from worker cache");
        clearTaskExecPathIfNeeded();
    }

    /**
     * Handles a failed execution: tries to cancel the task, removes the context from the worker
     * cache, marks the context as {@link TaskExecutionStatus#FAILURE} with the current time as end
     * time, and sends the result message.
     *
     * @param th the failure that triggered this handler; not inspected by this implementation
     * @throws TaskException declared for overriding implementations and callees
     */
    public void afterThrowing(Throwable th) throws TaskException {
        if (cancelTask()) {
            log.info("Cancel the task successfully");
        }
        TaskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(this.taskExecutionContext.getTaskInstanceId()));
        this.taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
        this.taskExecutionContext.setEndTime(System.currentTimeMillis());
        this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, MessageType.TASK_EXECUTE_RESULT_MESSAGE);
        log.info("Get a exception when execute the task, will send the task status: {} to master: {}", TaskExecutionStatus.FAILURE.name(), this.taskExecutionContext.getHost());
    }

    /**
     * Cancels the task plugin and the application associated with the execution context.
     *
     * @return {@code true} if there is no task yet or the cancellation completed without an
     *         exception; {@code false} if cancelling threw (the exception is logged, not rethrown)
     */
    public boolean cancelTask() {
        if (this.task == null) {
            // Nothing has been started, so there is nothing to cancel.
            return true;
        }
        try {
            this.task.cancel();
            ProcessUtils.cancelApplication(this.taskExecutionContext);
            return true;
        } catch (Exception e) {
            log.error("Cancel task failed, this will not affect the taskInstance status, but you need to check manual", e);
            return false;
        }
    }

    /**
     * Executes the full task lifecycle.
     *
     * <p>The logging MDC entries are set first and always removed on exit. In dry-run mode the
     * task is reported as {@link TaskExecutionStatus#SUCCESS} right after initialization and
     * nothing else is executed. Otherwise any {@link Throwable} is logged and passed to
     * {@link #afterThrowing(Throwable)}; the log appender is closed on both the success and the
     * failure path.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            LogUtils.setWorkflowAndTaskInstanceIDMDC(Integer.valueOf(this.taskExecutionContext.getProcessInstanceId()), Integer.valueOf(this.taskExecutionContext.getTaskInstanceId()));
            LogUtils.setTaskInstanceLogFullPathMDC(this.taskExecutionContext.getLogPath());

            TaskInstanceLogHeader.printInitializeTaskContextHeader();
            initializeTask();

            if (DRY_RUN_ENABLED == this.taskExecutionContext.getDryRun()) {
                this.taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
                this.taskExecutionContext.setEndTime(System.currentTimeMillis());
                TaskExecutionContextCacheManager.removeByTaskInstanceId(Integer.valueOf(this.taskExecutionContext.getTaskInstanceId()));
                this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, MessageType.TASK_EXECUTE_RESULT_MESSAGE);
                log.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
                return;
            }

            TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
            beforeExecute();

            TaskCallBack taskCallBack = TaskCallbackImpl.builder()
                    .workerMessageSender(this.workerMessageSender)
                    .taskExecutionContext(this.taskExecutionContext)
                    .build();

            TaskInstanceLogHeader.printExecuteTaskHeader();
            executeTask(taskCallBack);

            TaskInstanceLogHeader.printFinalizeTaskHeader();
            afterExecute();
            closeLogAppender();
        } catch (Throwable th) {
            log.error("Task execute failed, due to meet an exception", th);
            try {
                afterThrowing(th);
            } finally {
                // Close the appender even when the failure handler itself throws.
                closeLogAppender();
            }
        } finally {
            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
            LogUtils.removeTaskInstanceLogFullPathMDC();
        }
    }

    /**
     * Stamps the execution context with the start time (current time in milliseconds) and an
     * application id of the form {@code <processInstanceId>_<taskInstanceId>}, then logs the
     * context as pretty-printed JSON.
     */
    protected void initializeTask() {
        log.info("Begin to initialize task");

        long startTime = System.currentTimeMillis();
        this.taskExecutionContext.setStartTime(startTime);
        log.info("Set task startTime: {}", startTime);

        String taskAppId = String.format("%s_%s", this.taskExecutionContext.getProcessInstanceId(), this.taskExecutionContext.getTaskInstanceId());
        this.taskExecutionContext.setTaskAppId(taskAppId);
        log.info("Set task appId: {}", taskAppId);

        log.info("End initialize task {}", JSONUtils.toPrettyJsonString(this.taskExecutionContext));
    }

    /**
     * Prepares everything the task needs before it can run: reports the running status to the
     * master, checks the tenant, creates the local execution path, downloads resources and
     * upstream files, creates and initializes the task plugin for the context's task type, and
     * hands the context's var pool to the task parameters.
     *
     * @throws TaskPluginException if no task channel is registered for the task type
     */
    protected void beforeExecute() {
        this.taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
        this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, MessageType.TASK_EXECUTE_RUNNING_MESSAGE);
        log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(), this.taskExecutionContext.getHost());

        TaskExecutionCheckerUtils.checkTenantExist(this.workerConfig, this.taskExecutionContext);
        log.info("TenantCode: {} check successfully", this.taskExecutionContext.getTenantCode());

        TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(this.taskExecutionContext);
        log.info("WorkflowInstanceExecDir: {} check successfully", this.taskExecutionContext.getExecutePath());

        TaskExecutionCheckerUtils.downloadResourcesIfNeeded(this.storageOperate, this.taskExecutionContext);
        log.info("Download resources: {} successfully", this.taskExecutionContext.getResources());

        TaskFilesTransferUtils.downloadUpstreamFiles(this.taskExecutionContext, this.storageOperate);
        log.info("Download upstream files: {} successfully", TaskFilesTransferUtils.getFileLocalParams(this.taskExecutionContext, Direct.IN));

        this.task = (AbstractTask) Optional.ofNullable(this.taskPluginManager.getTaskChannelMap().get(this.taskExecutionContext.getTaskType()))
                .map(taskChannel -> taskChannel.createTask(this.taskExecutionContext))
                .orElseThrow(() -> new TaskPluginException(this.taskExecutionContext.getTaskType() + " task plugin not found, please check the task type is correct."));
        log.info("Task plugin instance: {} create successfully", this.taskExecutionContext.getTaskType());

        this.task.init();
        log.info("Success initialized task plugin instance successfully");

        this.task.getParameters().setVarPool(this.taskExecutionContext.getVarPool());
        log.info("Set taskVarPool: {} successfully", this.taskExecutionContext.getVarPool());
    }

    /**
     * Sends an alert to the alert server when the task requests one. Sending is best-effort: a
     * missing alert server address or a {@link RemotingException} is logged and otherwise ignored.
     * The warning type is {@link WarningType#SUCCESS} when the task exit status is
     * {@link TaskExecutionStatus#SUCCESS}, otherwise {@link WarningType#FAILURE}.
     */
    protected void sendAlertIfNeeded() {
        if (!this.task.getNeedAlert()) {
            return;
        }
        Optional<Host> alertServerAddress = this.workerRegistryClient.getAlertServerAddress();
        if (!alertServerAddress.isPresent()) {
            log.error("Cannot get alert server address, please check the alert server is running");
            return;
        }
        Host alertServerHost = alertServerAddress.get();
        TaskExecutionStatus exitStatus = this.task.getExitStatus();
        TaskAlertInfo taskAlertInfo = this.task.getTaskAlertInfo();
        AlertSendRequest alertSendRequest = new AlertSendRequest(taskAlertInfo.getAlertGroupId().intValue(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), exitStatus == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode());
        try {
            this.workerRpcClient.send(alertServerHost, alertSendRequest.convert2Command());
            log.info("Send alert to: {} successfully", alertServerHost);
        } catch (RemotingException e) {
            // The trailing throwable is logged as the exception, not as a placeholder value.
            log.error("Send alert to: {} failed, alertCommand: {}", alertServerHost, alertSendRequest, e);
        }
    }

    /**
     * Copies the task outcome (exit status, process id, app ids, var pool as JSON, end time) into
     * the execution context, uploads the output files and sends the result message to the master.
     */
    protected void sendTaskResult() {
        this.taskExecutionContext.setCurrentExecutionStatus(this.task.getExitStatus());
        this.taskExecutionContext.setProcessId(this.task.getProcessId());
        this.taskExecutionContext.setAppIds(this.task.getAppIds());
        this.taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        this.taskExecutionContext.setEndTime(System.currentTimeMillis());

        TaskFilesTransferUtils.uploadOutputFiles(this.taskExecutionContext, this.storageOperate);
        log.info("Upload output files: {} successfully", TaskFilesTransferUtils.getFileLocalParams(this.taskExecutionContext, Direct.OUT));

        this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, MessageType.TASK_EXECUTE_RESULT_MESSAGE);
        log.info("Send task execute status: {} to master : {}", this.taskExecutionContext.getCurrentExecutionStatus().name(), this.taskExecutionContext.getHost());
    }

    /**
     * Deletes the task execution directory unless develop mode is on, the path is null/empty, or
     * the path is exactly {@code "/"}. Deletion failures are logged and never propagated; a
     * {@link NoSuchFileException} is ignored silently.
     */
    protected void clearTaskExecPathIfNeeded() {
        String executePath = this.taskExecutionContext.getExecutePath();
        if (CommonUtils.isDevelopMode()) {
            log.info("The current execute mode is develop mode, will not clear the task execute file: {}", executePath);
            return;
        }
        log.info("The current execute mode isn't develop mode, will clear the task execute file: {}", executePath);
        if (Strings.isNullOrEmpty(executePath)) {
            // NOTE(review): the placeholder is filled with the task name, not the (empty) path.
            log.warn("The task execute file is {} no need to clear", this.taskExecutionContext.getTaskName());
            return;
        }
        if ("/".equals(executePath)) {
            log.warn("The task execute file is '/', direct deletion is not allowed");
            return;
        }
        try {
            FileUtils.deleteDirectory(new File(executePath));
            log.info("Success clear the task execute file: {}", executePath);
        } catch (NoSuchFileException ignored) {
            // The directory (or part of it) is already gone, which is the desired end state.
        } catch (IOException e) {
            log.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", executePath, e);
        }
    }

    /**
     * Ships the task log to remote storage when remote logging is enabled, then always logs
     * logback's {@link ClassicConstants#FINALIZE_SESSION_MARKER}. An {@link Exception} while
     * sending the remote log is logged and swallowed.
     */
    protected void closeLogAppender() {
        try {
            if (RemoteLogUtils.isRemoteLoggingEnable()) {
                RemoteLogUtils.sendRemoteLog(this.taskExecutionContext.getLogPath());
                log.info("Log handler sends task log {} to remote storage asynchronously.", this.taskExecutionContext.getLogPath());
            }
        } catch (Exception e) {
            log.error("Send remote log failed", e);
        } finally {
            // Emitted on every path, including when a non-Exception Throwable propagates.
            log.info(ClassicConstants.FINALIZE_SESSION_MARKER, ClassicConstants.FINALIZE_SESSION_MARKER.toString());
        }
    }

    /**
     * @return the execution context this runnable was created with, never {@code null}
     */
    @NonNull
    public TaskExecutionContext getTaskExecutionContext() {
        return this.taskExecutionContext;
    }

    /**
     * @return the task plugin instance, or {@code null} if {@link #beforeExecute()} has not created it yet
     */
    @Nullable
    public AbstractTask getTask() {
        return this.task;
    }
}
