package org.apache.inlong.dataproxy.source;

import com.google.common.base.Preconditions;
import io.netty.channel.group.ChannelGroup;
import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Context;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.dispatch.DispatchManager;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/SourceContext.class */
/**
 * Configuration and shared state for a single DataProxy source.
 *
 * <p>The constructor reads its settings from the Flume {@link Context} (and, for the host
 * address, from process environment variables), registers a metric item set for the source,
 * and creates an {@link IdTopicConfigHolder}. {@link #start()} reloads that holder once and
 * then schedules a periodic reload on a daemon {@link Timer}; {@link #close()} cancels it.
 */
public class SourceContext {
    public static final Logger LOG = LoggerFactory.getLogger(SourceContext.class);

    /** Context key for the maximum number of threads. */
    public static final String KEY_MAX_THREADS = "maxThreads";
    public static final String KEY_PROCESSINTERVAL = "processInterval";
    /** Context key for the reload period, in milliseconds. */
    public static final String KEY_RELOADINTERVAL = "reloadInterval";
    /** Context key for the maximum number of connections. */
    public static final String CONNECTIONS = "connections";
    /** Environment variable / context key holding the host IP. */
    public static final String INLONG_HOST_IP = "inlongHostIp";
    /** Environment variable / context key holding the host port. */
    public static final String INLONG_HOST_PORT = "inlongHostPort";

    protected AbstractSource source;
    protected ChannelGroup allChannels;
    protected String sourceId;
    protected String sourceDataId;
    protected String hostIp;
    protected int hostPort;
    protected int maxThreads;
    protected int maxConnections;
    protected int maxMsgLength;
    protected IdTopicConfigHolder idHolder;
    protected DataProxyMetricItemSet metricItemSet;
    protected Context parentContext;
    protected long reloadInterval;
    /** Timer driving the periodic reload; {@code null} until {@link #setReloadTimer()} runs. */
    protected Timer reloadTimer;
    protected boolean isRejectService = false;
    /** Value of the {@code proxy.cluster.name} common property, or {@code "unknown"} if unset. */
    protected String proxyClusterId = CommonPropertiesHolder.get().getOrDefault("proxy.cluster.name", "unknown");

    /**
     * Builds the context from the given source and its Flume configuration.
     *
     * @param abstractSource the owning source; its name becomes the source id
     * @param channelGroup the channel group exposed through {@link #getAllChannels()}
     * @param context the Flume configuration to read settings from
     * @throws IllegalArgumentException if the configured maximum message length is outside
     *         the range {@code [4, 20971520]}
     */
    public SourceContext(AbstractSource abstractSource, ChannelGroup channelGroup, Context context) {
        this.source = abstractSource;
        this.allChannels = channelGroup;
        this.sourceId = abstractSource.getName();
        this.metricItemSet = new DataProxyMetricItemSet(this.sourceId);
        MetricRegister.register(this.metricItemSet);
        this.maxConnections = context.getInteger(CONNECTIONS, 5000).intValue();
        this.maxThreads = context.getInteger(KEY_MAX_THREADS, 32).intValue();
        this.maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, 65536).intValue();
        Preconditions.checkArgument(this.maxMsgLength >= 4 && this.maxMsgLength <= 20971520, "maxMsgLength must be >= 4 and <= 20971520");
        this.hostIp = getHostIp(context);
        this.hostPort = getHostPort(context);
        // The data id defaults to the resolved port; callers may override it via setSourceDataId.
        this.sourceDataId = String.valueOf(this.hostPort);
        this.idHolder = new IdTopicConfigHolder();
        this.idHolder.configure(context);
        this.parentContext = context;
        this.reloadInterval = context.getLong(KEY_RELOADINTERVAL, Long.valueOf(DispatchManager.MINUTE_MS)).longValue();
    }

    /**
     * Resolves the host IP, preferring the {@code inlongHostIp} environment variable over the
     * context entry of the same name.
     *
     * @param context the Flume configuration used as fallback
     * @return the environment value if present, otherwise whatever {@code context.getString}
     *         yields for the key
     */
    private String getHostIp(Context context) {
        Map<String, String> env = System.getenv();
        if (env.containsKey(INLONG_HOST_IP)) {
            return env.get(INLONG_HOST_IP);
        }
        return context.getString(INLONG_HOST_IP);
    }

    /**
     * Resolves the host port, preferring the {@code inlongHostPort} environment variable over
     * the context entry of the same name.
     *
     * @param context the Flume configuration used as fallback
     * @return the parsed environment value ({@code 0} if it is not a valid integer), otherwise
     *         the context value, defaulting to {@code 0}
     */
    private int getHostPort(Context context) {
        Map<String, String> env = System.getenv();
        if (env.containsKey(INLONG_HOST_PORT)) {
            return NumberUtils.toInt(env.get(INLONG_HOST_PORT), 0);
        }
        // Read the PORT key here: looking up the IP key would ignore a configured port and
        // would try to parse the IP address string as an integer.
        return context.getInteger(INLONG_HOST_PORT, 0).intValue();
    }

    /**
     * Performs an initial reload and then schedules the periodic reload. Any exception is
     * logged rather than propagated.
     */
    public void start() {
        try {
            reload();
            setReloadTimer();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Cancels the periodic reload, if one was scheduled. Safe to call when {@link #start()}
     * was never invoked or failed before the timer was created.
     */
    public void close() {
        Timer timer = this.reloadTimer;
        if (timer == null) {
            return;
        }
        try {
            timer.cancel();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Creates a daemon timer that calls {@link #reload()} every {@code reloadInterval}
     * milliseconds, with the first run one interval from now. A previously created timer is
     * cancelled first so repeated calls do not leave an orphaned timer thread running.
     */
    protected void setReloadTimer() {
        Timer previous = this.reloadTimer;
        if (previous != null) {
            previous.cancel();
        }
        this.reloadTimer = new Timer(true);
        TimerTask reloadTask = new TimerTask() {
            @Override
            public void run() {
                SourceContext.this.reload();
            }
        };
        this.reloadTimer.schedule(reloadTask, this.reloadInterval, this.reloadInterval);
    }

    /**
     * Reloads the id/topic configuration holder. Every {@link Throwable} is caught and logged,
     * so a failing reload does not propagate to the caller (including the timer task).
     */
    public void reload() {
        try {
            this.idHolder.reload();
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    /** @return the source data id (initially the resolved host port as a string) */
    public String getSourceDataId() {
        return this.sourceDataId;
    }

    /** @param str the new source data id */
    public void setSourceDataId(String str) {
        this.sourceDataId = str;
    }

    /** @return the owning source */
    public AbstractSource getSource() {
        return this.source;
    }

    /** @return the channel group passed to the constructor */
    public ChannelGroup getAllChannels() {
        return this.allChannels;
    }

    /** @return the proxy cluster id read from the common properties */
    public String getProxyClusterId() {
        return this.proxyClusterId;
    }

    /** @return the source id, i.e. the name of the owning source */
    public String getSourceId() {
        return this.sourceId;
    }

    /** @return the configured maximum number of connections */
    public int getMaxConnections() {
        return this.maxConnections;
    }

    /** @return the configured maximum message length */
    public int getMaxMsgLength() {
        return this.maxMsgLength;
    }

    /** @return the id/topic configuration holder */
    public IdTopicConfigHolder getIdHolder() {
        return this.idHolder;
    }

    /** @return the metric item set registered for this source */
    public DataProxyMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }

    /** @return the Flume context this instance was configured from */
    public Context getParentContext() {
        return this.parentContext;
    }

    /** @return the reload period in milliseconds */
    public long getReloadInterval() {
        return this.reloadInterval;
    }

    /** @return the configured maximum number of threads */
    public int getMaxThreads() {
        return this.maxThreads;
    }

    /** @return the current value of the reject-service flag */
    public boolean isRejectService() {
        return this.isRejectService;
    }

    /** @param z the new value of the reject-service flag */
    public void setRejectService(boolean z) {
        this.isRejectService = z;
    }
}
