package org.apache.inlong.audit.source;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.audit.protocol.AuditApi;
import org.apache.inlong.audit.protocol.AuditData;
import org.apache.inlong.audit.protocol.Commands;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/audit/source/ServerMessageHandler.class */
public class ServerMessageHandler extends SimpleChannelHandler {
    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
    private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
    private AbstractSource source;
    private final ChannelGroup allChannels;
    private int maxConnections;
    private final ChannelProcessor processor;
    private final ServiceDecoder serviceDecoder;
    private final Gson gson = new Gson();

    /* renamed from: org.apache.inlong.audit.source.ServerMessageHandler$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/inlong/audit/source/ServerMessageHandler$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$inlong$audit$protocol$AuditApi$BaseCommand$Type = new int[AuditApi.BaseCommand.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$inlong$audit$protocol$AuditApi$BaseCommand$Type[AuditApi.BaseCommand.Type.PING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$inlong$audit$protocol$AuditApi$BaseCommand$Type[AuditApi.BaseCommand.Type.PONG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$inlong$audit$protocol$AuditApi$BaseCommand$Type[AuditApi.BaseCommand.Type.AUDITREQUEST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$inlong$audit$protocol$AuditApi$BaseCommand$Type[AuditApi.BaseCommand.Type.AUDITREPLY.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public ServerMessageHandler(AbstractSource abstractSource, ServiceDecoder serviceDecoder, ChannelGroup channelGroup, Integer num) {
        this.maxConnections = Integer.MAX_VALUE;
        this.source = abstractSource;
        this.processor = abstractSource.getChannelProcessor();
        this.serviceDecoder = serviceDecoder;
        this.allChannels = channelGroup;
        this.maxConnections = num.intValue();
    }

    public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (this.allChannels.size() - 1 >= this.maxConnections) {
            logger.warn("refuse to connect , and connections=" + (this.allChannels.size() - 1) + ", maxConnections=" + this.maxConnections + ",channel is " + channelStateEvent.getChannel());
            channelStateEvent.getChannel().disconnect();
            channelStateEvent.getChannel().close();
        }
    }

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        logger.debug("message received");
        if (messageEvent == null) {
            logger.warn("get null message event, just skip");
            return;
        }
        ChannelBuffer channelBuffer = (ChannelBuffer) messageEvent.getMessage();
        SocketAddress remoteAddress = messageEvent.getRemoteAddress();
        if (channelBuffer.readableBytes() == 0) {
            logger.warn("receive message skip empty msg.");
            channelBuffer.clear();
            return;
        }
        Channel channel = messageEvent.getChannel();
        try {
            AuditApi.BaseCommand extractData = this.serviceDecoder.extractData(channelBuffer, channel);
            if (extractData == null) {
                logger.warn("receive message extractData is null");
                return;
            }
            ChannelBuffer channelBuffer2 = null;
            switch (AnonymousClass1.$SwitchMap$org$apache$inlong$audit$protocol$AuditApi$BaseCommand$Type[extractData.getType().ordinal()]) {
                case 1:
                    Preconditions.checkArgument(extractData.hasPing());
                    channelBuffer2 = Commands.getPongChannelBuffer();
                    break;
                case 2:
                    Preconditions.checkArgument(extractData.hasPong());
                    channelBuffer2 = Commands.getPingChannelBuffer();
                    break;
                case 3:
                    Preconditions.checkArgument(extractData.hasAuditRequest());
                    channelBuffer2 = Commands.getAuditReplylBuffer(handleRequest(extractData.getAuditRequest()));
                    break;
                case 4:
                    Preconditions.checkArgument(extractData.hasAuditReply());
                    break;
            }
            if (channelBuffer2 != null) {
                writeResponse(channel, remoteAddress, channelBuffer2);
            }
        } catch (Exception e) {
            logger.error("extractData has error e {}", e);
            throw new IOException(e.getCause());
        }
    }

    private AuditApi.AuditReply handleRequest(AuditApi.AuditRequest auditRequest) {
        List<AuditApi.AuditMessageBody> msgBodyList;
        AuditApi.AuditReply auditReply = null;
        if (auditRequest != null && (msgBodyList = auditRequest.getMsgBodyList()) != null) {
            int i = 0;
            for (AuditApi.AuditMessageBody auditMessageBody : msgBodyList) {
                AuditData auditData = new AuditData();
                auditData.setIp(auditRequest.getMsgHeader().getIp());
                auditData.setThreadId(auditRequest.getMsgHeader().getThreadId());
                auditData.setDockerId(auditRequest.getMsgHeader().getDockerId());
                auditData.setPacketId(auditRequest.getMsgHeader().getPacketId());
                auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs());
                auditData.setLogTs(auditMessageBody.getLogTs());
                auditData.setAuditId(auditMessageBody.getAuditId());
                auditData.setCount(auditMessageBody.getCount());
                auditData.setDelay(auditMessageBody.getDelay());
                auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
                auditData.setInlongStreamId(auditMessageBody.getInlongStreamId());
                auditData.setSize(auditMessageBody.getSize());
                byte[] bArr = null;
                try {
                    bArr = this.gson.toJson(auditData).getBytes("UTF-8");
                } catch (UnsupportedEncodingException e) {
                    logger.error("UnsupportedEncodingException = {}", e);
                }
                if (bArr != null) {
                    try {
                        this.processor.processEvent(EventBuilder.withBody(bArr, (Map) null));
                    } catch (Throwable th) {
                        logger.error("Error writing to controller,data will discard.", th);
                        i++;
                    }
                }
            }
            if (i != 0) {
                auditReply = AuditApi.AuditReply.newBuilder().setRequestId(auditRequest.getRequestId()).setMessage("Error writing to controller,data will discard. error body num = " + i).setRspCode(AuditApi.AuditReply.RSP_CODE.FAILED).build();
            }
        }
        if (auditReply == null) {
            auditReply = AuditApi.AuditReply.newBuilder().setRequestId(auditRequest.getRequestId()).setRspCode(AuditApi.AuditReply.RSP_CODE.SUCCESS).build();
        }
        return auditReply;
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        logger.error("exception caught", exceptionEvent.getCause());
    }

    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        logger.error("channel closed {}", channelHandlerContext.getChannel());
    }

    private void writeResponse(Channel channel, SocketAddress socketAddress, ChannelBuffer channelBuffer) throws Exception {
        if (channel.isWritable()) {
            channel.write(channelBuffer, socketAddress);
        } else {
            logger.warn("the send buffer2 is full, so disconnect it!please check remote client; Connection info:" + channel);
            throw new Exception(new Throwable("the send buffer2 is full,so disconnect it!please check remote client, Connection info:" + channel));
        }
    }
}
