package org.apache.inlong.dataproxy.http;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.lang.reflect.Constructor;
import java.util.Map;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.source.http.HTTPSource;
import org.apache.inlong.commons.monitor.CounterGroup;
import org.apache.inlong.commons.monitor.CounterGroupExt;
import org.apache.inlong.dataproxy.config.remote.ConfigMessageServlet;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.WaterMarkServlet;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/http/SimpleHttpSource.class */
public class SimpleHttpSource extends HttpBaseSource {
    private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
    public static final String POOL_SIZE = "poolSize";
    public static final String IDEL_TIME = "idelTime";
    public static final String BUFFER_SIZE = "bufferSize";
    public static final String BACKLOG = "backlog";
    private volatile Integer port;
    private volatile Server srv;
    private Map<String, String> subProps;
    private MessageHandler messageHandler;
    private volatile String keyStorePath;
    private volatile String keyStorePassword;
    private volatile Boolean sslEnabled;
    private int threadPoolSize = 512;
    private int maxIdelTime = 600000;
    private int requestBufferSize = 10000;
    private int backlog = 2048;

    @Override // org.apache.inlong.dataproxy.http.HttpBaseSource
    public void configure(org.apache.flume.Context context) {
        super.configure(context);
        try {
            this.port = context.getInteger(ConfigConstants.CONFIG_PORT);
            this.threadPoolSize = context.getInteger(POOL_SIZE, 512).intValue();
            this.maxIdelTime = context.getInteger(IDEL_TIME, 600000).intValue();
            this.requestBufferSize = context.getInteger(BUFFER_SIZE, 10000).intValue();
            this.backlog = context.getInteger(BACKLOG, 2048).intValue();
            LOG.info("http backlog set to {}", Integer.valueOf(this.backlog));
            checkPort();
            this.sslEnabled = context.getBoolean("enableSSL", false);
            if (this.sslEnabled.booleanValue()) {
                LOG.debug("SSL configuration enabled");
                this.keyStorePath = context.getString("keystore");
                Preconditions.checkArgument((this.keyStorePath == null || this.keyStorePath.isEmpty()) ? false : true, "Keystore is required for SSL Conifguration");
                this.keyStorePassword = context.getString("keystorePassword");
                Preconditions.checkArgument(this.keyStorePassword != null, "Keystore password is required for SSL Configuration");
            }
            this.subProps = context.getSubProperties("handler.");
        } catch (Exception e) {
            LOG.error("Error configuring HTTPSource!", e);
            Throwables.propagate(e);
        }
    }

    @Override // org.apache.inlong.dataproxy.http.HttpBaseSource
    public synchronized void stop() {
        super.stop();
    }

    @Override // org.apache.inlong.dataproxy.http.HttpBaseSource
    public synchronized void start() {
        super.start();
        try {
            Constructor<?> constructor = Class.forName(this.messageHandlerName).getConstructor(ChannelProcessor.class, CounterGroup.class, CounterGroupExt.class, ServiceDecoder.class);
            LOG.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
            this.messageHandler = (MessageHandler) constructor.newInstance(getChannelProcessor(), this.counterGroup, this.counterGroupExt, null);
            this.messageHandler.configure(new org.apache.flume.Context(this.subProps));
            this.srv = new Server();
            Connector[] connectorArr = new Connector[1];
            if (this.sslEnabled.booleanValue()) {
                SslSocketConnector sslSocketConnector = new SslSocketConnector();
                sslSocketConnector.setKeystore(this.keyStorePath);
                sslSocketConnector.setKeyPassword(this.keyStorePassword);
                sslSocketConnector.setReuseAddress(true);
                connectorArr[0] = sslSocketConnector;
                LOG.info("sslEnabled {}", this.sslEnabled);
            } else {
                SelectChannelConnector selectChannelConnector = new SelectChannelConnector();
                selectChannelConnector.setReuseAddress(true);
                selectChannelConnector.setThreadPool(new QueuedThreadPool(this.threadPoolSize));
                selectChannelConnector.setMaxIdleTime(this.maxIdelTime);
                selectChannelConnector.setRequestBufferSize(this.requestBufferSize);
                selectChannelConnector.setAcceptQueueSize(this.backlog);
                LOG.info("set config maxIdelTime {}, backlog {}", Integer.valueOf(this.maxIdelTime), Integer.valueOf(this.backlog));
                connectorArr[0] = selectChannelConnector;
            }
            connectorArr[0].setHost(this.host);
            connectorArr[0].setPort(this.port.intValue());
            this.srv.setConnectors(connectorArr);
            org.mortbay.jetty.servlet.Context context = new org.mortbay.jetty.servlet.Context(this.srv, "/", 1);
            context.setMaxFormContentSize(this.maxMsgLength);
            context.addFilter(new FilterHolder(new MessageFilter(this.maxMsgLength)), "/dataproxy/*", 1);
            context.addServlet(new ServletHolder(new MessageProcessServlet(this.messageHandler)), "/dataproxy/*");
            context.addServlet(new ServletHolder(new ConfigMessageServlet()), "/dataproxy/config/*");
            context.addServlet(new ServletHolder(new WaterMarkServlet()), "/dataproxy/tcp/watermark/*");
            this.srv.start();
            Preconditions.checkArgument(this.srv.getHandler().equals(context));
        } catch (ClassCastException e) {
            LOG.error("Deserializer is not an instance of HTTPSourceHandler.Deserializer must implement HTTPSourceHandler.");
            Throwables.propagate(e);
        } catch (ClassNotFoundException e2) {
            LOG.error("Error while configuring HTTPSource. Exception follows.", e2);
            Throwables.propagate(e2);
        } catch (Exception e3) {
            LOG.error("Error while starting HTTPSource. Exception follows.", e3);
            Throwables.propagate(e3);
        }
        Preconditions.checkArgument(this.srv.isRunning());
    }

    private void checkPort() {
        Preconditions.checkNotNull(this.port, "HTTPSource requires a port number to bespecified");
    }
}
