package org.apache.dolphinscheduler.server.worker.rpc;

import lombok.Generated;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.class */
public class StreamingTaskInstanceOperatorImpl implements IStreamingTaskInstanceOperator {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingTaskInstanceOperatorImpl.class);

    @Autowired
    private WorkerTaskExecutorThreadPool workerManager;

    public TaskInstanceTriggerSavepointResponse triggerSavepoint(TaskInstanceTriggerSavepointRequest taskInstanceTriggerSavepointRequest) {
        log.info("Receive triggerSavepoint request: {}", taskInstanceTriggerSavepointRequest);
        try {
            int taskInstanceId = taskInstanceTriggerSavepointRequest.getTaskInstanceId();
            LogUtils.setTaskInstanceIdMDC(Integer.valueOf(taskInstanceId));
            WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
            if (workerTaskExecutor == null) {
                log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", Integer.valueOf(taskInstanceId));
                TaskInstanceTriggerSavepointResponse fail = TaskInstanceTriggerSavepointResponse.fail("Cannot find TaskExecutionContext");
                LogUtils.removeTaskInstanceIdMDC();
                return fail;
            }
            StreamTask task = workerTaskExecutor.getTask();
            if (task == null) {
                log.error("Cannot find StreamTask for taskInstance:{}", Integer.valueOf(taskInstanceId));
                TaskInstanceTriggerSavepointResponse fail2 = TaskInstanceTriggerSavepointResponse.fail("Cannot find StreamTask");
                LogUtils.removeTaskInstanceIdMDC();
                return fail2;
            }
            if (!(task instanceof StreamTask)) {
                log.warn("The taskInstance: {} is not StreamTask", Integer.valueOf(taskInstanceId));
                TaskInstanceTriggerSavepointResponse fail3 = TaskInstanceTriggerSavepointResponse.fail("The taskInstance is not StreamTask");
                LogUtils.removeTaskInstanceIdMDC();
                return fail3;
            }
            try {
                task.savePoint();
                TaskInstanceTriggerSavepointResponse success = TaskInstanceTriggerSavepointResponse.success();
                LogUtils.removeTaskInstanceIdMDC();
                return success;
            } catch (Exception e) {
                log.error("StreamTask: {} call savePoint error", Integer.valueOf(taskInstanceId), e);
                TaskInstanceTriggerSavepointResponse fail4 = TaskInstanceTriggerSavepointResponse.fail("StreamTask call savePoint error: " + e.getMessage());
                LogUtils.removeTaskInstanceIdMDC();
                return fail4;
            }
        } catch (Throwable th) {
            LogUtils.removeTaskInstanceIdMDC();
            throw th;
        }
    }
}
