package org.apache.inlong.dataproxy.source;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/SimpleTcpSource.class */
public class SimpleTcpSource extends BaseSource implements Configurable, ConfigUpdateCallback, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);
    private static int TRAFFIC_CLASS_TYPE_0 = 0;
    private static int TRAFFIC_CLASS_TYPE_96 = 96;
    private static int HIGH_WATER_MARK_DEFAULT_VALUE = 65536;
    private boolean tcpNoDelay = true;
    private boolean keepAlive = true;
    private int highWaterMark;
    private int trafficClass;
    protected String topic;
    private ServerBootstrap bootstrap;

    public SimpleTcpSource() {
        ConfigManager.getInstance().regIPVisitConfigChgCallback(this);
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public synchronized void startSource() {
        logger.info("start " + getName());
        logger.info("Set max workers : {} ;", Integer.valueOf(this.maxThreads));
        this.acceptorThreadFactory = new DefaultThreadFactory("tcpSource-nettyBoss-threadGroup");
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(this.acceptorThreads, false, this.acceptorThreadFactory);
        this.workerGroup = EventLoopUtil.newEventLoopGroup(this.maxThreads, this.enableBusyWait, new DefaultThreadFactory("tcpSource-nettyWorker-threadGroup"));
        this.bootstrap = new ServerBootstrap();
        this.bootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
        this.bootstrap.childOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.tcpNoDelay));
        this.bootstrap.childOption(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.keepAlive));
        this.bootstrap.childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(this.receiveBufferSize));
        this.bootstrap.childOption(ChannelOption.SO_SNDBUF, Integer.valueOf(this.sendBufferSize));
        this.bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(this.highWaterMark));
        this.bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(this.workerGroup));
        EventLoopUtil.enableTriggeredMode(this.bootstrap);
        this.bootstrap.group(this.acceptorGroup, this.workerGroup);
        logger.info("load msgFactory=" + this.msgFactoryName + " and serviceDecoderName=" + this.serviceDecoderName);
        this.bootstrap.childHandler(getChannelInitializerFactory());
        try {
            if (this.host == null) {
                this.channelFuture = this.bootstrap.bind(new InetSocketAddress(this.port)).sync();
            } else {
                this.channelFuture = this.bootstrap.bind(new InetSocketAddress(this.host, this.port)).sync();
            }
        } catch (Exception e) {
            logger.error("Simple TCP Source error bind host {} port {},program will exit! e = {}", new Object[]{this.host, Integer.valueOf(this.port), e});
            System.exit(-1);
        }
        ConfigManager.getInstance().addSourceReportInfo(this.host, String.valueOf(this.port), getProtocolName().toUpperCase());
        logger.info("Simple TCP Source started at host {}, port {}", this.host, Integer.valueOf(this.port));
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public synchronized void stop() {
        super.stop();
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public void configure(Context context) {
        logger.info("context is {}", context);
        super.configure(context);
        this.tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true).booleanValue();
        this.keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true).booleanValue();
        this.highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, Integer.valueOf(HIGH_WATER_MARK_DEFAULT_VALUE)).intValue();
        this.receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, Integer.valueOf(RECEIVE_BUFFER_DEFAULT_SIZE)).intValue();
        if (this.receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
            this.receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
        }
        Preconditions.checkArgument(this.receiveBufferSize > BUFFER_SIZE_MUST_THAN, "receiveBufferSize must be > 0");
        this.sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, Integer.valueOf(SEND_BUFFER_DEFAULT_SIZE)).intValue();
        if (this.sendBufferSize > SEND_BUFFER_MAX_SIZE) {
            this.sendBufferSize = SEND_BUFFER_MAX_SIZE;
        }
        Preconditions.checkArgument(this.sendBufferSize > BUFFER_SIZE_MUST_THAN, "sendBufferSize must be > 0");
        this.trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, Integer.valueOf(TRAFFIC_CLASS_TYPE_0)).intValue();
        Preconditions.checkArgument(this.trafficClass == TRAFFIC_CLASS_TYPE_0 || this.trafficClass == TRAFFIC_CLASS_TYPE_96, "trafficClass must be == 0 or == 96");
        try {
            this.maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, 32).intValue();
        } catch (NumberFormatException e) {
            logger.warn("Simple TCP Source max-threads property must specify an integer value. {}", context.getString(ConfigConstants.MAX_THREADS));
        }
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public String getProtocolName() {
        return ConfigConstants.TCP_PROTOCOL;
    }

    @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
    public void update() {
        if (ConfigManager.getInstance().needChkIllegalIP()) {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis();
            for (Channel channel : this.allChannels) {
                String channelRemoteIP = AddressUtils.getChannelRemoteIP(channel);
                if (channelRemoteIP != null && ConfigManager.getInstance().isIllegalIP(channelRemoteIP)) {
                    channel.disconnect();
                    channel.close();
                    this.allChannels.remove(channel);
                    i++;
                    logger.error(channelRemoteIP + " is Illegal IP, so disconnect it !");
                }
            }
            logger.info("Channel check, {} disconnects {} Illegal channels, waist {} ms", new Object[]{getName(), Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
        }
    }
}
