/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Sends task status callbacks (running / result / kill responses) for task instances
 * over a {@link NettyRemoteChannel} that is looked up by task instance id.
 *
 * <p>Channels are kept in a static map shared by all instances of this class. When the
 * cached channel is no longer active, a new channel to the same host is requested from
 * the internal {@link NettyRemotingClient}.
 */
@Service
public class TaskCallbackService {
    private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);

    /** Back-off multipliers used by {@link #pause(int)}; each value is multiplied by 1000. */
    private static final int[] RETRY_BACKOFF = new int[]{1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};

    /** Processor for {@code TASK_EXECUTE_RUNNING_ACK} commands; set by {@link #registerAckProcessors}. */
    private TaskExecuteRunningAckProcessor taskExecuteRunningProcessor;

    /** Processor for {@code TASK_EXECUTE_RESPONSE_ACK} commands; set by {@link #registerAckProcessors}. */
    private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;

    /** Task instance id -> channel used to send callbacks for that task. */
    private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();

    /** Client used to obtain a fresh channel when the cached one is inactive. */
    private final NettyRemotingClient nettyRemotingClient;

    /**
     * Creates the internal netty client with a default {@link NettyClientConfig}.
     *
     * <p>The ack processors are deliberately NOT registered here: injected dependencies are
     * not available while the constructor runs, so registering them at this point would
     * register {@code null}. See {@link #registerAckProcessors}.
     */
    public TaskCallbackService() {
        NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }

    /**
     * Receives the ack processors from the container and registers them on the netty client.
     *
     * <p>Method injection is used so that registration happens only once both processors
     * have actually been resolved.
     *
     * @param runningAckProcessor  processor for {@code TASK_EXECUTE_RUNNING_ACK}
     * @param responseAckProcessor processor for {@code TASK_EXECUTE_RESPONSE_ACK}
     */
    @Autowired
    public void registerAckProcessors(TaskExecuteRunningAckProcessor runningAckProcessor,
                                      TaskExecuteResponseAckProcessor responseAckProcessor) {
        this.taskExecuteRunningProcessor = runningAckProcessor;
        this.taskExecuteResponseAckProcessor = responseAckProcessor;
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, runningAckProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, responseAckProcessor);
    }

    /**
     * Associates a channel with a task instance, replacing any previous association.
     *
     * @param taskInstanceId task instance id
     * @param channel        channel to use for callbacks of that task
     */
    public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
        REMOTE_CHANNELS.put(taskInstanceId, channel);
    }

    /**
     * Replaces the channel associated with a task instance.
     *
     * @param taskInstanceId task instance id
     * @param channel        new channel to use for callbacks of that task
     */
    public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
        // A single put replaces the mapping atomically; a separate remove-then-put would
        // leave a window in which concurrent senders see no channel at all.
        REMOTE_CHANNELS.put(taskInstanceId, channel);
    }

    /**
     * Looks up a usable channel for the task instance.
     *
     * @param taskInstanceId task instance id
     * @return the cached channel if it is active; otherwise a newly created (and cached)
     *         channel to the same host; {@code null} if no channel is cached or no new
     *         channel could be obtained
     */
    private NettyRemoteChannel getRemoteChannel(int taskInstanceId) {
        NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
        if (nettyRemoteChannel == null) {
            return null;
        }
        if (nettyRemoteChannel.isActive()) {
            return nettyRemoteChannel;
        }
        // Cached channel is dead: try to get a new channel to the same host and keep the opaque.
        Channel newChannel = this.nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
        if (newChannel != null) {
            return this.getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
        }
        return null;
    }

    /**
     * Computes a back-off value for the given retry count.
     *
     * <p>The retry count indexes {@link #RETRY_BACKOFF} modulo its length, so the schedule
     * wraps around. Negative counts are treated as 0 instead of failing with an
     * {@link ArrayIndexOutOfBoundsException}.
     *
     * @param ntries number of tries so far
     * @return {@code 1000 *} the selected back-off multiplier (presumably milliseconds;
     *         confirm against callers)
     */
    public int pause(int ntries) {
        int attempt = Math.max(ntries, 0);
        return 1000 * RETRY_BACKOFF[attempt % RETRY_BACKOFF.length];
    }

    /**
     * Wraps {@code newChannel} with the given opaque and caches it for the task instance.
     *
     * @param newChannel     underlying netty channel
     * @param opaque         opaque value to carry over to the new remote channel
     * @param taskInstanceId task instance id
     * @return the newly cached remote channel
     */
    private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId) {
        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
        this.addRemoteChannel(taskInstanceId, remoteChannel);
        return remoteChannel;
    }

    /**
     * Wraps {@code newChannel} (without an opaque) and caches it for the task instance.
     *
     * @param newChannel     underlying netty channel
     * @param taskInstanceId task instance id
     * @return the newly cached remote channel
     */
    private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId) {
        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
        this.addRemoteChannel(taskInstanceId, remoteChannel);
        return remoteChannel;
    }

    /**
     * Removes the channel associated with a task instance, if any.
     *
     * @param taskInstanceId task instance id
     */
    public static void remove(int taskInstanceId) {
        REMOTE_CHANNELS.remove(taskInstanceId);
    }

    /**
     * Sends a command on the channel of the given task instance.
     *
     * <p>Sending is best effort: if no usable channel exists, or the write fails, the
     * problem is logged and no exception is thrown to the caller.
     *
     * @param taskInstanceId task instance id
     * @param command        command to send
     */
    public void send(final int taskInstanceId, final Command command) {
        NettyRemoteChannel nettyRemoteChannel = this.getRemoteChannel(taskInstanceId);
        if (nettyRemoteChannel == null) {
            logger.warn("no usable remote channel for task instance {}, command not sent", taskInstanceId);
            return;
        }
        nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                logger.warn("failed to send command for task instance {}", taskInstanceId, future.cause());
            }
        });
    }

    /**
     * Builds a running command from the fields of the execution context.
     *
     * @param taskExecutionContext source of the command fields
     * @return populated running command
     */
    private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
        command.setLogPath(taskExecutionContext.getLogPath());
        command.setHost(taskExecutionContext.getHost());
        command.setStartTime(taskExecutionContext.getStartTime());
        command.setExecutePath(taskExecutionContext.getExecutePath());
        return command;
    }

    /**
     * Builds a response (result) command from the fields of the execution context.
     *
     * @param taskExecutionContext source of the command fields
     * @return populated response command
     */
    private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
        command.setLogPath(taskExecutionContext.getLogPath());
        command.setExecutePath(taskExecutionContext.getExecutePath());
        command.setAppIds(taskExecutionContext.getAppIds());
        command.setProcessId(taskExecutionContext.getProcessId());
        command.setHost(taskExecutionContext.getHost());
        command.setStartTime(taskExecutionContext.getStartTime());
        command.setEndTime(taskExecutionContext.getEndTime());
        command.setVarPool(taskExecutionContext.getVarPool());
        return command;
    }

    /**
     * Builds a kill response command from the fields of the execution context.
     *
     * <p>The context's app ids string is split on {@code ","}; a {@code null} app ids
     * value yields an empty list instead of a {@link NullPointerException}.
     *
     * @param taskExecutionContext source of the command fields
     * @return populated kill response command
     */
    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
        TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
        taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
        String appIds = taskExecutionContext.getAppIds();
        taskKillResponseCommand.setAppIds(appIds == null
                ? Arrays.<String>asList()
                : Arrays.asList(appIds.split(",")));
        taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
        taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
        return taskKillResponseCommand;
    }

    /**
     * Caches a running command under {@link Event#RUNNING} in the {@link ResponseCache}
     * and then sends it.
     *
     * @param taskExecutionContext context of the task that is running
     */
    public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteRunningCommand command = this.buildTaskExecuteRunningCommand(taskExecutionContext);
        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING);
        this.send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
    }

    /**
     * Sends a running command without caching it in the {@link ResponseCache}.
     *
     * @param taskExecutionContext context of the task
     */
    public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteRunningCommand command = this.buildTaskExecuteRunningCommand(taskExecutionContext);
        this.send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
    }

    /**
     * Caches a response command under {@link Event#RESULT} in the {@link ResponseCache}
     * and then sends it.
     *
     * @param taskExecutionContext context of the finished task
     */
    public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteResponseCommand command = this.buildTaskExecuteResponseCommand(taskExecutionContext);
        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RESULT);
        this.send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
    }

    /**
     * Sends a kill response command; nothing is cached.
     *
     * @param taskExecutionContext context of the killed task
     */
    public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
        TaskKillResponseCommand taskKillResponseCommand = this.buildKillTaskResponseCommand(taskExecutionContext);
        this.send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
    }
}

