package org.apache.inlong.dataproxy.source;

import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.Executors;
import org.apache.commons.io.IOUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.dataproxy.base.NamedThreadFactory;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/SimpleTcpSource.class */
public class SimpleTcpSource extends BaseSource implements Configurable, EventDrivenSource {
    private static final String blacklistFilePath = "blacklist.properties";
    private static long propsLastModified;
    private CheckBlackListThread checkBlackListThread;
    private int maxThreads = 32;
    private boolean tcpNoDelay = true;
    private boolean keepAlive = true;
    private int receiveBufferSize;
    private int highWaterMark;
    private int sendBufferSize;
    private int trafficClass;
    protected String topic;
    private DataProxyMetricItemSet metricItemSet;
    private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);
    public static ArrayList<String> blacklist = new ArrayList<>();
    private static int TRAFFIC_CLASS_TYPE_0 = 0;
    private static int TRAFFIC_CLASS_TYPE_96 = 96;
    private static int BUFFER_SIZE_MUST_THAN = 0;
    private static int HIGH_WATER_MARK_DEFAULT_VALUE = 65536;
    private static int RECEIVE_BUFFER_DEFAULT_SIZE = 65536;
    private static int SEND_BUFFER_DEFAULT_SIZE = 65536;
    private static int RECEIVE_BUFFER_MAX_SIZE = 16777216;
    private static int SEND_BUFFER_MAX_SIZE = 16777216;
    private static int DEFAULT_SLEEP_TIME_MS = 5000;

    /* loaded from: input_file:org/apache/inlong/dataproxy/source/SimpleTcpSource$CheckBlackListThread.class */
    private class CheckBlackListThread extends Thread {
        private boolean shutdown;

        private CheckBlackListThread() {
            this.shutdown = false;
        }

        public void shutdouwn() {
            this.shutdown = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            SimpleTcpSource.logger.info("CheckBlackListThread thread {} start.", Thread.currentThread().getName());
            while (!this.shutdown) {
                try {
                    File file = new File("conf/blacklist.properties");
                    if (file.lastModified() > SimpleTcpSource.propsLastModified) {
                        SimpleTcpSource.blacklist = SimpleTcpSource.this.load(SimpleTcpSource.blacklistFilePath);
                        long unused = SimpleTcpSource.propsLastModified = file.lastModified();
                        SimpleTcpSource.logger.info("blacklist.properties:{}\n{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(file.lastModified())), SimpleTcpSource.blacklist);
                    }
                    Thread.sleep(SimpleTcpSource.DEFAULT_SLEEP_TIME_MS);
                    SimpleTcpSource.checkBlackList(SimpleTcpSource.blacklist, SimpleTcpSource.this.allChannels);
                } catch (InterruptedException e) {
                    SimpleTcpSource.logger.info("ConfigReloader thread exit!");
                    return;
                } catch (Throwable th) {
                    SimpleTcpSource.logger.error("ConfigReloader thread error!", th);
                }
            }
        }
    }

    public SimpleTcpSource() {
        this.allChannels = new DefaultChannelGroup();
    }

    public static void checkBlackList(ArrayList arrayList, ChannelGroup channelGroup) {
        if (arrayList != null) {
            Iterator it = channelGroup.iterator();
            while (it.hasNext()) {
                Channel channel = (Channel) it.next();
                String str = null;
                SocketAddress remoteAddress = channel.getRemoteAddress();
                if (null != remoteAddress) {
                    str = remoteAddress.toString();
                    try {
                        str = str.substring(1, str.indexOf(58));
                    } catch (Exception e) {
                        logger.warn("fail to get the remote IP, and strIP={},remoteSocketAddress={}", str, remoteAddress);
                    }
                }
                if (str != null && arrayList.contains(str)) {
                    logger.error(str + " is in blacklist, so disconnect it !");
                    channel.disconnect();
                    channel.close();
                    channelGroup.remove(channel);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ArrayList<String> load(String str) {
        ArrayList<String> arrayList = new ArrayList<>();
        if (str == null) {
            logger.error("fail to loadProperties, filename is null");
            return arrayList;
        }
        FileReader fileReader = null;
        BufferedReader bufferedReader = null;
        try {
            try {
                fileReader = new FileReader("conf/" + str);
                bufferedReader = new BufferedReader(fileReader);
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    }
                    arrayList.add(readLine);
                }
                IOUtils.closeQuietly(fileReader);
                IOUtils.closeQuietly(bufferedReader);
            } catch (UnsupportedEncodingException e) {
                logger.error("fail to loadPropery, file ={}, and e= {}", str, e);
                IOUtils.closeQuietly(fileReader);
                IOUtils.closeQuietly(bufferedReader);
            } catch (Exception e2) {
                logger.error("fail to loadProperty, file ={}, and e= {}", str, e2);
                IOUtils.closeQuietly(fileReader);
                IOUtils.closeQuietly(bufferedReader);
            }
            return arrayList;
        } catch (Throwable th) {
            IOUtils.closeQuietly(fileReader);
            IOUtils.closeQuietly(bufferedReader);
            throw th;
        }
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public synchronized void startSource() {
        logger.info("start " + getName());
        this.metricItemSet = new DataProxyMetricItemSet(getName());
        MetricRegister.register(this.metricItemSet);
        this.checkBlackListThread = new CheckBlackListThread();
        this.checkBlackListThread.start();
        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
        NioServerSocketChannelFactory nioServerSocketChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("tcpSource-nettyBoss-threadGroup")), 1, Executors.newCachedThreadPool(new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")), this.maxThreads);
        logger.info("Set max workers : {} ;", Integer.valueOf(this.maxThreads));
        this.serverBootstrap = new ServerBootstrap(nioServerSocketChannelFactory);
        this.serverBootstrap.setOption("child.tcpNoDelay", Boolean.valueOf(this.tcpNoDelay));
        this.serverBootstrap.setOption("child.keepAlive", Boolean.valueOf(this.keepAlive));
        this.serverBootstrap.setOption("child.receiveBufferSize", Integer.valueOf(this.receiveBufferSize));
        this.serverBootstrap.setOption("child.sendBufferSize", Integer.valueOf(this.sendBufferSize));
        this.serverBootstrap.setOption("child.trafficClass", Integer.valueOf(this.trafficClass));
        this.serverBootstrap.setOption("child.writeBufferHighWaterMark", Integer.valueOf(this.highWaterMark));
        logger.info("load msgFactory=" + this.msgFactoryName + " and serviceDecoderName=" + this.serviceDecoderName);
        this.serverBootstrap.setPipelineFactory(getChannelPiplineFactory());
        try {
            if (this.host == null) {
                this.nettyChannel = this.serverBootstrap.bind(new InetSocketAddress(this.port));
            } else {
                this.nettyChannel = this.serverBootstrap.bind(new InetSocketAddress(this.host, this.port));
            }
        } catch (Exception e) {
            logger.error("Simple TCP Source error bind host {} port {},program will exit!", this.host, Integer.valueOf(this.port));
            System.exit(-1);
        }
        this.allChannels.add(this.nettyChannel);
        logger.info("Simple TCP Source started at host {}, port {}", this.host, Integer.valueOf(this.port));
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public synchronized void stop() {
        this.checkBlackListThread.shutdouwn();
        super.stop();
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public void configure(Context context) {
        logger.info("context is {}", context);
        super.configure(context);
        this.tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true).booleanValue();
        this.keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true).booleanValue();
        this.highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, Integer.valueOf(HIGH_WATER_MARK_DEFAULT_VALUE)).intValue();
        this.receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, Integer.valueOf(RECEIVE_BUFFER_DEFAULT_SIZE)).intValue();
        if (this.receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
            this.receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
        }
        Preconditions.checkArgument(this.receiveBufferSize > BUFFER_SIZE_MUST_THAN, "receiveBufferSize must be > 0");
        this.sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, Integer.valueOf(SEND_BUFFER_DEFAULT_SIZE)).intValue();
        if (this.sendBufferSize > SEND_BUFFER_MAX_SIZE) {
            this.sendBufferSize = SEND_BUFFER_MAX_SIZE;
        }
        Preconditions.checkArgument(this.sendBufferSize > BUFFER_SIZE_MUST_THAN, "sendBufferSize must be > 0");
        this.trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, Integer.valueOf(TRAFFIC_CLASS_TYPE_0)).intValue();
        Preconditions.checkArgument(this.trafficClass == TRAFFIC_CLASS_TYPE_0 || this.trafficClass == TRAFFIC_CLASS_TYPE_96, "trafficClass must be == 0 or == 96");
        try {
            this.maxThreads = context.getInteger("max-threads", 32).intValue();
        } catch (NumberFormatException e) {
            logger.warn("Simple TCP Source max-threads property must specify an integer value. {}", context.getString("max-threads"));
        }
    }

    public DataProxyMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public String getProtocolName() {
        return ConfigConstants.TCP_PROTOCOL;
    }
}
