/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.other;

import java.awt.image.RenderedImage;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.imageio.ImageIO;
import kafka.server.BrokerServer;
import kafka.server.KafkaBroker;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
import org.apache.logging.log4j.core.config.Configurator;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartFrame;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.data.xy.XYDataset;
import org.jfree.data.xy.XYSeries;
import org.jfree.data.xy.XYSeriesCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class ReplicationQuotasTestRig {
    /** Logger shared by the rig and its nested classes. */
    public static final Logger LOGGER = LoggerFactory.getLogger(ReplicationQuotasTestRig.class);
    /** One million. Not referenced anywhere else in this file. */
    public static final int K = 1000000;
    /** Output directory for this run; assigned (and created) by the static initializer of this class. */
    private static final String DIR;

    /**
     * Runs the five predefined experiments one after another, recording all of them in a
     * single {@link Journal}.
     *
     * @param args optional; when the first argument is {@code show-gui} every chart is also
     *             opened in a window and the JVM is not terminated explicitly at the end
     */
    public static void main(String[] args) {
        boolean displayChartsOnScreen = args.length > 0 && "show-gui".equals(args[0]);
        Journal journal = new Journal();

        // Arguments: name, brokers, partitions, throttle, msgsPerPartition, msgSize.
        ExperimentDef[] experiments = {
            new ExperimentDef("Experiment1", 5, 20, 1000000L, 500, 100000),
            new ExperimentDef("Experiment2", 5, 50, 10000000L, 1000, 100000),
            new ExperimentDef("Experiment3", 50, 50, 2000000L, 1000, 100000),
            new ExperimentDef("Experiment4", 25, 100, 4000000L, 1000, 100000),
            new ExperimentDef("Experiment5", 5, 50, 50000000L, 4000, 100000)
        };
        for (ExperimentDef experiment : experiments) {
            ReplicationQuotasTestRig.run(experiment, journal, displayChartsOnScreen);
        }

        // NOTE(review): presumably the explicit exit is skipped in GUI mode so the chart
        // windows stay open - confirm.
        if (displayChartsOnScreen) {
            return;
        }
        Exit.exit(0);
    }

    /**
     * Executes one experiment on a fresh {@link Experiment} instance.
     *
     * <p>A failing experiment is reported on standard error and does not propagate, so the
     * caller can carry on with the next definition. The experiment is torn down whether it
     * succeeded or not.
     *
     * @param config                 the experiment to execute
     * @param journal                the report the results are appended to
     * @param displayChartsOnScreen  whether charts should also be shown in a window
     */
    static void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) {
        final Experiment current = new Experiment();
        try {
            current.run(config, journal, displayChartsOnScreen);
            // Only reached when the experiment completed without throwing.
            journal.footer();
        }
        catch (Exception failure) {
            failure.printStackTrace();
        }
        finally {
            current.tearDown();
        }
    }

    static {
        Configurator.reconfigure();
        new File("Experiments").mkdir();
        DIR = "Experiments/Run" + Long.valueOf(System.currentTimeMillis()).toString().substring(8);
        new File(DIR).mkdir();
    }

    /**
     * Append-only HTML report ({@code Log.html}) stored in the directory of the current run.
     *
     * <p>Every write opens the file, appends to it and closes it again, so whatever has been
     * reported so far is on disk even if a later step fails.
     */
    static class Journal {
        File log = new File(DIR, "Log.html");

        /** Creates the journal and immediately writes the HTML preamble. */
        public Journal() {
            this.header();
        }

        /**
         * Appends a summary of the given experiment's parameters.
         *
         * @param config the experiment to describe
         */
        void appendToJournal(ExperimentDef config) {
            DecimalFormat format = new DecimalFormat("###,###.###");
            // The throttle is a bytes-per-second value (it is divided into a byte count
            // elsewhere in this file), so it is labelled B/s rather than MB/s.
            String message = "\n\n<h3>" + config.name + "</h3>"
                + "<p>- BrokerCount: " + config.brokers
                + "<p>- PartitionCount: " + config.partitions
                + "<p>- Throttle: " + format.format(config.throttle) + " B/s"
                + "<p>- MsgCount: " + format.format(config.msgsPerPartition) + " "
                + "<p>- MsgSize: " + format.format(config.msgSize)
                + "<p>- TargetBytesPerBrokerMB: " + config.targetBytesPerBrokerMB
                + "<p>";
            this.append(message);
        }

        /**
         * Appends an image tag that references a chart.
         *
         * @param path  location of the image, used verbatim as the {@code src} attribute
         * @param first {@code true} to put the paragraph breaks before the image,
         *              {@code false} to put them after it
         */
        void appendChart(String path, boolean first) {
            StringBuilder message = new StringBuilder();
            if (first) {
                message.append("<p><p>");
            }
            // "align" is its own attribute; nesting it inside the style value produced
            // unbalanced quotes.
            message.append("<img src=\"").append(path)
                .append("\" alt=\"Chart\" style=\"width:600px;height:400px;\" align=\"middle\">");
            if (!first) {
                message.append("<p><p>");
            }
            this.append(message.toString());
        }

        /** Writes the opening HTML of the report. */
        void header() {
            this.append("<html><head><h1>Replication Quotas Test Rig</h1></head><body>");
        }

        /** Writes the closing HTML of the report. */
        void footer() {
            this.append("</body></html>");
        }

        /**
         * Appends text to the report file, creating the file if necessary.
         *
         * @param message the text to append; {@code null} is written as the string "null"
         * @throws RuntimeException wrapping the {@link IOException} if the file cannot be
         *                          opened or written
         */
        void append(String message) {
            try {
                // Files.write opens, writes and closes in one call, and reports write
                // failures as IOException. The platform default charset is used.
                Files.write(this.log.toPath(), String.valueOf(message).getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /** @return the absolute path of the report file */
        String path() {
            return this.log.getAbsolutePath();
        }
    }

    /**
     * Parameters of one experiment, plus the derived amount of data (in units of
     * 1,000,000 bytes) each broker holds once all messages have been written.
     */
    static class ExperimentDef {
        String name;
        int brokers;
        int partitions;
        long throttle;
        int msgsPerPartition;
        int msgSize;
        final long targetBytesPerBrokerMB;

        /**
         * @param name             label used in the report and in chart file names
         * @param brokers          number of brokers
         * @param partitions       number of partitions
         * @param throttle         throttle value handed to the reassignment
         * @param msgsPerPartition number of messages written to each partition
         * @param msgSize          size of each message in bytes
         * @throws ArithmeticException if {@code brokers} is zero
         */
        public ExperimentDef(String name, int brokers, int partitions, long throttle, int msgsPerPartition, int msgSize) {
            this.name = name;
            this.brokers = brokers;
            this.partitions = partitions;
            this.throttle = throttle;
            this.msgsPerPartition = msgsPerPartition;
            this.msgSize = msgSize;

            // Widen to long before multiplying so large experiments do not overflow int.
            long bytesPerPartition = (long)msgsPerPartition * msgSize;
            long totalBytes = bytesPerPartition * partitions;
            long bytesPerBroker = totalBytes / brokers;
            this.targetBytesPerBrokerMB = bytesPerBroker / 1000000L;
        }
    }

    static class Experiment {
        /** Name of the single topic every experiment creates, fills and reassigns. */
        static final String TOPIC_NAME = "my-topic";
        /** Label of the experiment being run; overwritten at the start of {@code run}. */
        String experimentName = "unset";
        /** Sampled leader-replication rates, keyed by broker node id, in sampling order. */
        Map<Integer, List<Double>> leaderRates = new HashMap<Integer, List<Double>>();
        /** Sampled follower-replication rates, keyed by broker node id, in sampling order. */
        Map<Integer, List<Double>> followerRates = new HashMap<Integer, List<Double>>();
        /** Test cluster; stays {@code null} until {@code startBrokers} has built it. */
        KafkaClusterTestKit cluster;
        /** Admin client for {@link #cluster}; stays {@code null} until {@code startBrokers} has completed. */
        Admin adminClient;

        /** Creates an experiment with no cluster started yet. */
        Experiment() {
        }

        /**
         * Builds, formats and starts a test cluster with one controller node and the requested
         * number of broker nodes, waits for the brokers to be ready, then creates the admin client.
         *
         * @param numBrokerNodes number of broker nodes to start
         * @throws RuntimeException if building or starting the cluster fails
         */
        void startBrokers(int numBrokerNodes) {
            System.out.println("Starting Brokers");
            try {
                TestKitNodes nodes = new TestKitNodes.Builder()
                    .setNumControllerNodes(1)
                    .setNumBrokerNodes(numBrokerNodes)
                    .build();
                // Assign the field before starting so a partially started cluster is still
                // reachable from tearDown().
                this.cluster = new KafkaClusterTestKit.Builder(nodes).build();
                this.cluster.format();
                this.cluster.startup();
                this.cluster.waitForReadyBrokers();
            }
            catch (Exception startupFailure) {
                throw new RuntimeException("Failed to start test Kafka cluster", startupFailure);
            }
            Properties clientProps = this.cluster.clientProperties();
            this.adminClient = Admin.create(clientProps);
        }

        /**
         * Releases the admin client and shuts the cluster down.
         *
         * <p>Safe to call when the cluster was never created (for example because the
         * experiment failed before or while starting the brokers); in that case only the
         * admin client is closed.
         *
         * @throws RuntimeException wrapping any exception raised while closing the cluster
         */
        public void tearDown() {
            Utils.closeQuietly(this.adminClient, "adminClient");
            // This method runs from a finally block; dereferencing a null cluster here would
            // replace the already-reported failure with a NullPointerException.
            if (this.cluster == null) {
                return;
            }
            try {
                this.cluster.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Runs the experiment end to end: starts the cluster, creates the topic with one
         * replica per partition, writes the data, generates and executes a throttled
         * reassignment, waits for it to finish, validates log end offsets and finally
         * writes the charts and summary.
         *
         * @param config                 parameters of the experiment
         * @param journal                report that receives the summary and charts
         * @param displayChartsOnScreen  whether charts are additionally shown in a window
         * @throws Exception if any step fails
         */
        public void run(final ExperimentDef config, Journal journal, boolean displayChartsOnScreen) throws Exception {
            this.experimentName = config.name;

            // Initial layout: partition p gets the single replica (p + 1 + shift) % brokers,
            // where shift is half the broker count, rounded.
            int shift = Math.round((float)config.brokers / 2.0f);
            Map<Integer, List<Integer>> replicas = new HashMap<>();
            for (int partition = 0; partition < config.partitions; ++partition) {
                int brokerId = (partition + 1 + shift) % config.brokers;
                replicas.put(partition, Collections.singletonList(brokerId));
            }

            this.startBrokers(config.brokers);
            NewTopic topic = new NewTopic(TOPIC_NAME, replicas);
            this.adminClient.createTopics(Collections.singleton(topic)).all().get();

            // Wait until every broker's metadata image has the topic with a leader for each partition.
            TestUtils.waitUntilTrue(() -> this.cluster.brokers().values().stream().allMatch(server -> {
                TopicImage image = server.metadataCache().currentImage().topics().getTopic(TOPIC_NAME);
                return image != null && image.partitions().values().stream().allMatch(PartitionRegistration::hasLeader);
            }), () -> "Timed out waiting for topic listing", 15000L, 500L);

            System.out.println("Writing Data");
            try (KafkaProducer<byte[], byte[]> producer = this.createProducer()) {
                for (int round = 0; round < config.msgsPerPartition; ++round) {
                    for (int partition = 0; partition < config.partitions; ++partition) {
                        producer.send(new ProducerRecord<>(TOPIC_NAME, partition, null, new byte[config.msgSize]));
                    }
                }
            }

            System.out.println("Generating Reassignment");
            String topicsJson = this.json(TOPIC_NAME);
            String brokerList = this.cluster.brokers().values().stream()
                .map(server -> String.valueOf(server.replicaManager().localBrokerId()))
                .collect(Collectors.joining(","));
            Map<TopicPartition, List<Integer>> newAssignment =
                ReassignPartitionsCommand.generateAssignment(this.adminClient, topicsJson, brokerList, true).getKey();

            System.out.println("Starting Reassignment");
            long startMs = System.currentTimeMillis();
            String reassignmentJson = ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Collections.emptyMap());
            ReassignPartitionsCommand.executeAssignment(this.adminClient, false, reassignmentJson, config.throttle, -1L, 10000L, Time.SYSTEM, false);
            this.waitForReassignmentToComplete();
            System.out.println("Reassignment took " + (System.currentTimeMillis() - startMs) / 1000L + "s");

            this.validateAllOffsetsMatch(config);
            journal.appendToJournal(config);
            this.renderChart(this.leaderRates, "Leader", journal, displayChartsOnScreen);
            this.renderChart(this.followerRates, "Follower", journal, displayChartsOnScreen);
            this.logOutput(config, replicas, newAssignment);
            System.out.println("Output can be found here: " + journal.path());
        }

        /**
         * Checks that every partition log present on a broker ends at exactly
         * {@code config.msgsPerPartition}. Partitions for which a broker has no log are skipped.
         *
         * @param config the experiment whose message count is the expected log end offset
         * @throws RuntimeException on the first log whose end offset differs from the expected value
         */
        void validateAllOffsetsMatch(ExperimentDef config) {
            for (KafkaBroker broker : this.cluster.brokers().values()) {
                for (int partitionId = 0; partitionId < config.partitions; ++partitionId) {
                    TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partitionId);
                    // -1 stands for "this broker has no log for the partition".
                    long offset = (Long)broker.logManager().getLog(topicPartition, false).map(UnifiedLog::logEndOffset).getOrElse(() -> -1L);
                    boolean logPresent = offset >= 0L;
                    if (logPresent && offset != (long)config.msgsPerPartition) {
                        throw new RuntimeException("Run failed as offsets did not match for partition " + partitionId + " on broker " + broker.config().nodeId() + ". Expected " + config.msgsPerPartition + " but was " + offset + ".");
                    }
                }
            }
        }

        /**
         * Prints the initial layout, the proposed assignment, the assignment currently
         * reported by the cluster and the experiment parameters to standard output.
         *
         * @param config        parameters of the experiment
         * @param replicas      initial partition-to-replicas layout the topic was created with
         * @param newAssignment assignment proposed by the reassignment tool
         * @throws Exception if describing the topic fails
         */
        void logOutput(ExperimentDef config, Map<Integer, List<Integer>> replicas, Map<TopicPartition, List<Integer>> newAssignment) throws Exception {
            // Fully typed: with raw List/Map the stream pipeline below cannot be type-checked.
            TopicDescription description = this.adminClient.describeTopics(Collections.singleton(TOPIC_NAME)).allTopicNames().get().get(TOPIC_NAME);
            List<TopicPartitionInfo> actual = description.partitions();
            Map<Integer, List<Integer>> curAssignment = actual.stream().collect(Collectors.toMap(
                TopicPartitionInfo::partition,
                p -> p.replicas().stream().map(Node::id).collect(Collectors.toList())));

            // TreeMap copy so the initial layout is printed in partition order.
            System.out.println("The replicas are " + new TreeMap<Integer, List<Integer>>(replicas).entrySet().stream().map(e -> "\n" + e).collect(Collectors.joining()));
            System.out.println("This is the current replica assignment:\n" + curAssignment);
            System.out.println("proposed assignment is: \n" + newAssignment);
            System.out.println("This is the assignment we ended up with " + curAssignment);
            System.out.println("numBrokers: " + config.brokers);
            System.out.println("numPartitions: " + config.partitions);
            System.out.println("throttle: " + config.throttle);
            System.out.println("numMessagesPerPartition: " + config.msgsPerPartition);
            System.out.println("msgSize: " + config.msgSize);
            System.out.println("We will write " + config.targetBytesPerBrokerMB + "MB of data per broker");
            System.out.println("Worst case duration is " + config.targetBytesPerBrokerMB * 1000L * 1000L / config.throttle);
        }

        /**
         * Polls once per second, for up to an hour, until the cluster reports no partition
         * reassignments in progress. Rate metrics are sampled on every poll, which is what
         * fills {@code leaderRates} and {@code followerRates}.
         *
         * @throws RuntimeException if querying the reassignments fails or the thread is interrupted
         */
        void waitForReassignmentToComplete() {
            TestUtils.waitUntilTrue(() -> {
                this.printRateMetrics();
                try {
                    return this.adminClient.listPartitionReassignments().reassignments().get().isEmpty();
                }
                catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still see the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }, () -> "Partition reassignments didn't complete.", 3600000L, 1000L);
        }

        /**
         * Turns the sampled rates into a line chart, saves it as an image referenced from the
         * journal and optionally shows it in a window.
         *
         * @param data                   sampled rates keyed by broker id
         * @param name                   chart kind, used in the title and the file name
         * @param journal                report that receives the image reference
         * @param displayChartsOnScreen  whether the chart is also shown in a window
         * @throws Exception if the image cannot be written
         */
        void renderChart(Map<Integer, List<Double>> data, String name, Journal journal, boolean displayChartsOnScreen) throws Exception {
            JFreeChart chart = this.createChart(name, this.addDataToChart(data));
            this.writeToFile(name, journal, chart);
            this.maybeDisplayOnScreen(displayChartsOnScreen, chart);
            System.out.println("Chart generated for " + name);
        }

        /**
         * Opens the chart in a window titled with the experiment name, if requested.
         *
         * @param displayChartsOnScreen when {@code false} this method does nothing
         * @param chart                 the chart to show
         */
        void maybeDisplayOnScreen(boolean displayChartsOnScreen, JFreeChart chart) {
            if (!displayChartsOnScreen) {
                return;
            }
            ChartFrame window = new ChartFrame(this.experimentName, chart);
            window.pack();
            window.setVisible(true);
        }

        /**
         * Renders the chart as a 1000x700 PNG named {@code <experiment>-<name>.png} in the run
         * directory and adds a reference to it to the journal.
         *
         * @param name    chart kind; the value {@code "Leader"} marks the first chart of a pair
         * @param journal report that receives the image reference
         * @param chart   the chart to render
         * @throws Exception if the image cannot be written
         */
        void writeToFile(String name, Journal journal, JFreeChart chart) throws Exception {
            String fileName = this.experimentName + "-" + name + ".png";
            File target = new File(DIR, fileName);
            RenderedImage image = chart.createBufferedImage(1000, 700);
            ImageIO.write(image, "png", target);
            String imagePath = target.getAbsolutePath();
            journal.appendChart(imagePath, name.equals("Leader"));
        }

        /**
         * Builds a vertical XY line chart of throughput over time for the given data set.
         *
         * @param name    chart kind, included in the title
         * @param dataset one series per broker
         * @return the configured chart
         */
        JFreeChart createChart(String name, XYSeriesCollection dataset) {
            String title = this.experimentName + " - " + name + " Throttling Performance";
            String xAxisLabel = "Time (s)";
            String yAxisLabel = "Throttle Throughput (B/s)";
            // The three trailing booleans are passed through unchanged (false, true, false).
            return ChartFactory.createXYLineChart(title, xAxisLabel, yAxisLabel, dataset, PlotOrientation.VERTICAL, false, true, false);
        }

        /**
         * Converts the sampled rates into one chart series per broker. The x value of each
         * point is the zero-based position of the sample in the broker's list.
         *
         * @param data sampled rates keyed by broker id
         * @return a collection holding a series named {@code Broker:<id>} for every entry
         */
        XYSeriesCollection addDataToChart(Map<Integer, List<Double>> data) {
            XYSeriesCollection dataset = new XYSeriesCollection();
            for (Map.Entry<Integer, List<Double>> entry : data.entrySet()) {
                XYSeries series = new XYSeries("Broker:" + entry.getKey());
                int sampleIndex = 0;
                for (double rate : entry.getValue()) {
                    series.add((double)sampleIndex, rate);
                    ++sampleIndex;
                }
                dataset.addSeries(series);
            }
            return dataset;
        }

        /**
         * Appends a sample to the list kept for the given node, creating the list on first use.
         *
         * @param rates       per-node sample lists to update
         * @param nodeId      id of the node the sample belongs to
         * @param currentRate the sample to append
         */
        void record(Map<Integer, List<Double>> rates, int nodeId, Double currentRate) {
            // computeIfAbsent allocates a list only for a new node and needs no re-put.
            rates.computeIfAbsent(nodeId, id -> new ArrayList<>()).add(currentRate);
        }

        /**
         * Samples the leader and follower replication rate of every broker and stores the
         * values in {@code leaderRates} and {@code followerRates}.
         *
         * <p>The leader rate of node 0 is logged at INFO as a progress indicator; positive
         * rates of any broker are logged at TRACE.
         */
        void printRateMetrics() {
            for (BrokerServer broker : this.cluster.brokers().values()) {
                int nodeId = broker.config().nodeId();

                double leaderRate = this.measuredRate(broker, QuotaType.LEADER_REPLICATION);
                if (nodeId == 0) {
                    // Log the id that was actually sampled instead of a hard-coded number.
                    LOGGER.info("waiting... Leader rate on {} is {}", nodeId, leaderRate);
                }
                this.record(this.leaderRates, nodeId, leaderRate);
                if (leaderRate > 0.0) {
                    LOGGER.trace("Leader Rate on {} is {}", nodeId, leaderRate);
                }

                double followerRate = this.measuredRate(broker, QuotaType.FOLLOWER_REPLICATION);
                this.record(this.followerRates, nodeId, followerRate);
                if (followerRate > 0.0) {
                    LOGGER.trace("Follower Rate on {} is {}", nodeId, followerRate);
                }
            }
        }

        /**
         * Reads the {@code byte-rate} metric of the given quota type from a broker.
         *
         * @param broker  the broker whose metrics are read
         * @param repType the quota type; its string form is used as the metric group
         * @return the current metric value, or {@code -1.0} if the broker has no such metric
         */
        private double measuredRate(KafkaBroker broker, QuotaType repType) {
            MetricName metricName = broker.metrics().metricName("byte-rate", repType.toString());
            // One lookup instead of containsKey + get, so the entry cannot disappear between
            // the check and the read.
            KafkaMetric metric = broker.metrics().metrics().get(metricName);
            return metric == null ? -1.0 : (Double)metric.metricValue();
        }

        /**
         * Builds the topics-to-move JSON document listing the given topics.
         *
         * @param topic topic names, inserted verbatim (no escaping is applied)
         * @return a document of the form {@code {"topics": [{"topic": "a"},...],"version":1}}
         */
        String json(String ... topic) {
            StringBuilder entries = new StringBuilder();
            for (int i = 0; i < topic.length; ++i) {
                if (i > 0) {
                    entries.append(",");
                }
                entries.append("{\"topic\": \"").append(topic[i]).append("\"}");
            }
            return "{\"topics\": [" + entries + "],\"version\":1}";
        }

        /**
         * Creates a byte-array producer connected to the test cluster via
         * {@code TestUtils.createProducer}.
         *
         * <p>NOTE(review): the arguments are positional and their meaning is not visible from
         * this file - confirm against {@code TestUtils.createProducer} before changing a value.
         *
         * @return a new producer; the caller is responsible for closing it
         */
        KafkaProducer<byte[], byte[]> createProducer() {
            return TestUtils.createProducer(
                this.cluster.bootstrapServers(),
                1,
                60000L,
                1024L * 1024L,
                Integer.MAX_VALUE,
                30000,
                0,
                16384,
                "none",
                20000,
                SecurityProtocol.PLAINTEXT,
                Option.empty(),
                Option.empty(),
                new ByteArraySerializer(),
                new ByteArraySerializer(),
                false);
        }
    }
}

