package org.apache.inlong.dataproxy.source;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.lang.reflect.Constructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/BaseSource.class */
/**
 * Common base for DataProxy sources.
 *
 * <p>This class reads the listener and message settings from the Flume {@link Context},
 * registers the metric item set and monitor indexes when the source starts, and builds the
 * Netty {@link ChannelInitializer} by reflection from the configured class names. Concrete
 * sources provide the protocol name and the actual server bootstrap via
 * {@link #getProtocolName()} and {@link #startSource()}.
 */
public abstract class BaseSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(BaseSource.class);

    /** Configuration key holding the maximum number of connections. */
    private static final String CONNECTIONS = "connections";

    // Defaults and bounds used only inside this class.
    private static final String HOST_DEFAULT_VALUE = "0.0.0.0";
    private static final int DEFAULT_MAX_CONNECTIONS = 5000;
    private static final int STAT_INTERVAL_MUST_THAN = 0;
    private static final int PKG_TIMEOUT_DEFAULT_SEC = 3;
    private static final int MSG_MIN_LENGTH = 4;
    private static final int MAX_MSG_DEFAULT_LENGTH = 65536;
    private static final int INTERVAL_SEC = 60;

    /**
     * Maximum monitor statistic count. This is a static field that {@link #configure(Context)}
     * overwrites, so the most recently configured source decides the value for all instances.
     */
    private static int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;

    // Defaults and bounds visible to subclasses; left non-final to keep their contract unchanged.
    protected static int BUFFER_SIZE_MUST_THAN = 0;
    protected static int DEFAULT_MAX_THREADS = 32;
    protected static int RECEIVE_BUFFER_DEFAULT_SIZE = 65536;
    protected static int SEND_BUFFER_DEFAULT_SIZE = 65536;
    protected static int RECEIVE_BUFFER_MAX_SIZE = 16777216;
    protected static int SEND_BUFFER_MAX_SIZE = 16777216;

    /** Flume context captured by {@link #configure(Context)}. */
    protected Context context;
    protected int port;
    protected String host = null;
    protected String msgFactoryName;
    protected String serviceDecoderName;
    protected String messageHandlerName;
    protected int maxMsgLength;
    protected boolean isCompressed;
    protected String topic;
    protected String attr;
    protected boolean filterEmptyMsg;
    // NOTE(review): validated in configure() but not read anywhere else in this class;
    // start() builds the monitor indexes with INTERVAL_SEC instead - confirm this is intended.
    private int statIntervalSec;
    protected int pkgTimeoutSec;
    protected int maxConnections = Integer.MAX_VALUE;
    protected boolean customProcessor = false;
    protected boolean enableBusyWait = false;
    protected int maxThreads = 32;
    protected int acceptorThreads = 1;
    protected int receiveBufferSize;
    protected int sendBufferSize;

    private DataProxyMetricItemSet metricItemSet;
    private MonitorIndex monitorIndex;
    private MonitorIndexExt monitorIndexExt;

    // Netty resources; this class only reads channelFuture and allChannels (in stop()).
    protected EventLoopGroup acceptorGroup;
    protected EventLoopGroup workerGroup;
    protected DefaultThreadFactory acceptorThreadFactory;
    protected ChannelFuture channelFuture;
    protected ChannelGroup allChannels = new DefaultChannelGroup("DefaultChannelGroup", GlobalEventExecutor.INSTANCE);

    /**
     * Starts the source.
     *
     * <p>When the custom channel processor option is enabled, the current channel processor is
     * replaced by a {@link FailoverChannelProcessor} built on the same selector. Afterwards
     * {@code super.start()} is invoked, the metric item set and monitor indexes are created, and
     * {@link #startSource()} is called.
     */
    public synchronized void start() {
        if (this.customProcessor) {
            FailoverChannelProcessor failoverProcessor =
                    new FailoverChannelProcessor(getChannelProcessor().getSelector());
            failoverProcessor.configure(this.context);
            setChannelProcessor(failoverProcessor);
            FailoverChannelProcessorHolder.setChannelProcessor(failoverProcessor);
        }
        super.start();
        this.metricItemSet = new DataProxyMetricItemSet(
                ConfigManager.getInstance().getCommonProperties()
                        .getOrDefault("proxy.cluster.name", ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME),
                getName(),
                String.valueOf(this.port));
        MetricRegister.register(this.metricItemSet);
        this.monitorIndex = new MonitorIndex("Source", INTERVAL_SEC, maxMonitorCnt);
        this.monitorIndexExt =
                new MonitorIndexExt("DataProxy_monitors#" + getProtocolName(), INTERVAL_SEC, maxMonitorCnt);
        startSource();
    }

    /**
     * Stops the source.
     *
     * <p>All channels tracked in {@link #allChannels} are closed and then removed from the group,
     * {@code super.stop()} is invoked, the monitor indexes are shut down, and finally the method
     * waits on the close future of the channel behind {@link #channelFuture}.
     */
    public synchronized void stop() {
        logger.info("[STOP {} SOURCE]{} stopping...", getProtocolName(), getName());
        // Close the tracked channels BEFORE clearing the group; clearing first would leave
        // nothing to close and the connections would stay open.
        if (!this.allChannels.isEmpty()) {
            try {
                this.allChannels.close().awaitUninterruptibly();
            } catch (Exception e) {
                logger.warn("Simple Source netty server stop ex", e);
            } finally {
                this.allChannels.clear();
            }
        }
        super.stop();
        if (this.monitorIndex != null) {
            this.monitorIndex.shutDown();
        }
        if (this.monitorIndexExt != null) {
            this.monitorIndexExt.shutDown();
        }
        if (this.channelFuture != null) {
            try {
                // NOTE(review): this only waits for the channel to become closed; nothing in this
                // class closes it, so presumably a subclass or event-loop shutdown does - confirm.
                this.channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                logger.warn("Simple Source netty server stop ex", e);
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
        logger.info("[STOP {} SOURCE]{} stopped", getProtocolName(), getName());
    }

    /**
     * Reads and validates the source settings from the given context.
     *
     * <p>The port and the attribute string are required keys; every other setting falls back to
     * a default. Non-numeric values for the connection limit, thread limit and monitor count are
     * logged and ignored, leaving the previous field value in place.
     *
     * @param context Flume context holding this source's properties
     * @throws IllegalArgumentException if a value fails one of the range or format checks
     */
    public void configure(Context context) {
        this.context = context;

        // Required keys are verified before they are read, so a missing port is reported as a
        // configuration error rather than as a NullPointerException while unboxing.
        Configurables.ensureRequiredNonNull(context, ConfigConstants.CONFIG_PORT);
        this.port = context.getInteger(ConfigConstants.CONFIG_PORT);
        this.host = context.getString(ConfigConstants.CONFIG_HOST, HOST_DEFAULT_VALUE);
        Preconditions.checkArgument(ConfStringUtils.isValidIp(this.host), "ip config not valid");
        Preconditions.checkArgument(ConfStringUtils.isValidPort(this.port), "port config not valid");

        // Class names resolved later by getChannelInitializerFactory().
        this.msgFactoryName = context.getString(ConfigConstants.MSG_FACTORY_NAME,
                "org.apache.inlong.dataproxy.source.ServerMessageFactory").trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.msgFactoryName), "msgFactoryName is empty");
        this.serviceDecoderName = context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME,
                "org.apache.inlong.dataproxy.source.DefaultServiceDecoder").trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.serviceDecoderName),
                "serviceProcessorName is empty");
        this.messageHandlerName = context.getString(ConfigConstants.MESSAGE_HANDLER_NAME,
                "org.apache.inlong.dataproxy.source.ServerMessageHandler").trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.messageHandlerName),
                "messageHandlerName is empty");

        // Message-level settings.
        this.maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, MAX_MSG_DEFAULT_LENGTH);
        Preconditions.checkArgument(this.maxMsgLength >= MSG_MIN_LENGTH && this.maxMsgLength <= 20971520,
                "maxMsgLength must be >= 4 and <= 20971520");
        this.isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true);
        this.filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
        this.topic = context.getString("topic", "").trim();
        Configurables.ensureRequiredNonNull(context, ConfigConstants.ATTR);
        this.attr = context.getString(ConfigConstants.ATTR).trim();
        Preconditions.checkArgument(!this.attr.isEmpty(), "attr is empty");

        this.statIntervalSec = context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, INTERVAL_SEC);
        Preconditions.checkArgument(this.statIntervalSec >= STAT_INTERVAL_MUST_THAN,
                "statIntervalSec must be >= 0");
        this.pkgTimeoutSec = context.getInteger(ConfigConstants.PACKAGE_TIMEOUT_SEC, PKG_TIMEOUT_DEFAULT_SEC);

        // Best-effort numeric settings: a malformed value is logged and the current value kept.
        try {
            this.maxConnections = context.getInteger(CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
        } catch (NumberFormatException e) {
            logger.warn("BaseSource's \"connections\" property must specify an integer value. {}",
                    context.getString(CONNECTIONS));
        }
        try {
            this.maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, DEFAULT_MAX_THREADS);
        } catch (NumberFormatException e) {
            logger.warn("Simple TCP Source max-threads property must specify an integer value. {}",
                    context.getString(ConfigConstants.MAX_THREADS));
        }
        try {
            maxMonitorCnt = context.getInteger(ConfigConstants.MAX_MONITOR_CNT,
                    ConfigConstants.DEF_MONITOR_STAT_CNT);
        } catch (NumberFormatException e) {
            logger.warn("Property {} must specify an integer value: {}", ConfigConstants.MAX_MONITOR_CNT,
                    context.getString(ConfigConstants.MAX_MONITOR_CNT));
        }
        Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0");

        // Socket buffer sizes are capped at the maximum and must be strictly positive.
        this.receiveBufferSize = Math.min(
                context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, RECEIVE_BUFFER_DEFAULT_SIZE),
                RECEIVE_BUFFER_MAX_SIZE);
        Preconditions.checkArgument(this.receiveBufferSize > BUFFER_SIZE_MUST_THAN,
                "receiveBufferSize must be > 0");
        this.sendBufferSize = Math.min(
                context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, SEND_BUFFER_DEFAULT_SIZE),
                SEND_BUFFER_MAX_SIZE);
        Preconditions.checkArgument(this.sendBufferSize > BUFFER_SIZE_MUST_THAN, "sendBufferSize must be > 0");

        this.enableBusyWait = context.getBoolean(ConfigConstants.ENABLE_BUSY_WAIT, false);
        this.customProcessor = context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false);
    }

    /**
     * Builds the channel initializer by reflection.
     *
     * <p>The service decoder class is instantiated through its no-argument constructor, and the
     * message factory class through the 14-argument constructor looked up below. On any failure
     * the source is stopped and a {@link FlumeException} is thrown.
     *
     * @return the channel initializer created from {@link #msgFactoryName}
     * @throws FlumeException if either class cannot be loaded or instantiated; the original
     *         exception is attached as the cause
     */
    public ChannelInitializer getChannelInitializerFactory() {
        logger.info("load msgFactory={} and serviceDecoderName={}", this.msgFactoryName, this.serviceDecoderName);
        try {
            ServiceDecoder serviceDecoder =
                    (ServiceDecoder) Class.forName(this.serviceDecoderName).getDeclaredConstructor().newInstance();
            Constructor<?> factoryConstructor = Class.forName(this.msgFactoryName).getConstructor(
                    BaseSource.class, ChannelGroup.class, String.class, ServiceDecoder.class, String.class,
                    Integer.class, String.class, String.class, Boolean.class, Integer.class, Boolean.class,
                    MonitorIndex.class, MonitorIndexExt.class, String.class);
            logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
            return (ChannelInitializer) factoryConstructor.newInstance(
                    this, this.allChannels, getProtocolName(), serviceDecoder, this.messageHandlerName,
                    this.maxMsgLength, this.topic, this.attr, this.filterEmptyMsg, this.maxConnections,
                    this.isCompressed, this.monitorIndex, this.monitorIndexExt, getProtocolName());
        } catch (Exception e) {
            logger.error("Simple {} Source start error, fail to construct ChannelPipelineFactory with name {}",
                    getProtocolName(), this.msgFactoryName, e);
            stop();
            // Chain the cause: reflection wrappers often carry a null message of their own.
            throw new FlumeException(e.getMessage(), e);
        }
    }

    /**
     * Returns the metric item set created in {@link #start()}.
     *
     * @return the metric item set, or {@code null} if the source has not been started
     */
    public DataProxyMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }

    /**
     * Returns the context passed to {@link #configure(Context)}.
     *
     * @return the stored context, or {@code null} if the source has not been configured
     */
    public Context getContext() {
        return this.context;
    }

    /**
     * Returns the protocol name of the concrete source; it is used in log messages, in the
     * monitor index name, and as a constructor argument of the message factory.
     *
     * @return the protocol name
     */
    public abstract String getProtocolName();

    /**
     * Starts the protocol-specific server; invoked at the end of {@link #start()}.
     */
    public abstract void startSource();
}
