package org.apache.dolphinscheduler.remote.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import lombok.Generated;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/dolphinscheduler/remote/handler/NettyServerHandler.class */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);
    private final NettyRemotingServer nettyRemotingServer;
    private final ConcurrentHashMap<MessageType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();

    public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
        this.nettyRemotingServer = nettyRemotingServer;
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.channel().close();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        processReceived(channelHandlerContext.channel(), (Message) obj);
    }

    public void registerProcessor(MessageType messageType, NettyRequestProcessor nettyRequestProcessor) {
        registerProcessor(messageType, nettyRequestProcessor, null);
    }

    public void registerProcessor(MessageType messageType, NettyRequestProcessor nettyRequestProcessor, ExecutorService executorService) {
        ExecutorService executorService2 = executorService;
        if (executorService2 == null) {
            executorService2 = this.nettyRemotingServer.getDefaultExecutor();
        }
        this.processors.putIfAbsent(messageType, new Pair<>(nettyRequestProcessor, executorService2));
    }

    private void processReceived(Channel channel, Message message) {
        MessageType type = message.getType();
        if (MessageType.HEART_BEAT.equals(type)) {
            if (log.isDebugEnabled()) {
                log.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
                return;
            }
            return;
        }
        Pair<NettyRequestProcessor, ExecutorService> pair = this.processors.get(type);
        if (pair == null) {
            log.warn("commandType {} not support", type);
            return;
        }
        try {
            pair.getRight().submit(() -> {
                try {
                    ((NettyRequestProcessor) pair.getLeft()).process(channel, message);
                } catch (Exception e) {
                    log.error("process msg {} error", message, e);
                }
            });
        } catch (RejectedExecutionException e) {
            log.warn("thread pool is full, discard msg {} from {}", message, ChannelUtils.getRemoteAddress(channel));
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        log.error("exceptionCaught : {}", th.getMessage(), th);
        channelHandlerContext.channel().close();
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        ChannelConfig config = channel.config();
        if (channel.isWritable()) {
            if (log.isWarnEnabled()) {
                log.warn("{} is writable, to low water : {}", channel, Integer.valueOf(config.getWriteBufferLowWaterMark()));
            }
            config.setAutoRead(true);
        } else {
            if (log.isWarnEnabled()) {
                log.warn("{} is not writable, over high water level : {}", channel, Integer.valueOf(config.getWriteBufferHighWaterMark()));
            }
            config.setAutoRead(false);
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            channelHandlerContext.channel().close();
        } else {
            super.userEventTriggered(channelHandlerContext, obj);
        }
    }
}
