package org.apache.dolphinscheduler.server.worker.runner.operator;

import lombok.Generated;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Worker-side handler for {@link TaskInstanceDispatchRequest}.
 *
 * <p>For each request it stamps the carried {@link TaskExecutionContext} with this worker's
 * address and the task log path, refuses the task when {@link ServerLifeCycleManager} reports
 * that the server is not running, and otherwise creates a worker task executor and submits it to
 * the {@link WorkerTaskExecutorThreadPool}.
 */
@Component
public class TaskInstanceDispatchOperationFunction implements ITaskInstanceOperationFunction<TaskInstanceDispatchRequest, TaskInstanceDispatchResponse> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TaskInstanceDispatchOperationFunction.class);

    @Autowired
    private WorkerConfig workerConfig;

    @Autowired
    private WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder;

    @Autowired
    private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;

    /**
     * Accepts or rejects a dispatched task instance.
     *
     * @param taskInstanceDispatchRequest the dispatch request carrying the task execution context
     * @return a success response when the executor was accepted by the thread pool; a failed
     *         response with message {@code "server is not running"} when the server life cycle
     *         is not in the running state, or {@code "WorkerManagerThread is full"} when the
     *         thread pool did not accept the executor
     */
    @Override
    public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
        log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest);
        TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext();
        try {
            taskExecutionContext.setHost(workerConfig.getWorkerAddress());
            taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));

            // Tag every log line emitted below with the workflow and task instance ids.
            LogUtils.setWorkflowAndTaskInstanceIDMDC(
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId());

            // Refuse new work unless the server life cycle reports the running state.
            if (!ServerLifeCycleManager.isRunning()) {
                // Log the task instance id: it is the id the failed response below is keyed by.
                log.error("server is not running. reject task: {}", taskExecutionContext.getTaskInstanceId());
                return TaskInstanceDispatchResponse.failed(
                        taskExecutionContext.getTaskInstanceId(),
                        "server is not running");
            }

            TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

            boolean submitted = workerTaskExecutorThreadPool.submitWorkerTaskExecutor(
                    workerTaskExecutorFactoryBuilder
                            .createWorkerTaskExecutorFactory(taskExecutionContext)
                            .createWorkerTaskExecutor());
            if (!submitted) {
                log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
                return TaskInstanceDispatchResponse.failed(
                        taskExecutionContext.getTaskInstanceId(),
                        "WorkerManagerThread is full");
            }

            log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
            return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
        } finally {
            // Single cleanup point: runs on every return path and on any thrown exception, so the
            // MDC entries never leak onto the next request handled by this thread.
            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
}
