/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.socket.client;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.EncodableHistogram;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.proxy.socket.client.SimpleTestProducerSocket;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PerformanceClient {
    static AtomicInteger msgSent = new AtomicInteger(0);
    private static final LongAdder messagesSent = new LongAdder();
    private static final LongAdder bytesSent = new LongAdder();
    private static final LongAdder totalMessagesSent = new LongAdder();
    private static final LongAdder totalBytesSent = new LongAdder();
    private JCommander jc;
    static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
    static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
    private static final Logger log = LoggerFactory.getLogger(PerformanceClient.class);

    public Arguments loadArguments(String[] args) {
        Arguments arguments = new Arguments();
        this.jc = new JCommander((Object)arguments);
        this.jc.setProgramName("pulsar-perf websocket-producer");
        try {
            this.jc.parse(args);
        }
        catch (ParameterException e) {
            System.out.println(e.getMessage());
            this.jc.usage();
            PerfClientUtils.exit(-1);
        }
        if (arguments.help) {
            this.jc.usage();
            PerfClientUtils.exit(-1);
        }
        if (arguments.topics.size() != 1) {
            System.err.println("Only one topic name is allowed");
            this.jc.usage();
            PerfClientUtils.exit(-1);
        }
        if (arguments.confFile != null) {
            Properties prop = new Properties(System.getProperties());
            try {
                prop.load(new FileInputStream(arguments.confFile));
            }
            catch (IOException e) {
                log.error("Error in loading config file");
                this.jc.usage();
                PerfClientUtils.exit(1);
            }
            if (StringUtils.isBlank((CharSequence)arguments.proxyURL)) {
                String webSocketServiceUrl = prop.getProperty("webSocketServiceUrl");
                if (StringUtils.isNotBlank((CharSequence)webSocketServiceUrl)) {
                    arguments.proxyURL = webSocketServiceUrl;
                } else {
                    String webServiceUrl;
                    String string = webServiceUrl = StringUtils.isNotBlank((CharSequence)prop.getProperty("webServiceUrl")) ? prop.getProperty("webServiceUrl") : prop.getProperty("serviceUrl");
                    if (StringUtils.isNotBlank((CharSequence)webServiceUrl)) {
                        if (webServiceUrl.startsWith("ws://") || webServiceUrl.startsWith("wss://")) {
                            arguments.proxyURL = webServiceUrl;
                        } else if (webServiceUrl.startsWith("http://") || webServiceUrl.startsWith("https://")) {
                            arguments.proxyURL = webServiceUrl.replaceFirst("^http", "ws");
                        }
                    }
                }
            }
            if (arguments.authPluginClassName == null) {
                arguments.authPluginClassName = prop.getProperty("authPlugin", null);
            }
            if (arguments.authParams == null) {
                arguments.authParams = prop.getProperty("authParams", null);
            }
        }
        if (StringUtils.isBlank((CharSequence)arguments.proxyURL)) {
            arguments.proxyURL = "ws://localhost:8080/";
        }
        if (!arguments.proxyURL.endsWith("/")) {
            arguments.proxyURL = arguments.proxyURL + "/";
        }
        arguments.testTime = TimeUnit.SECONDS.toMillis(arguments.testTime);
        return arguments;
    }

    public void runPerformanceTest(long messages, long limit, int numOfTopic, int sizeOfMessage, String baseUrl, String topicName, String authPluginClassName, String authParams) throws InterruptedException, FileNotFoundException {
        ExecutorService executor = Executors.newCachedThreadPool((ThreadFactory)new DefaultThreadFactory("pulsar-perf-producer-exec"));
        HashMap<String, Tuple> producersMap = new HashMap<String, Tuple>();
        String restPath = TopicName.get((String)topicName).getRestPath();
        String produceBaseEndPoint = TopicName.get((String)topicName).isV2() ? baseUrl + "ws/v2/producer/" + restPath : baseUrl + "ws/producer/" + restPath;
        for (int i = 0; i < numOfTopic; ++i) {
            String topic = numOfTopic > 1 ? produceBaseEndPoint + String.valueOf(i) : produceBaseEndPoint;
            URI produceUri = URI.create(topic);
            WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            if (StringUtils.isNotBlank((CharSequence)authPluginClassName) && StringUtils.isNotBlank((CharSequence)authParams)) {
                try {
                    Authentication auth = AuthenticationFactory.create((String)authPluginClassName, (String)authParams);
                    auth.start();
                    AuthenticationDataProvider authData = auth.getAuthData();
                    if (authData.hasDataForHttp()) {
                        for (Map.Entry kv : authData.getHttpHeaders()) {
                            produceRequest.setHeader((String)kv.getKey(), (String)kv.getValue());
                        }
                    }
                }
                catch (Exception e) {
                    log.error("Authentication plugin error: " + e.getMessage());
                }
            }
            SimpleTestProducerSocket produceSocket = new SimpleTestProducerSocket();
            try {
                produceClient.start();
                produceClient.connect((Object)produceSocket, produceUri, produceRequest);
            }
            catch (IOException e1) {
                log.error("Fail in connecting: [{}]", (Object)e1.getMessage());
                return;
            }
            catch (Exception e1) {
                log.error("Fail in starting client[{}]", (Object)e1.getMessage());
                return;
            }
            producersMap.put(produceUri.toString(), new Tuple(produceClient, produceRequest, produceSocket));
        }
        TimeUnit.SECONDS.sleep(5L);
        executor.submit(() -> {
            try {
                RateLimiter rateLimiter = RateLimiter.create((double)limit);
                long totalSent = 0L;
                block2: while (true) {
                    Iterator iterator = producersMap.keySet().iterator();
                    while (true) {
                        if (!iterator.hasNext()) continue block2;
                        String topic = (String)iterator.next();
                        if (messages > 0L && totalSent >= messages) {
                            log.trace("------------- DONE (reached the maximum number: [{}] of production) --------------", (Object)messages);
                            Thread.sleep(10000L);
                            PerfClientUtils.exit(0);
                        }
                        rateLimiter.acquire();
                        if (((Tuple)producersMap.get(topic)).getSocket().getSession() == null) {
                            Thread.sleep(10000L);
                            PerfClientUtils.exit(0);
                        }
                        ((Tuple)producersMap.get(topic)).getSocket().sendMsg(String.valueOf(totalSent++), sizeOfMessage);
                        messagesSent.increment();
                        bytesSent.add(sizeOfMessage);
                        totalMessagesSent.increment();
                        totalBytesSent.add(sizeOfMessage);
                    }
                    break;
                }
            }
            catch (Throwable t) {
                log.error(t.getMessage());
                PerfClientUtils.exit(0);
                return;
            }
        });
        long oldTime = System.nanoTime();
        Histogram reportHistogram = null;
        String statsFileName = "perf-websocket-producer-" + System.currentTimeMillis() + ".hgrm";
        log.info("Dumping latency stats to {} \n", (Object)statsFileName);
        PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false);
        HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog);
        histogramLogWriter.outputLogFormatVersion();
        histogramLogWriter.outputLegend();
        while (true) {
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                break;
            }
            long now = System.nanoTime();
            double elapsed = (double)(now - oldTime) / 1.0E9;
            double rate = (double)messagesSent.sumThenReset() / elapsed;
            double throughput = (double)bytesSent.sumThenReset() / elapsed / 1024.0 / 1024.0 * 8.0;
            reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram(reportHistogram);
            log.info("Throughput produced: {}  msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms", new Object[]{throughputFormat.format(rate), throughputFormat.format(throughput), dec.format(reportHistogram.getMean() / 1000.0), dec.format((double)reportHistogram.getValueAtPercentile(50.0) / 1000.0), dec.format((double)reportHistogram.getValueAtPercentile(95.0) / 1000.0), dec.format((double)reportHistogram.getValueAtPercentile(99.0) / 1000.0), dec.format((double)reportHistogram.getValueAtPercentile(99.9) / 1000.0), dec.format((double)reportHistogram.getValueAtPercentile(99.99) / 1000.0)});
            histogramLogWriter.outputIntervalHistogram((EncodableHistogram)reportHistogram);
            reportHistogram.reset();
            oldTime = now;
        }
        TimeUnit.SECONDS.sleep(100L);
        executor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        PerformanceClient test = new PerformanceClient();
        Arguments arguments = test.loadArguments(args);
        PerfClientUtils.printJVMInformation(log);
        long start = System.nanoTime();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            PerformanceClient.printAggregatedThroughput(start);
            PerformanceClient.printAggregatedStats();
        }));
        test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize, arguments.proxyURL, arguments.topics.get(0), arguments.authPluginClassName, arguments.authParams);
    }

    private static void printAggregatedThroughput(long start) {
        double elapsed = (double)(System.nanoTime() - start) / 1.0E9;
        double rate = (double)totalMessagesSent.sum() / elapsed;
        double throughput = (double)totalBytesSent.sum() / elapsed / 1024.0 / 1024.0 * 8.0;
        log.info("Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s", new Object[]{totalMessagesSent, totalFormat.format(rate), totalFormat.format(throughput)});
    }

    private static void printAggregatedStats() {
        Histogram reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram();
        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}", new Object[]{dec.format(reportHistogram.getMean()), reportHistogram.getValueAtPercentile(50.0), reportHistogram.getValueAtPercentile(95.0), reportHistogram.getValueAtPercentile(99.0), reportHistogram.getValueAtPercentile(99.9), reportHistogram.getValueAtPercentile(99.99), reportHistogram.getValueAtPercentile(99.999), reportHistogram.getMaxValue()});
    }

    private class Tuple {
        private WebSocketClient produceClient;
        private ClientUpgradeRequest produceRequest;
        private SimpleTestProducerSocket produceSocket;

        public Tuple(WebSocketClient produceClient, ClientUpgradeRequest produceRequest, SimpleTestProducerSocket produceSocket) {
            this.produceClient = produceClient;
            this.produceRequest = produceRequest;
            this.produceSocket = produceSocket;
        }

        public SimpleTestProducerSocket getSocket() {
            return this.produceSocket;
        }
    }

    @Parameters(commandDescription="Test pulsar websocket producer performance.")
    static class Arguments {
        @Parameter(names={"-h", "--help"}, description="Help message", help=true)
        boolean help;
        @Parameter(names={"--conf-file"}, description="Configuration file")
        public String confFile;
        @Parameter(names={"-u", "--proxy-url"}, description="Pulsar Proxy URL, e.g., \"ws://localhost:8080/\"")
        public String proxyURL;
        @Parameter(description="persistent://tenant/ns/my-topic", required=true)
        public List<String> topics;
        @Parameter(names={"-r", "--rate"}, description="Publish rate msg/s across topics")
        public int msgRate = 100;
        @Parameter(names={"-s", "--size"}, description="Message size in byte")
        public int msgSize = 1024;
        @Parameter(names={"-t", "--num-topic"}, description="Number of topics")
        public int numTopics = 1;
        @Parameter(names={"--auth_plugin"}, description="Authentication plugin class name")
        public String authPluginClassName;
        @Parameter(names={"--auth-params"}, description="Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
        public String authParams;
        @Parameter(names={"-m", "--num-messages"}, description="Number of messages to publish in total. If 0, it will keep publishing")
        public long numMessages = 0L;
        @Parameter(names={"-f", "--payload-file"}, description="Use payload from a file instead of empty buffer")
        public String payloadFilename = null;
        @Parameter(names={"-time", "--test-duration"}, description="Test duration in secs. If 0, it will keep publishing")
        public long testTime = 0L;

        Arguments() {
        }
    }
}

