package org.apache.rocketmq.streams.http.source.server;

import com.alibaba.fastjson.JSONObject;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsParameters;
import com.sun.net.httpserver.HttpsServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.http.source.HttpSource;
import org.apache.rocketmq.streams.http.source.util.HttpUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/http/source/server/HttpServer.class */
/**
 * Source that exposes an embedded HTTP or HTTPS endpoint (JDK {@code com.sun.net.httpserver}).
 *
 * <p>Every request under {@code "/"} is converted into a JSON message (selected headers, client
 * address, path, query string and body) and handed to {@code doReceiveMessage}. The receiver
 * installed by {@link #startSource()} then forwards the message body to every registered
 * {@link HttpSource} whose {@code match(clientIp, query, uri)} accepts it.
 */
public class HttpServer extends AbstractUnreliableSource {
    private static final Log LOG = LogFactory.getLog(HttpServer.class);

    /** Body written back to the client after a request has been accepted. */
    private static final String SUCCESS_BODY = "{\"code\": \"200\", \"data\" :\"received\", \"message\" :\"\"}";

    /** Body written back to the client when handling the request failed. */
    private static final String ERROR_BODY = "{\"code\": \"500\", \"data\" :\"\", \"message\" :\"internal error\"}";

    /** HTTP status used when handling the request failed. */
    private static final int INTERNAL_ERROR_STATUS = 500;

    /** Classpath location of the JKS keystore used in HTTPS mode. */
    private static final String KEYSTORE_RESOURCE = "/testkey.jks";

    // NOTE(review): hard-coded keystore password for a bundled test key; it should come from
    // configuration before this is used with a real certificate.
    private static final String KEYSTORE_PASSWORD = "password";

    /** Capacity of the work queue feeding the request handler threads. */
    private static final int HANDLER_QUEUE_CAPACITY = 1000;

    private transient com.sun.net.httpserver.HttpServer server;

    // Not read anywhere in this class; only exposed through its getter/setter.
    @ENVDependence
    private int stopDelaySecond;

    /** TCP port the server binds to. */
    @ENVDependence
    private int port = 8000;

    /** Host name or address the server binds to. */
    @ENVDependence
    private String serverIp = "localhost";

    /** Backlog value passed to the JDK server factory. */
    @ENVDependence
    private int backlog = 10;

    /** When {@code true}, {@link #initConfigurable()} creates an HTTPS server instead of HTTP. */
    private boolean useHttps = false;

    // NOTE(review): plain ArrayList that is mutated by register() and by the receiver installed in
    // startSource(), while requests are handled on pool threads - confirm whether callers
    // serialize access or whether a concurrent list is required.
    protected transient List<HttpSource> channels = new ArrayList<>();

    /**
     * Handler bound to {@code "/"}: turns each exchange into a JSON message, forwards it to the
     * enclosing source and answers the client.
     */
    public class DataHandler implements HttpHandler {
        private DataHandler() {
        }

        /**
         * Processes one HTTP exchange.
         *
         * <p>On success the client receives {@code HttpUtil.NORMAL_STATUES} with a fixed JSON
         * acknowledgement. If processing fails before the response headers were sent, the failure
         * is logged and a 500 response is returned instead of leaving the client without an
         * answer. Both streams are always closed (response first, then request).
         *
         * @param httpExchange the exchange to process
         * @throws IOException if closing one of the exchange streams fails
         */
        @Override
        public void handle(HttpExchange httpExchange) throws IOException {
            boolean headersSent = false;
            // Resources are closed in reverse order: response body first, then request body.
            try (InputStream requestBody = httpExchange.getRequestBody();
                 OutputStream responseBody = httpExchange.getResponseBody()) {
                try {
                    JSONObject message = HttpServer.this.createHttpHeader(httpExchange);
                    message.put("query", httpExchange.getRequestURI().getQuery());
                    message.put("data", HttpServer.this.getContent(requestBody));
                    HttpServer.this.doReceiveMessage(message);
                    // A response length of 0 asks the JDK server for chunked encoding.
                    httpExchange.sendResponseHeaders(HttpUtil.NORMAL_STATUES, 0L);
                    headersSent = true;
                    responseBody.write(SUCCESS_BODY.getBytes(StandardCharsets.UTF_8));
                    responseBody.flush();
                } catch (Exception e) {
                    LOG.error("failed to handle http request, uri=" + httpExchange.getRequestURI(), e);
                    if (!headersSent) {
                        // Headers may only be sent once, so only report the error if they were not sent yet.
                        replyWithError(httpExchange, responseBody);
                    }
                }
            }
        }

        /** Best-effort 500 response; a failure to send it is logged, not propagated. */
        private void replyWithError(HttpExchange httpExchange, OutputStream responseBody) {
            try {
                httpExchange.sendResponseHeaders(INTERNAL_ERROR_STATUS, 0L);
                responseBody.write(ERROR_BODY.getBytes(StandardCharsets.UTF_8));
                responseBody.flush();
            } catch (IOException ioe) {
                LOG.error("failed to send error response, uri=" + httpExchange.getRequestURI(), ioe);
            }
        }
    }

    /** Creates the source with JSON-data and JSON-array message modes both switched off. */
    public HttpServer() {
        setJsonData(false);
        setMsgIsJsonArray(false);
    }

    /**
     * Adds a channel that will be offered every incoming request.
     *
     * @param httpSource the channel to register
     */
    public void register(HttpSource httpSource) {
        this.channels.add(httpSource);
    }

    /**
     * Creates (but does not start) the underlying HTTP or HTTPS server bound to
     * {@code serverIp:port}.
     *
     * <p>In HTTPS mode the SSL context is built before the server socket is bound, so a keystore
     * or SSL failure does not leave a bound server behind.
     *
     * @return {@code true} if the server was created, {@code false} if creation failed (the
     *     failure is logged)
     */
    protected boolean initConfigurable() {
        try {
            if (this.useHttps) {
                SSLContext sslContext = createSslContext();
                HttpsServer httpsServer = HttpsServer.create(new InetSocketAddress(this.serverIp, this.port), this.backlog);
                httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext) {
                    @Override
                    public void configure(HttpsParameters httpsParameters) {
                        try {
                            SSLContext context = getSSLContext();
                            SSLEngine engine = context.createSSLEngine();
                            httpsParameters.setNeedClientAuth(false);
                            httpsParameters.setCipherSuites(engine.getEnabledCipherSuites());
                            httpsParameters.setProtocols(engine.getEnabledProtocols());
                            // Use the context defaults rather than everything the provider supports,
                            // so suites/protocols that are not enabled by default stay off.
                            httpsParameters.setSSLParameters(context.getDefaultSSLParameters());
                        } catch (Exception e) {
                            HttpServer.LOG.error("Failed to create HTTPS port", e);
                        }
                    }
                });
                this.server = httpsServer;
            } else {
                this.server = com.sun.net.httpserver.HttpServer.create(new InetSocketAddress(this.serverIp, this.port), this.backlog);
            }
            return true;
        } catch (IOException e) {
            LOG.error("http channel init get io exception", e);
            return false;
        } catch (NoSuchAlgorithmException e2) {
            LOG.error("http channel init https ssl context exception", e2);
            return false;
        } catch (Exception e3) {
            LOG.error("http channel init http cert exception", e3);
            return false;
        }
    }

    /**
     * Builds the TLS context from the JKS keystore found at {@link #KEYSTORE_RESOURCE}.
     *
     * @return an initialised {@code "TLS"} context
     * @throws IOException if the keystore resource is missing or cannot be read
     * @throws GeneralSecurityException if the keystore or key material cannot be used
     */
    private SSLContext createSslContext() throws IOException, GeneralSecurityException {
        char[] password = KEYSTORE_PASSWORD.toCharArray();
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (InputStream keyStoreStream = getClass().getResourceAsStream(KEYSTORE_RESOURCE)) {
            if (keyStoreStream == null) {
                // KeyStore.load(null, ...) would silently create an empty keystore.
                throw new IOException("https keystore " + KEYSTORE_RESOURCE + " not found on classpath");
            }
            keyStore.load(keyStoreStream, password);
        }
        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
        keyManagerFactory.init(keyStore, password);
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
        trustManagerFactory.init(keyStore);
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
        return sslContext;
    }

    /**
     * Installs the request executor, the {@code "/"} handler and the dispatching receiver, then
     * starts the server.
     *
     * <p>The receiver offers each message to every registered channel whose {@code match}
     * accepts the request's client IP, query and URI. Matching channels that report
     * {@code isDestroy()} are not called and are unregistered after the pass.
     *
     * @return always {@code true}
     * @throws IllegalStateException if the server was not created by {@link #initConfigurable()}
     */
    public boolean startSource() {
        if (this.server == null) {
            throw new IllegalStateException("http server is not initialized; initConfigurable() must succeed before startSource()");
        }
        this.server.setExecutor(new ThreadPoolExecutor(this.maxThread, this.maxThread, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(HANDLER_QUEUE_CAPACITY)));
        this.server.createContext("/", new DataHandler());
        setReceiver((message, context) -> {
            String clientIp = message.getMessageBody().getString("clientIp");
            String query = message.getMessageBody().getString("query");
            String uri = message.getMessageBody().getString("uri");
            // Collected separately so the channel list is not modified while it is iterated.
            List<HttpSource> destroyed = new ArrayList<>();
            for (HttpSource httpSource : this.channels) {
                if (httpSource.match(clientIp, query, uri)) {
                    if (httpSource.isDestroy()) {
                        destroyed.add(httpSource);
                    } else {
                        httpSource.doReceiveMessage(message.getMessageBody());
                    }
                }
            }
            for (HttpSource destroyedSource : destroyed) {
                this.channels.remove(destroyedSource);
            }
            return message;
        });
        this.server.start();
        if (isUseHttps()) {
            LOG.info("https server is start");
            return true;
        }
        LOG.info("http server is start");
        return true;
    }

    /** @return whether the server is created in HTTPS mode */
    public boolean isUseHttps() {
        return this.useHttps;
    }

    /** @param z {@code true} to create an HTTPS server, {@code false} for plain HTTP */
    public void setUseHttps(boolean z) {
        this.useHttps = z;
    }

    /** @return the host name or address the server binds to */
    public String getServerIp() {
        return this.serverIp;
    }

    /** @param str the host name or address the server binds to */
    public void setServerIp(String str) {
        this.serverIp = str;
    }

    /** @return the TCP port the server binds to */
    public int getPort() {
        return this.port;
    }

    /** @param i the TCP port the server binds to */
    public void setPort(int i) {
        this.port = i;
    }

    /** @return the backlog value passed to the server factory */
    public int getBacklog() {
        return this.backlog;
    }

    /** @param i the backlog value passed to the server factory */
    public void setBacklog(int i) {
        this.backlog = i;
    }

    /** @return the configured stop delay in seconds */
    public int getStopDelaySecond() {
        return this.stopDelaySecond;
    }

    /** @param i the stop delay in seconds */
    public void setStopDelaySecond(int i) {
        this.stopDelaySecond = i;
    }

    /**
     * Builds the base message for a request from its metadata.
     *
     * @param httpExchange the exchange being processed
     * @return a JSON object with the keys {@code clientIp}, {@code uri}, {@code User-agent},
     *     {@code Content-type}, {@code Host} and {@code method}
     */
    protected JSONObject createHttpHeader(HttpExchange httpExchange) {
        JSONObject header = new JSONObject();
        header.put("clientIp", httpExchange.getRemoteAddress().getAddress().getHostAddress());
        header.put("uri", httpExchange.getRequestURI().getPath());
        header.put("User-agent", httpExchange.getRequestHeaders().getFirst("User-agent"));
        header.put("Content-type", httpExchange.getRequestHeaders().getFirst("Content-type"));
        header.put("Host", httpExchange.getRequestHeaders().getFirst("Host"));
        header.put("method", httpExchange.getRequestMethod());
        return header;
    }

    /**
     * Reads the whole stream as UTF-8 text.
     *
     * <p>The content is read line by line and the lines are concatenated without their line
     * terminators, so a multi-line body is returned as a single line. The stream is not closed
     * here; the caller owns it.
     *
     * @param inputStream the stream to drain
     * @return the decoded content with line terminators removed
     * @throws IOException if reading the stream fails
     */
    protected String getContent(InputStream inputStream) throws IOException {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        StringBuilder content = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            content.append(line);
        }
        return content.toString();
    }
}
