package org.apache.inlong.audit.source;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.audit.channel.FailoverChannelProcessor;
import org.apache.inlong.audit.utils.EventLoopUtil;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/audit/source/SimpleTcpSource.class */
public class SimpleTcpSource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final String CONNECTIONS = "connections";
    protected Context context;
    protected EventLoopGroup acceptorGroup;
    protected EventLoopGroup workerGroup;
    protected DefaultThreadFactory acceptorThreadFactory;
    protected ChannelFuture channelFuture;
    protected int port;
    protected String msgFactoryName;
    protected String serviceDecoderName;
    protected String messageHandlerName;
    protected int maxMsgLength;
    private int receiveBufferSize;
    private int highWaterMark;
    private int sendBufferSize;
    private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);
    private static String HOST_DEFAULT_VALUE = "0.0.0.0";
    private static int DEFAULT_MAX_THREADS = 32;
    private static int DEFAULT_MAX_CONNECTIONS = 5000;
    private static int MIN_MSG_LENGTH = 4;
    private static int MAX_MSG_LENGTH = 65536;
    private static int BUFFER_SIZE_MUST_THAN = 0;
    private static int HIGH_WATER_MARK_DEFAULT_VALUE = 65536;
    private static int RECEIVE_BUFFER_DEFAULT_SIZE = 65536;
    private static int SEND_BUFFER_DEFAULT_SIZE = 65536;
    private static int RECEIVE_BUFFER_MAX_SIZE = 16777216;
    private static int SEND_BUFFER_MAX_SIZE = 16777216;
    protected int maxConnections = Integer.MAX_VALUE;
    private ServerBootstrap bootstrap = null;
    protected String host = null;
    private int maxThreads = 32;
    private boolean tcpNoDelay = true;
    private boolean keepAlive = true;
    protected boolean customProcessor = false;
    protected boolean enableBusyWait = false;
    protected int acceptorThreads = 1;
    private Channel nettyChannel = null;
    protected ChannelGroup allChannels = new DefaultChannelGroup("DefaultAuditChannelGroup", GlobalEventExecutor.INSTANCE);

    public synchronized void start() {
        logger.info("start " + getName());
        if (this.customProcessor) {
            FailoverChannelProcessor failoverChannelProcessor = new FailoverChannelProcessor(getChannelProcessor().getSelector());
            failoverChannelProcessor.configure(this.context);
            setChannelProcessor(failoverChannelProcessor);
            FailoverChannelProcessorHolder.setChannelProcessor(failoverChannelProcessor);
        }
        super.start();
        this.acceptorThreadFactory = new DefaultThreadFactory("tcpSource-nettyBoss-threadGroup");
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(this.acceptorThreads, false, this.acceptorThreadFactory);
        this.workerGroup = EventLoopUtil.newEventLoopGroup(this.maxThreads, this.enableBusyWait, new DefaultThreadFactory("tcpSource-nettyWorker-threadGroup"));
        this.bootstrap = new ServerBootstrap();
        this.bootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
        this.bootstrap.childOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.tcpNoDelay));
        this.bootstrap.childOption(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.keepAlive));
        this.bootstrap.childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(this.receiveBufferSize));
        this.bootstrap.childOption(ChannelOption.SO_SNDBUF, Integer.valueOf(this.sendBufferSize));
        this.bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(this.highWaterMark));
        this.bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(this.workerGroup));
        EventLoopUtil.enableTriggeredMode(this.bootstrap);
        this.bootstrap.group(this.acceptorGroup, this.workerGroup);
        logger.info("load msgFactory=" + this.msgFactoryName + " and serviceDecoderName=" + this.serviceDecoderName);
        try {
            ServiceDecoder serviceDecoder = (ServiceDecoder) Class.forName(this.serviceDecoderName).newInstance();
            Constructor<?> constructor = Class.forName(this.msgFactoryName).getConstructor(AbstractSource.class, ChannelGroup.class, ServiceDecoder.class, String.class, Integer.class, Integer.class, String.class);
            logger.info("Using channel processor:{}", getClass().getName());
            this.bootstrap.childHandler((ChannelInitializer) constructor.newInstance(this, this.allChannels, serviceDecoder, this.messageHandlerName, Integer.valueOf(this.maxMsgLength), Integer.valueOf(this.maxConnections), getName()));
            try {
                if (this.host == null) {
                    this.channelFuture = this.bootstrap.bind(new InetSocketAddress(this.port)).sync();
                } else {
                    this.channelFuture = this.bootstrap.bind(new InetSocketAddress(this.host, this.port)).sync();
                }
            } catch (Exception e) {
                logger.error("Simple TCP Source bind {}:{} error, program will exit!", new Object[]{this.host, Integer.valueOf(this.port), e});
                System.exit(-1);
            }
            logger.info("Simple TCP Source started at host {}, port {}", this.host, Integer.valueOf(this.port));
        } catch (Exception e2) {
            logger.error("Simple Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}", this.msgFactoryName, e2);
            stop();
            throw new FlumeException(e2.getMessage());
        }
    }

    public synchronized void stop() {
        logger.info("[STOP SOURCE]{} stopping...", super.getName());
        if (this.allChannels != null && !this.allChannels.isEmpty()) {
            try {
                this.allChannels.close().awaitUninterruptibly();
            } catch (Exception e) {
                logger.warn("Simple TCP Source netty server stop ex", e);
            } finally {
                this.allChannels.clear();
            }
        }
        super.stop();
        logger.info("[STOP SOURCE]{} stopped", super.getName());
    }

    public void configure(Context context) {
        logger.info("context is {}", context);
        this.context = context;
        this.port = context.getInteger("port").intValue();
        this.host = context.getString("host", HOST_DEFAULT_VALUE);
        this.tcpNoDelay = context.getBoolean("tcpNoDelay", true).booleanValue();
        this.keepAlive = context.getBoolean("keepAlive", true).booleanValue();
        this.highWaterMark = context.getInteger("highWaterMark", Integer.valueOf(HIGH_WATER_MARK_DEFAULT_VALUE)).intValue();
        this.receiveBufferSize = context.getInteger("receiveBufferSize", Integer.valueOf(RECEIVE_BUFFER_DEFAULT_SIZE)).intValue();
        if (this.receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
            this.receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
        }
        Preconditions.checkArgument(this.receiveBufferSize > BUFFER_SIZE_MUST_THAN, "receiveBufferSize must be > 0");
        this.sendBufferSize = context.getInteger("sendBufferSize", Integer.valueOf(SEND_BUFFER_DEFAULT_SIZE)).intValue();
        if (this.sendBufferSize > SEND_BUFFER_MAX_SIZE) {
            this.sendBufferSize = SEND_BUFFER_MAX_SIZE;
        }
        Preconditions.checkArgument(this.sendBufferSize > BUFFER_SIZE_MUST_THAN, "sendBufferSize must be > 0");
        try {
            this.maxThreads = context.getInteger("max-threads", Integer.valueOf(DEFAULT_MAX_THREADS)).intValue();
        } catch (NumberFormatException e) {
            logger.warn("Simple TCP Source max-threads property must specify an integer value. {}", context.getString("max-threads"));
        }
        try {
            this.maxConnections = context.getInteger(CONNECTIONS, Integer.valueOf(DEFAULT_MAX_CONNECTIONS)).intValue();
        } catch (NumberFormatException e2) {
            logger.warn("BaseSource's \"connections\" property must specify an integer value.", context.getString(CONNECTIONS));
        }
        this.msgFactoryName = context.getString("msg-factory-name", "org.apache.inlong.audit.source.ServerMessageFactory");
        this.msgFactoryName = this.msgFactoryName.trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.msgFactoryName), "msgFactoryName is empty");
        this.serviceDecoderName = context.getString("service-decoder-name", "org.apache.inlong.audit.source.DefaultServiceDecoder");
        this.serviceDecoderName = this.serviceDecoderName.trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.serviceDecoderName), "serviceProcessorName is empty");
        this.messageHandlerName = context.getString("message-handler-name", "org.apache.inlong.audit.source.ServerMessageHandler");
        this.messageHandlerName = this.messageHandlerName.trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.messageHandlerName), "messageHandlerName is empty");
        this.maxMsgLength = context.getInteger("max-msg-length", Integer.valueOf(MAX_MSG_LENGTH)).intValue();
        Preconditions.checkArgument(this.maxMsgLength >= MIN_MSG_LENGTH && this.maxMsgLength <= 20971520, "maxMsgLength must be >= 4 and <= 20971520");
        this.customProcessor = context.getBoolean("custom-cp", false).booleanValue();
    }
}
