/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.EncodableHistogram;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PerformanceProducer {
    // Cached thread pool on which main() submits the test worker tasks; threads are
    // created through a Netty DefaultThreadFactory named "pulsar-perf-producer-exec".
    private static final ExecutorService executor = Executors.newCachedThreadPool((ThreadFactory)new DefaultThreadFactory("pulsar-perf-producer-exec"));
    // Interval counters: incremented from the send callbacks and drained with
    // sumThenReset() by the periodic report loop in main().
    private static final LongAdder messagesSent = new LongAdder();
    private static final LongAdder messagesFailed = new LongAdder();
    private static final LongAdder bytesSent = new LongAdder();
    // Whole-run counters: only ever added to; read by printAggregatedThroughput().
    private static final LongAdder totalMessagesSent = new LongAdder();
    private static final LongAdder totalBytesSent = new LongAdder();
    // Publish-latency recorders fed with microsecond values from the send callbacks.
    // `recorder` is drained for every periodic report, `cumulativeRecorder` by
    // printAggregatedStats().
    // NOTE(review): the constructor arguments are presumably (highest trackable value,
    // number of significant digits), i.e. 120000 s expressed in microseconds and 5 digits
    // -- confirm against the HdrHistogram Recorder documentation.
    private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMicros(120000L), 5);
    private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000L), 5);
    // Formatters used by the periodic report (rates and latencies respectively).
    // NOTE(review): the second PaddingDecimalFormat argument looks like a minimum
    // output width -- verify in PaddingDecimalFormat.
    static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
    static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
    // Formatter used for the aggregated (end-of-run) throughput figures.
    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
    private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);

    /**
     * Entry point of the {@code pulsar-perf produce} tool.
     *
     * <p>Parses the command line, optionally fills unset options from a properties file,
     * prepares the message payload(s), starts {@code numTestThreads} worker tasks running
     * {@link #runProducer}, and then reports throughput and latency every 10 seconds while
     * also appending each interval histogram to a {@code perf-producer-<millis>.hgrm} file.
     *
     * <p>The method exits the JVM with status {@code -1} on invalid arguments or when help
     * is requested. The report loop ends when the thread is interrupted or when every
     * worker has counted down the completion latch.
     *
     * @param args command-line arguments, see {@link Arguments}
     * @throws Exception if the configuration file, payload file or histogram log file
     *                   cannot be read or written, or the configuration cannot be
     *                   serialized for logging
     */
    public static void main(String[] args) throws Exception {
        final Arguments arguments = new Arguments();
        JCommander jc = new JCommander(arguments);
        jc.setProgramName("pulsar-perf produce");
        try {
            jc.parse(args);
        } catch (ParameterException e) {
            System.out.println(e.getMessage());
            jc.usage();
            System.exit(-1);
        }
        if (arguments.help) {
            jc.usage();
            System.exit(-1);
        }
        if (arguments.topics.size() != 1) {
            System.out.println("Only one topic name is allowed");
            jc.usage();
            System.exit(-1);
        }
        // The per-thread quotas below divide by the thread count, and the completion
        // latch rejects negative counts, so fail with a usage message instead of a crash.
        if (arguments.numTestThreads < 1) {
            System.out.println("Number of test threads must be at least 1");
            jc.usage();
            System.exit(-1);
        }

        if (arguments.confFile != null) {
            Properties prop = new Properties(System.getProperties());
            // Close the stream as soon as the properties are loaded.
            try (FileInputStream confStream = new FileInputStream(arguments.confFile)) {
                prop.load(confStream);
            }
            // Command-line values win; the file only fills options that were left unset.
            if (arguments.serviceURL == null) {
                arguments.serviceURL = prop.getProperty("brokerServiceUrl");
            }
            if (arguments.serviceURL == null) {
                arguments.serviceURL = prop.getProperty("webServiceUrl");
            }
            if (arguments.serviceURL == null) {
                arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/");
            }
            if (arguments.authPluginClassName == null) {
                arguments.authPluginClassName = prop.getProperty("authPlugin", null);
            }
            if (arguments.authParams == null) {
                arguments.authParams = prop.getProperty("authParams", null);
            }
            if (StringUtils.isBlank(arguments.tlsTrustCertsFilePath)) {
                arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", "");
            }
            if (StringUtils.isBlank(arguments.messageKeyGenerationMode)) {
                arguments.messageKeyGenerationMode = prop.getProperty("messageKeyGenerationMode", null);
            }
            if (arguments.tlsAllowInsecureConnection == null) {
                arguments.tlsAllowInsecureConnection = Boolean.parseBoolean(prop.getProperty("tlsAllowInsecureConnection", ""));
            }
        }

        PerfClientUtils.printJVMInformation(log);
        ObjectMapper m = new ObjectMapper();
        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
        log.info("Starting Pulsar perf producer with config: {}", w.writeValueAsString(arguments));

        // Either a list of payloads read from a file, or one synthetic payload of
        // upper-case ASCII letters generated from a fixed seed.
        final byte[] payloadBytes = new byte[arguments.msgSize];
        final Random random = new Random(0L);
        final List<byte[]> payloadByteList = new ArrayList<>();
        if (arguments.payloadFilename != null) {
            Path payloadFilePath = Paths.get(arguments.payloadFilename);
            if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0L) {
                throw new IllegalArgumentException("Payload file doesn't exist or it is empty.");
            }
            // The default delimiter is the two-character sequence "\n"; map it to a real newline.
            String delimiter = arguments.payloadDelimiter.equals("\\n") ? "\n" : arguments.payloadDelimiter;
            String[] payloadList = new String(Files.readAllBytes(payloadFilePath), StandardCharsets.UTF_8).split(delimiter);
            log.info("Reading payloads from {} and {} records read", payloadFilePath.toAbsolutePath(), payloadList.length);
            for (String payload : payloadList) {
                payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
            }
        } else {
            for (int i = 0; i < payloadBytes.length; ++i) {
                payloadBytes[i] = (byte) (random.nextInt(26) + 65);
            }
        }

        final long start = System.nanoTime();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            PerformanceProducer.printAggregatedThroughput(start);
            PerformanceProducer.printAggregatedStats();
        }));

        // Message count and rate are split evenly (integer division) between the workers.
        final CountDownLatch doneLatch = new CountDownLatch(arguments.numTestThreads);
        final long numMessagesPerThread = arguments.numMessages / (long) arguments.numTestThreads;
        final int msgRatePerThread = arguments.msgRate / arguments.numTestThreads;
        for (int i = 0; i < arguments.numTestThreads; ++i) {
            final int threadIdx = i;
            executor.submit(() -> {
                log.info("Started performance test thread {}", threadIdx);
                PerformanceProducer.runProducer(arguments, numMessagesPerThread, msgRatePerThread, payloadByteList, payloadBytes, random, doneLatch);
            });
        }

        long oldTime = System.nanoTime();
        Histogram reportHistogram = null;
        String statsFileName = "perf-producer-" + System.currentTimeMillis() + ".hgrm";
        log.info("Dumping latency stats to {}", statsFileName);
        // try-with-resources so the histogram log is closed when the report loop ends.
        try (PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false)) {
            HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog);
            histogramLogWriter.outputLogFormatVersion();
            histogramLogWriter.outputLegend();
            while (true) {
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread, then stop reporting.
                    Thread.currentThread().interrupt();
                    break;
                }
                if (doneLatch.getCount() <= 0L) {
                    break;
                }
                long now = System.nanoTime();
                double elapsed = (double) (now - oldTime) / 1.0E9;
                double rate = (double) messagesSent.sumThenReset() / elapsed;
                double failureRate = (double) messagesFailed.sumThenReset() / elapsed;
                double throughput = (double) bytesSent.sumThenReset() / elapsed / 1024.0 / 1024.0 * 8.0;
                // Latencies are recorded in microseconds; divide by 1000 to print milliseconds.
                reportHistogram = recorder.getIntervalHistogram(reportHistogram);
                log.info("Throughput produced: {}  msg/s --- {} Mbit/s --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
                        throughputFormat.format(rate),
                        throughputFormat.format(throughput),
                        throughputFormat.format(failureRate),
                        dec.format(reportHistogram.getMean() / 1000.0),
                        dec.format((double) reportHistogram.getValueAtPercentile(50.0) / 1000.0),
                        dec.format((double) reportHistogram.getValueAtPercentile(95.0) / 1000.0),
                        dec.format((double) reportHistogram.getValueAtPercentile(99.0) / 1000.0),
                        dec.format((double) reportHistogram.getValueAtPercentile(99.9) / 1000.0),
                        dec.format((double) reportHistogram.getValueAtPercentile(99.99) / 1000.0),
                        dec.format((double) reportHistogram.getMaxValue() / 1000.0));
                histogramLogWriter.outputIntervalHistogram(reportHistogram);
                reportHistogram.reset();
                oldTime = now;
            }
        }
    }

    /**
     * Body of one test worker: builds a Pulsar client, creates
     * {@code numTopics * numProducers} producers and publishes messages round-robin over
     * them at {@code msgRate} messages per second until the configured test duration or
     * message quota is reached.
     *
     * <p>On normal completion the worker prints the aggregated latency statistics, counts
     * down {@code doneLatch}, waits five seconds and terminates the JVM with status 0.
     * On any failure the error is logged, the latch is still counted down (so the
     * reporting loop in {@code main} is not left waiting for a dead worker) and the
     * client is closed.
     *
     * @param arguments       parsed command-line options
     * @param numMessages     number of messages this worker should send; {@code 0} or less means unlimited
     * @param msgRate         publish rate for this worker in messages per second
     * @param payloadByteList payloads to pick from at random when a payload file was given
     * @param payloadBytes    fixed payload used when no payload file was given
     * @param random          source of randomness for payload selection and random keys
     * @param doneLatch       latch counted down once when this worker finishes or fails
     */
    private static void runProducer(Arguments arguments, long numMessages, int msgRate, List<byte[]> payloadByteList, byte[] payloadBytes, Random random, CountDownLatch doneLatch) {
        PulsarClient client = null;
        // Guards against counting the latch down twice (e.g. when the post-test sleep is interrupted).
        boolean latchReleased = false;
        try {
            String prefixTopicName = arguments.topics.get(0);
            List<Future<Producer<byte[]>>> futures = new ArrayList<>();

            ClientBuilder clientBuilder = PulsarClient.builder()
                    .serviceUrl(arguments.serviceURL)
                    .connectionsPerBroker(arguments.maxConnections)
                    .ioThreads(arguments.ioThreads)
                    .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS)
                    .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
            if (StringUtils.isNotBlank(arguments.authPluginClassName)) {
                clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);
            }
            if (arguments.tlsAllowInsecureConnection != null) {
                clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
            }
            if (StringUtils.isNotBlank(arguments.listenerName)) {
                clientBuilder.listenerName(arguments.listenerName);
            }
            client = clientBuilder.build();

            ProducerBuilder<byte[]> producerBuilder = client.newProducer()
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .compressionType(arguments.compression)
                    .maxPendingMessages(arguments.maxOutstanding)
                    .maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
            // Batching is disabled only when both the time window and the message limit are zero.
            if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages == 0) {
                producerBuilder.enableBatching(false);
            } else {
                long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000.0);
                producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true);
            }
            if (arguments.batchMaxMessages > 0) {
                producerBuilder.batchingMaxMessages(arguments.batchMaxMessages);
            }
            if (arguments.batchMaxBytes > 0) {
                producerBuilder.batchingMaxBytes(arguments.batchMaxBytes);
            }
            producerBuilder.blockIfQueueFull(true);
            if (StringUtils.isNotBlank(arguments.encKeyName) && StringUtils.isNotBlank(arguments.encKeyFile)) {
                producerBuilder.addEncryptionKey(arguments.encKeyName);
                producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile);
            }

            for (int i = 0; i < arguments.numTopics; ++i) {
                // A single topic uses the given name as is; several topics get a "-<index>" suffix.
                String topic = arguments.numTopics == 1 ? prefixTopicName : String.format("%s-%d", prefixTopicName, i);
                log.info("Adding {} publishers on topic {}", arguments.numProducers, topic);
                for (int j = 0; j < arguments.numProducers; ++j) {
                    ProducerBuilder<byte[]> prodBuilder = producerBuilder.clone().topic(topic);
                    if (arguments.chunkingAllowed) {
                        prodBuilder.enableChunking(true);
                        prodBuilder.enableBatching(false);
                    }
                    futures.add(prodBuilder.createAsync());
                }
            }

            List<Producer<byte[]>> producers = new ArrayList<>(futures.size());
            for (Future<Producer<byte[]>> future : futures) {
                producers.add(future.get());
            }
            // Without producers the publish loop below would spin forever doing nothing.
            if (producers.isEmpty()) {
                throw new IllegalArgumentException("No producers were created; the number of topics and producers must both be positive");
            }
            Collections.shuffle(producers);
            log.info("Created {} producers", producers.size());

            RateLimiter rateLimiter = RateLimiter.create(msgRate);
            long startTime = System.nanoTime();
            long warmupEndTime = startTime + (long) (arguments.warmupTimeSeconds * 1.0E9);
            long testEndTime = startTime + (long) ((double) arguments.testTime * 1.0E9);

            MessageKeyGenerationMode msgKeyMode = null;
            if (StringUtils.isNotBlank(arguments.messageKeyGenerationMode)) {
                try {
                    msgKeyMode = MessageKeyGenerationMode.valueOf(arguments.messageKeyGenerationMode);
                } catch (IllegalArgumentException e) {
                    throw new IllegalArgumentException("messageKeyGenerationMode only support [autoIncrement, random]", e);
                }
            }

            long totalSent = 0L;
            while (true) {
                for (Producer<byte[]> producer : producers) {
                    boolean timeUp = arguments.testTime > 0L && System.nanoTime() > testEndTime;
                    boolean quotaReached = numMessages > 0L && totalSent >= numMessages;
                    if (timeUp || quotaReached) {
                        log.info("------------------- DONE -----------------------");
                        PerformanceProducer.printAggregatedStats();
                        doneLatch.countDown();
                        latchReleased = true;
                        Thread.sleep(5000L);
                        System.exit(0);
                    }
                    // Count every message, including in unlimited mode, so that
                    // autoIncrement keys actually advance (1, 2, 3, ...).
                    totalSent++;

                    rateLimiter.acquire();
                    final long sendTime = System.nanoTime();
                    final byte[] payloadData = arguments.payloadFilename != null
                            ? payloadByteList.get(random.nextInt(payloadByteList.size()))
                            : payloadBytes;
                    TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage().value(payloadData);
                    if (arguments.delay > 0L) {
                        messageBuilder.deliverAfter(arguments.delay, TimeUnit.SECONDS);
                    }
                    if (msgKeyMode == MessageKeyGenerationMode.random) {
                        messageBuilder.key(String.valueOf(random.nextInt()));
                    } else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
                        messageBuilder.key(String.valueOf(totalSent));
                    }
                    messageBuilder.sendAsync().thenRun(() -> {
                        messagesSent.increment();
                        bytesSent.add(payloadData.length);
                        totalMessagesSent.increment();
                        totalBytesSent.add(payloadData.length);
                        long now = System.nanoTime();
                        // Latency is only recorded once the warm-up period is over.
                        if (now > warmupEndTime) {
                            long latencyMicros = TimeUnit.NANOSECONDS.toMicros(now - sendTime);
                            recorder.recordValue(latencyMicros);
                            cumulativeRecorder.recordValue(latencyMicros);
                        }
                    }).exceptionally(ex -> {
                        // NOTE(review): presumably raised by recordValue() above when a latency
                        // exceeds the recorder's trackable range; such samples are dropped
                        // silently rather than counted as publish failures -- confirm.
                        if (ex.getCause() instanceof ArrayIndexOutOfBoundsException) {
                            return null;
                        }
                        log.warn("Write error on message", ex);
                        messagesFailed.increment();
                        if (arguments.exitOnFailure) {
                            System.exit(-1);
                        }
                        return null;
                    });
                }
            }
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("Got error", t);
        } finally {
            // A failed worker must still release its latch slot, otherwise main() keeps
            // printing empty reports forever.
            if (!latchReleased) {
                doneLatch.countDown();
            }
            if (null != client) {
                try {
                    client.close();
                } catch (PulsarClientException e) {
                    log.error("Failed to close test client", e);
                }
            }
        }
    }

    /**
     * Logs the whole-run totals: number of records sent, average message rate and
     * average bit rate over the time elapsed since {@code start}.
     *
     * @param start {@link System#nanoTime()} reading taken when the test started
     */
    private static void printAggregatedThroughput(long start) {
        final long elapsedNanos = System.nanoTime() - start;
        final double elapsedSeconds = (double) elapsedNanos / 1.0E9;
        final double messagesPerSecond = (double) totalMessagesSent.sum() / elapsedSeconds;
        final double bytesPerSecond = (double) totalBytesSent.sum() / elapsedSeconds;
        // bytes/s -> MiB/s -> Mbit/s
        final double megabitsPerSecond = bytesPerSecond / 1024.0 / 1024.0 * 8.0;
        log.info("Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
                totalMessagesSent,
                totalFormat.format(messagesPerSecond),
                totalFormat.format(megabitsPerSecond));
    }

    /**
     * Logs mean, selected percentiles and maximum of the latencies taken from the
     * cumulative recorder's interval histogram. Recorded values are microseconds and
     * are printed as milliseconds.
     */
    private static void printAggregatedStats() {
        final Histogram snapshot = cumulativeRecorder.getIntervalHistogram();
        final double[] percentiles = {50.0, 95.0, 99.0, 99.9, 99.99, 99.999};

        // Layout: [mean, one entry per percentile..., max]
        final Object[] fields = new Object[percentiles.length + 2];
        fields[0] = dec.format(snapshot.getMean() / 1000.0);
        for (int k = 0; k < percentiles.length; k++) {
            fields[k + 1] = dec.format((double) snapshot.getValueAtPercentile(percentiles[k]) / 1000.0);
        }
        fields[fields.length - 1] = dec.format((double) snapshot.getMaxValue() / 1000.0);

        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}", fields);
    }

    /**
     * How the producer assigns a key to each published message. The constant names are
     * matched verbatim against the {@code --message-key-generation-mode} option value.
     */
    public enum MessageKeyGenerationMode {
        /** The key is the worker's running count of sent messages, as a decimal string. */
        autoIncrement,
        /** The key is a pseudo-random {@code int}, as a decimal string. */
        random
    }

    static class Arguments {
        @Parameter(names={"-h", "--help"}, description="Help message", help=true)
        boolean help;
        @Parameter(names={"--conf-file"}, description="Configuration file")
        public String confFile;
        @Parameter(description="persistent://prop/ns/my-topic", required=true)
        public List<String> topics;
        @Parameter(names={"-threads", "--num-test-threads"}, description="Number of test threads")
        public int numTestThreads = 1;
        @Parameter(names={"-r", "--rate"}, description="Publish rate msg/s across topics")
        public int msgRate = 100;
        @Parameter(names={"-s", "--size"}, description="Message size (bytes)")
        public int msgSize = 1024;
        @Parameter(names={"-t", "--num-topic"}, description="Number of topics")
        public int numTopics = 1;
        @Parameter(names={"-n", "--num-producers"}, description="Number of producers (per topic)")
        public int numProducers = 1;
        @Parameter(names={"-u", "--service-url"}, description="Pulsar Service URL")
        public String serviceURL;
        @Parameter(names={"--auth_plugin"}, description="Authentication plugin class name")
        public String authPluginClassName;
        @Parameter(names={"--listener-name"}, description="Listener name for the broker.")
        String listenerName = null;
        @Parameter(names={"-ch", "--chunking"}, description="Should split the message and publish in chunks if message size is larger than allowed max size")
        private boolean chunkingAllowed = false;
        @Parameter(names={"--auth-params"}, description="Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
        public String authParams;
        @Parameter(names={"-o", "--max-outstanding"}, description="Max number of outstanding messages")
        public int maxOutstanding = 1000;
        @Parameter(names={"-p", "--max-outstanding-across-partitions"}, description="Max number of outstanding messages across partitions")
        public int maxPendingMessagesAcrossPartitions = 50000;
        @Parameter(names={"-c", "--max-connections"}, description="Max number of TCP connections to a single broker")
        public int maxConnections = 100;
        @Parameter(names={"-m", "--num-messages"}, description="Number of messages to publish in total. If 0, it will keep publishing")
        public long numMessages = 0L;
        @Parameter(names={"-i", "--stats-interval-seconds"}, description="Statistics Interval Seconds. If 0, statistics will be disabled")
        public long statsIntervalSeconds = 0L;
        @Parameter(names={"-z", "--compression"}, description="Compress messages payload")
        public CompressionType compression = CompressionType.NONE;
        @Parameter(names={"-f", "--payload-file"}, description="Use payload from an UTF-8 encoded text file and a payload will be randomly selected when publishing messages")
        public String payloadFilename = null;
        @Parameter(names={"-e", "--payload-delimiter"}, description="The delimiter used to split lines when using payload from a file")
        public String payloadDelimiter = "\\n";
        @Parameter(names={"-b", "--batch-time-window"}, description="Batch messages in 'x' ms window (Default: 1ms)")
        public double batchTimeMillis = 1.0;
        @Parameter(names={"-bm", "--batch-max-messages"}, description="Maximum number of messages per batch")
        public int batchMaxMessages = 1000;
        @Parameter(names={"-bb", "--batch-max-bytes"}, description="Maximum number of bytes per batch")
        public int batchMaxBytes = 0x400000;
        @Parameter(names={"-time", "--test-duration"}, description="Test duration in secs. If 0, it will keep publishing")
        public long testTime = 0L;
        @Parameter(names={"--warmup-time"}, description="Warm-up time in seconds (Default: 1 sec)")
        public double warmupTimeSeconds = 1.0;
        @Parameter(names={"--trust-cert-file"}, description="Path for the trusted TLS certificate file")
        public String tlsTrustCertsFilePath = "";
        @Parameter(names={"--tls-allow-insecure"}, description="Allow insecure TLS connection")
        public Boolean tlsAllowInsecureConnection = null;
        @Parameter(names={"-k", "--encryption-key-name"}, description="The public key name to encrypt payload")
        public String encKeyName = null;
        @Parameter(names={"-v", "--encryption-key-value-file"}, description="The file which contains the public key to encrypt payload")
        public String encKeyFile = null;
        @Parameter(names={"-d", "--delay"}, description="Mark messages with a given delay in seconds")
        public long delay = 0L;
        @Parameter(names={"-ef", "--exit-on-failure"}, description="Exit from the process on publish failure (default: disable)")
        public boolean exitOnFailure = false;
        @Parameter(names={"-mk", "--message-key-generation-mode"}, description="The generation mode of message key, valid options are: [autoIncrement, random]")
        public String messageKeyGenerationMode = null;
        @Parameter(names={"-ioThreads", "--num-io-threads"}, description="Set the number of threads to be used for handling connections to brokers, default is 1 thread")
        public int ioThreads = 1;

        Arguments() {
        }
    }
}

