package com.github.kfcfans.powerjob.worker.core.executor;

import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.google.common.base.Stopwatch;
import java.util.List;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/powerjob-worker-3.4.2.jar:com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.class */
/**
 * Executes a single {@link TaskDO} with the configured {@link BasicProcessor} and reports the
 * task's status to the task tracker actor.
 *
 * <p>Depending on the task name and the instance's {@link ExecuteType}, a task is handled as
 * one of three kinds:
 * <ul>
 *   <li>the root task of a broadcast instance, which runs {@code preProcess};</li>
 *   <li>the last task, which runs {@code postProcess} (broadcast) or {@code reduce} (map-reduce);</li>
 *   <li>any other task, which runs {@code process}.</li>
 * </ul>
 */
public class ProcessorRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ProcessorRunnable.class);

    /** Result message used when a processor returns {@code null} instead of a result object. */
    private static final String NULL_RESULT_MSG = "ProcessResult can't be null";

    private final InstanceInfo instanceInfo;
    private final ActorSelection taskTrackerActor;
    private final TaskDO task;
    private final BasicProcessor processor;
    private final OmsLogger omsLogger;
    private final ClassLoader classLoader;
    private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;

    /**
     * Creates a runnable bound to one task.
     *
     * @param instanceInfo  information about the job instance the task belongs to
     * @param actorSelection actor that receives the status reports
     * @param taskDO        the task to execute
     * @param basicProcessor processor that performs the actual work
     * @param omsLogger     logger handed to the processor through the task context
     * @param classLoader   class loader installed as the thread context class loader while running
     * @param queue         queue that collects final status reports whose transmission failed
     */
    public ProcessorRunnable(InstanceInfo instanceInfo, ActorSelection actorSelection, TaskDO taskDO, BasicProcessor basicProcessor, OmsLogger omsLogger, ClassLoader classLoader, Queue<ProcessorReportTaskStatusReq> queue) {
        this.instanceInfo = instanceInfo;
        this.taskTrackerActor = actorSelection;
        this.task = taskDO;
        this.processor = basicProcessor;
        this.omsLogger = omsLogger;
        this.classLoader = classLoader;
        this.statusReportRetryQueue = queue;
    }

    /**
     * Builds the task context, reports {@code WORKER_PROCESSING}, runs the processor step that
     * matches the task kind and reports the final status.
     *
     * <p>Exceptions thrown by the processor itself are caught and reported as a failed result;
     * anything else propagates to the caller.
     *
     * @throws InterruptedException declared for callers; see {@link #run()} for how it is handled
     */
    public void innerRun() throws InterruptedException {
        String taskId = this.task.getTaskId();
        Long instanceId = this.task.getInstanceId();
        log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, this.task.getTaskName());

        TaskContext taskContext = buildTaskContext();
        ThreadLocalStore.setTask(this.task);
        reportStatus(TaskStatus.WORKER_PROCESSING, null, null);

        ExecuteType executeType = ExecuteType.valueOf(this.instanceInfo.getExecuteType());
        if (TaskConstant.ROOT_TASK_NAME.equals(this.task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
            runBroadcastPreProcess(taskContext, instanceId);
            return;
        }
        if (!TaskConstant.LAST_TASK_NAME.equals(this.task.getTaskName())) {
            runNormalTask(taskContext, instanceId);
            return;
        }
        runLastTask(taskContext, executeType, instanceId, taskId);
    }

    /** Creates the {@link TaskContext} passed to the processor from the task and instance data. */
    private TaskContext buildTaskContext() {
        TaskContext taskContext = new TaskContext();
        BeanUtils.copyProperties(this.task, taskContext);
        taskContext.setJobId(this.instanceInfo.getJobId());
        taskContext.setMaxRetryTimes(this.instanceInfo.getTaskRetryNum());
        taskContext.setCurrentRetryTimes(this.task.getFailedCnt().intValue());
        taskContext.setJobParams(this.instanceInfo.getJobParams());
        taskContext.setInstanceParams(this.instanceInfo.getInstanceParams());
        taskContext.setOmsLogger(this.omsLogger);
        byte[] content = this.task.getTaskContent();
        if (content != null && content.length > 0) {
            taskContext.setSubTask(SerializerUtils.deSerialized(content));
        }
        taskContext.setUserContext(OhMyWorker.getConfig().getUserContext());
        return taskContext;
    }

    /** Runs {@code preProcess} for the root task of a broadcast instance and reports the outcome. */
    private void runBroadcastPreProcess(TaskContext taskContext, Long instanceId) {
        ProcessResult result;
        if (this.processor instanceof BroadcastProcessor) {
            try {
                result = ((BroadcastProcessor) this.processor).preProcess(taskContext);
            } catch (Throwable th) {
                log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, th);
                result = new ProcessResult(false, th.toString());
            }
        } else {
            // Plain processors have no pre-process hook, so the root task trivially succeeds.
            result = new ProcessResult(true, "NO_PREPOST_TASK");
        }
        result = failIfNull(result);
        reportStatus(statusOf(result), suit(result.getMsg()), ProcessorReportTaskStatusReq.BROADCAST);
    }

    /** Runs {@code process} for an ordinary task and reports the outcome. */
    private void runNormalTask(TaskContext taskContext, Long instanceId) {
        ProcessResult result;
        try {
            result = this.processor.process(taskContext);
        } catch (Throwable th) {
            log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), th);
            result = new ProcessResult(false, th.toString());
        }
        result = failIfNull(result);
        reportStatus(statusOf(result), suit(result.getMsg()), null);
    }

    /**
     * Runs the last task: loads all task results of the instance and hands them to
     * {@code postProcess} (broadcast) or {@code reduce} (map-reduce), then reports the outcome.
     */
    private void runLastTask(TaskContext taskContext, ExecuteType executeType, Long instanceId, String taskId) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
        List<TaskResult> allTaskResult = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, this.task.getSubInstanceId());

        ProcessResult result;
        try {
            switch (executeType) {
                case BROADCAST:
                    if (this.processor instanceof BroadcastProcessor) {
                        result = ((BroadcastProcessor) this.processor).postProcess(taskContext, allTaskResult);
                    } else {
                        result = BroadcastProcessor.defaultResult(allTaskResult);
                    }
                    break;
                case MAP_REDUCE:
                    if (this.processor instanceof MapReduceProcessor) {
                        result = ((MapReduceProcessor) this.processor).reduce(taskContext, allTaskResult);
                    } else {
                        result = new ProcessResult(false, "not implement the MapReduceProcessor");
                    }
                    break;
                default:
                    result = new ProcessResult(false, "IMPOSSIBLE OR BUG");
                    break;
            }
        } catch (Throwable th) {
            log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, th);
            result = new ProcessResult(false, th.toString());
        }
        result = failIfNull(result);
        reportStatus(statusOf(result), suit(result.getMsg()), null);

        // Only claim success in the log when the last task actually succeeded.
        if (result.isSuccess()) {
            log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
        } else {
            log.warn("[ProcessorRunnable-{}] the last task execute failed, using time: {}", instanceId, stopwatch);
        }
    }

    /**
     * Converts a {@code null} processor result into an explicit failed result so that the
     * failure is reported through the normal path instead of surfacing as a NullPointerException.
     */
    private ProcessResult failIfNull(ProcessResult result) {
        if (result != null) {
            return result;
        }
        log.warn("[ProcessorRunnable-{}] task(taskId={}) processor returned null, treat it as failed.", this.task.getInstanceId(), this.task.getTaskId());
        return new ProcessResult(false, NULL_RESULT_MSG);
    }

    /** Maps a processor result to the matching final task status. */
    private static TaskStatus statusOf(ProcessResult result) {
        return result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
    }

    /**
     * Sends a status report for the current task to the task tracker actor.
     *
     * <p>Statuses not contained in {@code TaskStatus.finishedStatus} are sent with a plain
     * {@code tell}. Finished statuses go through {@code AkkaUtils.reliableTransmit}; when that
     * returns {@code false} the request is added to the retry queue.
     *
     * @param taskStatus status to report
     * @param result     result message, may be {@code null}
     * @param cmd        optional command attached to the report, may be {@code null}
     */
    private void reportStatus(TaskStatus taskStatus, String result, Integer cmd) {
        ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
        req.setInstanceId(this.task.getInstanceId());
        req.setSubInstanceId(this.task.getSubInstanceId());
        req.setTaskId(this.task.getTaskId());
        req.setStatus(taskStatus.getValue());
        req.setResult(result);
        req.setReportTime(System.currentTimeMillis());
        req.setCmd(cmd);

        if (!TaskStatus.finishedStatus.contains(taskStatus.getValue())) {
            this.taskTrackerActor.tell(req, null);
            return;
        }
        if (AkkaUtils.reliableTransmit(this.taskTrackerActor, req)) {
            return;
        }
        this.statusReportRetryQueue.add(req);
        log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", this.task.getInstanceId(), this.task.getTaskId(), taskStatus, result);
    }

    /**
     * Installs the configured class loader as the thread context class loader, runs
     * {@link #innerRun()} and always clears {@link ThreadLocalStore} afterwards.
     *
     * <p>An {@link InterruptedException} ends the run quietly but restores the thread's
     * interrupt flag; any other throwable is reported as {@code WORKER_PROCESS_FAILED}.
     */
    @Override
    public void run() {
        Thread.currentThread().setContextClassLoader(this.classLoader);
        try {
            innerRun();
        } catch (InterruptedException e) {
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Throwable th) {
            reportStatus(TaskStatus.WORKER_PROCESS_FAILED, th.toString(), null);
            log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", this.task.getInstanceId(), th);
        } finally {
            ThreadLocalStore.clear();
        }
    }

    /**
     * Fits a result message into the configured maximum result length.
     *
     * @param msg raw result message, may be {@code null} or empty
     * @return an empty string for {@code null}/empty input, the message itself when it is short
     *         enough, otherwise its first {@code maxResultLength} characters followed by "..."
     */
    private String suit(String msg) {
        if (StringUtils.isEmpty(msg)) {
            return "";
        }
        int maxResultLength = OhMyWorker.getConfig().getMaxResultLength();
        if (msg.length() <= maxResultLength) {
            return msg;
        }
        log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.", this.task.getInstanceId(), this.task.getTaskId(), msg.length(), maxResultLength);
        return msg.substring(0, maxResultLength).concat("...");
    }
}
