package org.apache.inlong.dataproxy.source;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.apache.inlong.dataproxy.utils.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/SimpleTcpSource.class */
/**
 * Source that accepts connections over TCP through a Netty {@link ServerBootstrap}.
 *
 * <p>On top of what {@code BaseSource#configure} reads, this class reads four settings from the
 * Flume {@link Context}: {@code tcpNoDelay}, {@code keepAlive}, {@code enableBusyWait} and
 * {@code highWaterMark}.
 */
public class SimpleTcpSource extends BaseSource implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);

    /** Context key of the TCP_NODELAY switch applied to accepted connections. */
    private static final String KEY_TCP_NO_DELAY = "tcpNoDelay";
    /** Context key of the SO_KEEPALIVE switch applied to accepted connections. */
    private static final String KEY_KEEP_ALIVE = "keepAlive";
    /** Context key of the flag forwarded to {@code EventLoopUtil.newEventLoopGroup}. */
    private static final String KEY_ENABLE_BUSY_WAIT = "enableBusyWait";
    /** Context key of the write-buffer high water mark applied to accepted connections. */
    private static final String KEY_HIGH_WATER_MARK = "highWaterMark";
    /** Value used for {@code highWaterMark} when the context does not provide one. */
    private static final int DEFAULT_HIGH_WATER_MARK = 65536;

    /** Server bootstrap created by {@link #startSource()}; {@code null} until then. */
    private ServerBootstrap bootstrap;
    private boolean tcpNoDelay;
    private boolean tcpKeepAlive;
    private int highWaterMark;
    private boolean enableBusyWait;

    /**
     * Creates the source and registers it with the {@link ConfigManager} singleton through
     * {@code regIPVisitConfigChgCallback}.
     */
    public SimpleTcpSource() {
        // NOTE(review): "this" is handed out before construction has finished; presumably the
        // callback is only invoked later on a configuration change - confirm in ConfigManager.
        ConfigManager.getInstance().regIPVisitConfigChgCallback(this);
    }

    /**
     * Reads the TCP specific settings after delegating to the parent class.
     *
     * @param context the Flume context holding this source's configuration
     * @throws IllegalArgumentException if the configured {@code highWaterMark} is negative
     */
    @Override
    public void configure(Context context) {
        logger.info("Source {} context is {}", getName(), context);
        super.configure(context);
        this.tcpNoDelay = context.getBoolean(KEY_TCP_NO_DELAY, true);
        this.tcpKeepAlive = context.getBoolean(KEY_KEEP_ALIVE, true);
        this.enableBusyWait = context.getBoolean(KEY_ENABLE_BUSY_WAIT, false);
        this.highWaterMark = ConfStringUtils.getIntValue(context, KEY_HIGH_WATER_MARK, DEFAULT_HIGH_WATER_MARK);
        // Name the offending setting and its value; the previous message printed the default
        // value ("65536 must be >= 0"), which did not tell the operator what to correct.
        Preconditions.checkArgument(this.highWaterMark >= 0,
                "%s must be >= 0, but was %s", KEY_HIGH_WATER_MARK, this.highWaterMark);
    }

    /**
     * Creates the acceptor and worker event loop groups, configures the server bootstrap and
     * binds it to {@code srcPort} (on {@code srcHost} when that is set).
     *
     * <p>Any {@link Throwable} raised while binding is logged and followed by
     * {@code System.exit(-1)}.
     */
    @Override
    public synchronized void startSource() {
        final String sourceName = getCachedSrcName();
        logger.info("start {}", sourceName);
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(this.maxAcceptThreads, this.enableBusyWait,
                new DefaultThreadFactory(sourceName + "-boss-group"));
        this.workerGroup = EventLoopUtil.newEventLoopGroup(this.maxWorkerThreads, this.enableBusyWait,
                new DefaultThreadFactory(sourceName + "-worker-group"));

        this.bootstrap = new ServerBootstrap();
        // Options of the listening socket. SO_LINGER is only set for non-negative values.
        if (this.conLinger >= 0) {
            this.bootstrap.option(ChannelOption.SO_LINGER, Integer.valueOf(this.conLinger));
        }
        this.bootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(this.conBacklog));
        this.bootstrap.option(ChannelOption.SO_REUSEADDR, Boolean.valueOf(this.reuseAddress));
        // Options applied to every accepted connection.
        this.bootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
        this.bootstrap.childOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.tcpNoDelay));
        this.bootstrap.childOption(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.tcpKeepAlive));
        this.bootstrap.childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(this.maxRcvBufferSize));
        this.bootstrap.childOption(ChannelOption.SO_SNDBUF, Integer.valueOf(this.maxSendBufferSize));
        this.bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(this.highWaterMark));
        // The server channel class is chosen by EventLoopUtil from the worker group.
        this.bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(this.workerGroup));
        EventLoopUtil.enableTriggeredMode(this.bootstrap);
        this.bootstrap.group(this.acceptorGroup, this.workerGroup);
        this.bootstrap.childHandler(getChannelInitializerFactory());

        try {
            // Without a host the socket address is built from the port alone.
            final InetSocketAddress bindAddress = (this.srcHost == null)
                    ? new InetSocketAddress(this.srcPort)
                    : new InetSocketAddress(this.srcHost, this.srcPort);
            this.channelFuture = this.bootstrap.bind(bindAddress).sync();
        } catch (Throwable th) {
            // The throwable is passed as the trailing argument without a placeholder of its own
            // so that the logging backend records it as the exception, stack trace included.
            logger.error("Source {} bind ({}:{}) error, program will exit!",
                    sourceName, this.srcHost, this.srcPort, th);
            System.exit(-1);
        }
    }

    /** Stops the source by delegating to the parent class while holding this instance's monitor. */
    @Override
    public synchronized void stop() {
        super.stop();
    }

    /**
     * Returns the protocol name of this source.
     *
     * @return the constant {@code "tcp"}
     */
    @Override
    public String getProtocolName() {
        return "tcp";
    }
}
