package org.apache.inlong.dataproxy.http;

import com.google.common.base.Preconditions;
import java.util.HashSet;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.monitor.CounterGroup;
import org.apache.inlong.common.monitor.CounterGroupExt;
import org.apache.inlong.common.monitor.StatRunner;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/http/HttpBaseSource.class */
/**
 * Base source in the dataproxy {@code http} package that reads and validates its
 * settings from a Flume {@code Context}.
 *
 * <p>{@link #configure(org.apache.flume.Context)} populates the fields below,
 * {@link #start()} optionally launches a statistics thread and optionally installs a
 * {@link FailoverChannelProcessor}, and {@link #stop()} shuts the statistics thread down
 * again before delegating to the superclass.
 */
public class HttpBaseSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(HttpBaseSource.class);

    /** Configuration key holding the maximum number of connections. */
    private static final String CONNECTIONS = "connections";
    /** Configuration key holding the topic. */
    private static final String TOPIC = "topic";

    /** Host used when {@code ConfigConstants.CONFIG_HOST} is not configured. */
    private static final String DEFAULT_HOST = "0.0.0.0";
    /** Handler class name used when {@code ConfigConstants.MESSAGE_HANDLER_NAME} is not configured. */
    private static final String DEFAULT_MESSAGE_HANDLER_NAME =
            "org.apache.inlong.dataproxy.source.ServerMessageHandler";
    /** Value of {@link #maxMsgLength} when it is not configured. */
    private static final int DEFAULT_MAX_MSG_LENGTH = 65536;
    /** Smallest accepted value of {@link #maxMsgLength}. */
    private static final int MIN_MSG_LENGTH = 4;
    /** Largest accepted value of {@link #maxMsgLength} (20 * 1024 * 1024). */
    private static final int MAX_MSG_LENGTH_LIMIT = 20971520;
    /** Statistics interval, in seconds, when it is not configured. */
    private static final int DEFAULT_STAT_INTERVAL_SEC = 60;
    /** Value of {@link #maxConnections} when the key is absent from the configuration. */
    private static final int DEFAULT_MAX_CONNECTIONS = 5000;

    /** Port read from {@code ConfigConstants.CONFIG_PORT}; required. */
    protected int port;
    /** Maximum message length; validated to lie in [{@value #MIN_MSG_LENGTH}, {@value #MAX_MSG_LENGTH_LIMIT}]. */
    protected int maxMsgLength;
    /** Trimmed, non-empty topic; required. */
    protected String topic;
    /** Trimmed, non-empty value of {@code ConfigConstants.ATTR}; required. */
    protected String attr;
    /** Trimmed, non-empty message handler name. */
    protected String messageHandlerName;
    /** Value of {@code ConfigConstants.FILTER_EMPTY_MSG}; defaults to {@code false}. */
    protected boolean filterEmptyMsg;
    /** Statistics interval in seconds; {@code 0} disables the statistics thread. */
    private int statIntervalSec;
    private StatRunner statRunner;
    private Thread statThread;
    /** The context most recently passed to {@link #configure(org.apache.flume.Context)}. */
    protected org.apache.flume.Context context;
    /** Host read from {@code ConfigConstants.CONFIG_HOST}; must pass {@code ConfStringUtils.isValidIp}. */
    protected String host = null;
    /** Maximum number of connections; keeps its current value if the configured value is not an integer. */
    protected int maxConnections = Integer.MAX_VALUE;
    /** Whether {@link #start()} replaces the channel processor with a {@link FailoverChannelProcessor}. */
    protected boolean customProcessor = false;
    protected CounterGroup counterGroup = new CounterGroup();
    protected CounterGroupExt counterGroupExt = new CounterGroupExt();

    /**
     * Starts the statistics thread (when the configured interval is positive), installs the
     * failover channel processor (when enabled) and then starts the underlying source.
     */
    public synchronized void start() {
        if (this.statIntervalSec > 0) {
            startStatThread();
        }
        if (this.customProcessor) {
            // Reuse the selector of the processor installed so far.
            FailoverChannelProcessor failoverProcessor =
                    new FailoverChannelProcessor(getChannelProcessor().getSelector());
            failoverProcessor.configure(this.context);
            setChannelProcessor(failoverProcessor);
        }
        super.start();
    }

    /**
     * Shuts down the statistics thread, if one was configured, and then stops the
     * underlying source.
     */
    public synchronized void stop() {
        if (this.statIntervalSec > 0) {
            stopStatThread();
        }
        super.stop();
    }

    /**
     * Reads and validates this source's settings from {@code context}.
     *
     * @param context the Flume context supplying the settings
     * @throws IllegalArgumentException if a required key (port, topic, attr) is missing or a
     *         value fails validation
     */
    public void configure(org.apache.flume.Context context) {
        this.context = context;

        // Check required keys before reading them, so a missing port is reported by name
        // instead of failing with a NullPointerException while unboxing.
        Configurables.ensureRequiredNonNull(context, new String[]{ConfigConstants.CONFIG_PORT});
        this.port = context.getInteger(ConfigConstants.CONFIG_PORT);
        this.host = context.getString(ConfigConstants.CONFIG_HOST, DEFAULT_HOST);
        Preconditions.checkArgument(ConfStringUtils.isValidIp(this.host), "ip config not valid");
        Preconditions.checkArgument(ConfStringUtils.isValidPort(this.port), "port config not valid");

        this.maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, DEFAULT_MAX_MSG_LENGTH);
        Preconditions.checkArgument(
                this.maxMsgLength >= MIN_MSG_LENGTH && this.maxMsgLength <= MAX_MSG_LENGTH_LIMIT,
                "maxMsgLength must be >= " + MIN_MSG_LENGTH + " and <= " + MAX_MSG_LENGTH_LIMIT);

        Configurables.ensureRequiredNonNull(context, new String[]{TOPIC, ConfigConstants.ATTR});
        this.topic = context.getString(TOPIC).trim();
        Preconditions.checkArgument(!this.topic.isEmpty(), "topic is empty");
        this.attr = context.getString(ConfigConstants.ATTR).trim();
        Preconditions.checkArgument(!this.attr.isEmpty(), "attr is empty");

        this.messageHandlerName =
                context.getString(ConfigConstants.MESSAGE_HANDLER_NAME, DEFAULT_MESSAGE_HANDLER_NAME).trim();
        Preconditions.checkArgument(!this.messageHandlerName.isEmpty(), "messageHandlerName is empty");

        this.filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
        this.statIntervalSec = context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, DEFAULT_STAT_INTERVAL_SEC);
        Preconditions.checkArgument(this.statIntervalSec >= 0, "statIntervalSec must be >= 0");
        this.customProcessor = context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false);

        try {
            this.maxConnections = context.getInteger(CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
        } catch (NumberFormatException e) {
            // Deliberately tolerant: a malformed value leaves maxConnections unchanged.
            logger.warn("BaseSource's \"connections\" property must specify an integer value. {}",
                    context.getString(CONNECTIONS));
        }
    }

    /** Creates the stat runner for the monitored event counters and starts its thread. */
    private void startStatThread() {
        // NOTE(review): assumes the StatConstants.EVENT_* values are Strings - confirm.
        HashSet<String> monitoredEvents = new HashSet<>();
        monitoredEvents.add(StatConstants.EVENT_SUCCESS);
        monitoredEvents.add(StatConstants.EVENT_DROPPED);
        monitoredEvents.add(StatConstants.EVENT_EMPTY);
        monitoredEvents.add(StatConstants.EVENT_OTHEREXP);
        monitoredEvents.add(StatConstants.EVENT_INVALID);
        this.statRunner = new StatRunner(getName(), this.counterGroup, this.counterGroupExt,
                this.statIntervalSec, monitoredEvents);
        this.statThread = new Thread((Runnable) this.statRunner);
        this.statThread.setName("Thread-Stat-" + getName());
        this.statThread.start();
    }

    /** Signals the stat runner to shut down, interrupts its thread and waits for it to exit. */
    private void stopStatThread() {
        try {
            if (this.statRunner != null) {
                this.statRunner.shutDown();
            }
            if (this.statThread != null) {
                this.statThread.interrupt();
                this.statThread.join();
            }
        } catch (InterruptedException e) {
            logger.warn("statrunner interrupted");
            // Restore the interrupt status so callers further up can observe it.
            Thread.currentThread().interrupt();
        }
    }
}
