package org.apache.dolphinscheduler.remote.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import lombok.Generated;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/dolphinscheduler/remote/handler/NettyClientHandler.class */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private final NettyRemotingClient nettyRemotingClient;
    private final ExecutorService callbackExecutor;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
    private static byte[] heartBeatData = "heart_beat".getBytes();
    private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
    private final ConcurrentHashMap<MessageType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();

    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService executorService) {
        this.nettyRemotingClient = nettyRemotingClient;
        this.callbackExecutor = executorService;
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(channelHandlerContext.channel()));
        channelHandlerContext.channel().close();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        processReceived(channelHandlerContext.channel(), (Message) obj);
    }

    public void registerProcessor(MessageType messageType, NettyRequestProcessor nettyRequestProcessor) {
        registerProcessor(messageType, nettyRequestProcessor, null);
    }

    public void registerProcessor(MessageType messageType, NettyRequestProcessor nettyRequestProcessor, ExecutorService executorService) {
        ExecutorService executorService2 = executorService;
        if (executorService2 == null) {
            executorService2 = this.defaultExecutor;
        }
        this.processors.putIfAbsent(messageType, new Pair<>(nettyRequestProcessor, executorService2));
    }

    private void processReceived(Channel channel, Message message) {
        ResponseFuture future = ResponseFuture.getFuture(message.getOpaque());
        if (future == null) {
            processByCommandType(channel, message);
            return;
        }
        future.setResponseCommand(message);
        future.release();
        if (future.getInvokeCallback() == null) {
            future.putResponse(message);
            return;
        }
        future.removeFuture();
        ExecutorService executorService = this.callbackExecutor;
        future.getClass();
        executorService.submit(future::executeInvokeCallback);
    }

    public void processByCommandType(Channel channel, Message message) {
        Pair<NettyRequestProcessor, ExecutorService> pair = this.processors.get(message.getType());
        if (pair == null) {
            log.warn("receive response {}, but not matched any request ", message);
            return;
        }
        try {
            pair.getRight().submit(() -> {
                try {
                    ((NettyRequestProcessor) pair.getLeft()).process(channel, message);
                } catch (Exception e) {
                    log.error(String.format("process command %s exception", message), e);
                }
            });
        } catch (RejectedExecutionException e) {
            log.warn("thread pool is full, discard command {} from {}", message, ChannelUtils.getRemoteAddress(channel));
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        log.error("exceptionCaught : {}", th.getMessage(), th);
        this.nettyRemotingClient.closeChannel(ChannelUtils.toAddress(channelHandlerContext.channel()));
        channelHandlerContext.channel().close();
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof IdleStateEvent)) {
            super.userEventTriggered(channelHandlerContext, obj);
            return;
        }
        Message message = new Message();
        message.setType(MessageType.HEART_BEAT);
        message.setBody(heartBeatData);
        channelHandlerContext.channel().writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        if (log.isDebugEnabled()) {
            log.debug("Client send heart beat to: {}", ChannelUtils.getRemoteAddress(channelHandlerContext.channel()));
        }
    }
}
