package org.apache.inlong.dataproxy.source;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
import org.apache.inlong.dataproxy.source.httpMsg.HttpMessageHandler;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/BaseSource.class */
public abstract class BaseSource extends AbstractSource implements ConfigUpdateCallback, ProxyServiceMBean, EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(BaseSource.class);
    protected Context context;
    protected String cachedSrcName;
    protected ChannelProcessor cachedChProcessor;
    protected String srcHost;
    protected int srcPort;
    protected String strPort;
    protected String rptSrcType;
    protected String msgFactoryName;
    protected String messageHandlerName;
    protected int maxMsgLength;
    protected boolean isCompressed;
    protected boolean filterEmptyMsg;
    protected boolean customProcessor;
    protected int maxWorkerThreads;
    protected int maxAcceptThreads;
    protected long maxReadIdleTimeMs;
    protected int maxConnections;
    protected boolean reuseAddress;
    protected int conBacklog;
    protected EventLoopGroup acceptorGroup;
    protected EventLoopGroup workerGroup;
    protected ChannelFuture channelFuture;
    protected int maxRcvBufferSize;
    protected int maxSendBufferSize;
    private DataProxyMetricItemSet metricItemSet;
    protected volatile boolean isRejectService = false;
    protected int conLinger = -1;
    private MonitorIndex monitorIndex = null;
    private MonitorStats monitorStats = null;
    protected ChannelGroup allChannels = new DefaultChannelGroup("DefaultChannelGroup", GlobalEventExecutor.INSTANCE);
    protected boolean enableFileMetric = CommonConfigHolder.getInstance().isEnableFileMetric();

    public void configure(Context context) {
        this.cachedSrcName = getName();
        logger.info("{} start to configure context:{}.", this.cachedSrcName, context.toString());
        this.context = context;
        this.srcHost = getHostIp(context);
        this.srcPort = getHostPort(context);
        this.strPort = String.valueOf(this.srcPort);
        String string = context.getString(SourceConstants.SRCCXT_LOGIC_TYPE_NAME, "INLONG");
        Preconditions.checkArgument(StringUtils.isNotBlank(string), "logic-type-name config is blank");
        this.rptSrcType = string.trim().toUpperCase();
        String trim = context.getString("msg-factory-name", ServerMessageFactory.class.getName()).trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(trim), "msg-factory-name config is blank");
        this.msgFactoryName = trim.trim();
        String string2 = context.getString("message-handler-name");
        if (StringUtils.isBlank(string2)) {
            string2 = SourceConstants.SRC_PROTOCOL_TYPE_HTTP.equalsIgnoreCase(getProtocolName()) ? HttpMessageHandler.class.getName() : ServerMessageHandler.class.getName();
        }
        Preconditions.checkArgument(StringUtils.isNotBlank(string2), "message-handler-name config is blank");
        this.messageHandlerName = string2;
        this.maxMsgLength = ConfStringUtils.getIntValue(context, "max-msg-length", 65536);
        Preconditions.checkArgument(this.maxMsgLength >= 5 && this.maxMsgLength <= 20971520, "max-msg-length must be in [5, 20971520]");
        this.isCompressed = context.getBoolean("msg-compressed", true).booleanValue();
        this.filterEmptyMsg = context.getBoolean("filter-empty-msg", false).booleanValue();
        this.customProcessor = context.getBoolean("custom-cp", false).booleanValue();
        this.maxAcceptThreads = ConfStringUtils.getIntValue(context, SourceConstants.SRCCXT_MAX_ACCEPT_THREADS, 1);
        Preconditions.checkArgument(this.maxAcceptThreads >= 1 && this.maxAcceptThreads <= 10, "max-accept-threads must be in [1, 10]");
        this.maxWorkerThreads = ConfStringUtils.getIntValue(context, "max-threads", SourceConstants.VAL_DEF_WORKER_THREADS);
        Preconditions.checkArgument(this.maxWorkerThreads >= 1, "max-threads must be >= 1");
        this.maxReadIdleTimeMs = ConfStringUtils.getLongValue(context, SourceConstants.SRCCXT_MAX_READ_IDLE_TIME_MS, SourceConstants.VAL_DEF_READ_IDLE_TIME_MS);
        Preconditions.checkArgument(this.maxReadIdleTimeMs >= 60000 && this.maxReadIdleTimeMs <= SourceConstants.VAL_MAX_READ_IDLE_TIME_MS, "maxReadIdleTime must be in [60000, 4200000]");
        this.maxConnections = ConfStringUtils.getIntValue(context, SourceConstants.SRCCXT_MAX_CONNECTION_CNT, SourceConstants.VAL_DEF_MAX_CONNECTION_CNT);
        Preconditions.checkArgument(this.maxConnections >= 0, "connections must be >= 0");
        this.conBacklog = ConfStringUtils.getIntValue(context, SourceConstants.SRCCXT_CONN_BACKLOG, SourceConstants.VAL_DEF_CONN_BACKLOG);
        Preconditions.checkArgument(this.conBacklog >= 0, "con-backlog must be >= 0");
        Integer integer = context.getInteger(SourceConstants.SRCCXT_CONN_LINGER);
        if (integer != null && integer.intValue() >= 0) {
            this.conLinger = integer.intValue();
        }
        this.reuseAddress = context.getBoolean(SourceConstants.SRCCXT_REUSE_ADDRESS, true).booleanValue();
        this.customProcessor = context.getBoolean("custom-cp", false).booleanValue();
        this.maxRcvBufferSize = ConfStringUtils.getIntValue(context, "receiveBufferSize", 65536);
        Preconditions.checkArgument(this.maxRcvBufferSize >= 0, "receiveBufferSize must be >= 0");
        this.maxSendBufferSize = ConfStringUtils.getIntValue(context, "sendBufferSize", 65536);
        Preconditions.checkArgument(this.maxSendBufferSize >= 0, "sendBufferSize must be >= 0");
    }

    public synchronized void start() {
        if (this.customProcessor) {
            FailoverChannelProcessor failoverChannelProcessor = new FailoverChannelProcessor(getChannelProcessor().getSelector());
            failoverChannelProcessor.configure(this.context);
            setChannelProcessor(failoverChannelProcessor);
            FailoverChannelProcessorHolder.setChannelProcessor(failoverChannelProcessor);
        }
        super.start();
        this.cachedChProcessor = getChannelProcessor();
        this.metricItemSet = new DataProxyMetricItemSet(CommonConfigHolder.getInstance().getClusterName(), this.cachedSrcName, String.valueOf(this.srcPort));
        MetricRegister.register(this.metricItemSet);
        if (this.enableFileMetric) {
            this.monitorIndex = new MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(), CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000, CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
            this.monitorIndex.start();
            this.monitorStats = new MonitorStats(CommonConfigHolder.getInstance().getFileMetricEventOutName() + AttrConstants.SEP_HASHTAG + this.cachedSrcName, CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000, CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
            this.monitorStats.start();
        }
        startSource();
        AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, this.cachedSrcName, this);
    }

    public synchronized void stop() {
        logger.info("[STOP {} SOURCE]{} stopping...", getProtocolName(), this.cachedSrcName);
        try {
        } catch (Exception e) {
            logger.warn("Close {} netty channels throw exception", this.cachedSrcName, e);
        } finally {
            this.allChannels.clear();
        }
        if (!this.allChannels.isEmpty()) {
            this.allChannels.close().awaitUninterruptibly();
        }
        if (this.channelFuture != null) {
            try {
                this.channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e2) {
                logger.warn("Close {} channel future throw exception", this.cachedSrcName, e2);
            }
        }
        super.stop();
        if (this.acceptorGroup != null) {
            this.acceptorGroup.shutdownGracefully();
        }
        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
        if (this.enableFileMetric) {
            if (this.monitorIndex != null) {
                this.monitorIndex.stop();
            }
            if (this.monitorStats != null) {
                this.monitorStats.stop();
            }
        }
        logger.info("[STOP {} SOURCE]{} stopped", getProtocolName(), this.cachedSrcName);
    }

    @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
    public void update() {
        if (ConfigManager.getInstance().needChkIllegalIP()) {
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis();
            for (Channel channel : this.allChannels) {
                String channelRemoteIP = AddressUtils.getChannelRemoteIP(channel);
                if (channelRemoteIP != null && ConfigManager.getInstance().isIllegalIP(channelRemoteIP)) {
                    channel.disconnect();
                    channel.close();
                    this.allChannels.remove(channel);
                    i++;
                    logger.error(channelRemoteIP + " is Illegal IP, so disconnect it !");
                }
            }
            logger.info("Source {} channel check, disconnects {} Illegal channels, waist {} ms", new Object[]{this.cachedSrcName, Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
        }
    }

    public DataProxyMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }

    public Context getContext() {
        return this.context;
    }

    public String getSrcHost() {
        return this.srcHost;
    }

    public int getSrcPort() {
        return this.srcPort;
    }

    public String getStrPort() {
        return this.strPort;
    }

    public int getMaxMsgLength() {
        return this.maxMsgLength;
    }

    public boolean isCompressed() {
        return this.isCompressed;
    }

    public boolean isFilterEmptyMsg() {
        return this.filterEmptyMsg;
    }

    public boolean isCustomProcessor() {
        return this.customProcessor;
    }

    public int getMaxConnections() {
        return this.maxConnections;
    }

    public ChannelGroup getAllChannels() {
        return this.allChannels;
    }

    public long getMaxReadIdleTimeMs() {
        return this.maxReadIdleTimeMs;
    }

    public String getMessageHandlerName() {
        return this.messageHandlerName;
    }

    public int getMaxWorkerThreads() {
        return this.maxWorkerThreads;
    }

    public void fileMetricIncSumStats(String str) {
        if (this.enableFileMetric) {
            this.monitorStats.incSumStats(str);
        }
    }

    public void fileMetricIncWithDetailStats(String str, String str2) {
        if (this.enableFileMetric) {
            this.monitorStats.incSumStats(str);
            this.monitorStats.incDetailStats(str + AttrConstants.SEP_HASHTAG + str2);
        }
    }

    public void fileMetricAddSuccStats(StringBuilder sb, String str, String str2, String str3, String str4, String str5, long j, long j2, int i, int i2, long j3) {
        fileMetricIncStats(sb, true, str, str2, str3, str4, str5, j, j2, i, i2, j3, 0);
    }

    public void fileMetricAddFailStats(StringBuilder sb, String str, String str2, String str3, String str4, String str5, long j, long j2, int i) {
        fileMetricIncStats(sb, false, str, str2, str3, str4, str5, j, j2, 0, 0, 0L, i);
    }

    private void fileMetricIncStats(StringBuilder sb, boolean z, String str, String str2, String str3, String str4, String str5, long j, long j2, int i, int i2, long j3, int i3) {
        if (this.enableFileMetric) {
            sb.append(this.cachedSrcName).append(AttrConstants.SEP_HASHTAG).append(str).append(AttrConstants.SEP_HASHTAG).append(str2).append(AttrConstants.SEP_HASHTAG).append(str3).append(AttrConstants.SEP_HASHTAG).append(str5).append(AttrConstants.SEP_HASHTAG).append(this.srcHost).append(AttrConstants.SEP_HASHTAG).append(str4).append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmmTenMins(j)).append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(j2));
            if (z) {
                this.monitorStats.incSumStats(StatConstants.EVENT_MSG_V0_POST_SUCCESS);
                this.monitorIndex.addSuccStats(sb.toString(), i, i2, j3);
            } else {
                this.monitorIndex.addFailStats(sb.toString(), i3);
                this.monitorStats.incSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
            }
            sb.delete(0, sb.length());
        }
    }

    public void addMetric(boolean z, long j, Event event) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", CommonConfigHolder.getInstance().getClusterName());
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_ID, this.cachedSrcName);
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, getStrPort());
        DataProxyMetricItem.fillInlongId(event, hashMap);
        DataProxyMetricItem.fillAuditFormatTime(event, hashMap);
        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) this.metricItemSet.findMetricItem(hashMap);
        if (!z) {
            dataProxyMetricItem.readFailCount.incrementAndGet();
            dataProxyMetricItem.readFailSize.addAndGet(j);
        } else {
            dataProxyMetricItem.readSuccessCount.incrementAndGet();
            dataProxyMetricItem.readSuccessSize.addAndGet(j);
            AuditUtils.add(5, event);
        }
    }

    public ChannelInitializer getChannelInitializerFactory() {
        logger.info(this.cachedSrcName + " load msgFactory=" + this.msgFactoryName);
        try {
            Constructor<?> constructor = Class.forName(this.msgFactoryName).getConstructor(BaseSource.class);
            logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
            return (ChannelInitializer) constructor.newInstance(this);
        } catch (Exception e) {
            logger.error("{} start error, fail to construct ChannelPipelineFactory with name {}", new Object[]{this.cachedSrcName, this.msgFactoryName, e});
            stop();
            throw new FlumeException(e.getMessage());
        }
    }

    public abstract String getProtocolName();

    public abstract void startSource();

    @Override // org.apache.inlong.dataproxy.admin.ProxyServiceMBean
    public void stopService() {
        this.isRejectService = true;
    }

    @Override // org.apache.inlong.dataproxy.admin.ProxyServiceMBean
    public void recoverService() {
        this.isRejectService = false;
    }

    public boolean isRejectService() {
        return this.isRejectService;
    }

    public String getCachedSrcName() {
        return this.cachedSrcName;
    }

    public ChannelProcessor getCachedChProcessor() {
        return this.cachedChProcessor;
    }

    private String getHostIp(Context context) {
        String str = null;
        String string = context.getString("host");
        if (StringUtils.isNotBlank(string)) {
            String trim = string.trim();
            Preconditions.checkArgument(ConfStringUtils.isValidIp(trim), "host(" + trim + ") config in conf not valid");
            str = trim;
        }
        Map<String, String> map = System.getenv();
        if (map.containsKey(SourceConstants.SYSENV_HOST_IP)) {
            String str2 = map.get(SourceConstants.SYSENV_HOST_IP);
            Preconditions.checkArgument(ConfStringUtils.isValidIp(str2), "inlongHostIp(" + str2 + ") config in system env not valid");
            str = str2.trim();
        }
        if (StringUtils.isBlank(str)) {
            str = SourceConstants.VAL_DEF_HOST_VALUE;
        }
        return str;
    }

    private int getHostPort(Context context) {
        Integer num = null;
        String string = context.getString("port");
        if (StringUtils.isNotBlank(string)) {
            String trim = string.trim();
            try {
                num = Integer.valueOf(Integer.parseInt(trim));
            } catch (Throwable th) {
                throw new IllegalArgumentException("inlongHostPort(" + trim + ") config in conf not integer");
            }
        }
        if (num != null) {
            Preconditions.checkArgument(ConfStringUtils.isValidPort(num.intValue()), "port(" + num + ") config in conf not valid");
        }
        Map<String, String> map = System.getenv();
        if (map.containsKey(SourceConstants.SYSENV_HOST_PORT)) {
            String str = map.get(SourceConstants.SYSENV_HOST_PORT);
            if (StringUtils.isNotBlank(str)) {
                String trim2 = str.trim();
                try {
                    num = Integer.valueOf(Integer.parseInt(trim2));
                    Preconditions.checkArgument(ConfStringUtils.isValidPort(num.intValue()), "inlongHostPort(" + trim2 + ") config in system env not valid");
                } catch (Throwable th2) {
                    throw new IllegalArgumentException("inlongHostPort(" + trim2 + ") config in system env not integer");
                }
            }
        }
        if (num == null) {
            throw new IllegalArgumentException("Required parameter port must exist and may not be null");
        }
        return num.intValue();
    }
}
