/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.compatibility;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.DoubleSummaryStatistics;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.tier.compatibility.Logger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;

public class JmxMetricsAnalyzer {
    private final String topic;
    private final AdminClient adminClient;
    private final int numPartitions;
    private final Logger logger;
    private final Map<Integer, String> metricsFilenames;
    private Set<String> attributes = new HashSet<String>();
    private Map<Integer, Map<String, List<Double>>> metricsCsv = new HashMap<Integer, Map<String, List<Double>>>();
    private Map<Integer, Map<Integer, Map<String, Double>>> jmxStats = new HashMap<Integer, Map<Integer, Map<String, Double>>>();
    private Map<String, Double> averageJmxValue = new HashMap<String, Double>();
    private Map<String, Double> maximumJmxValue = new HashMap<String, Double>();
    private Map<String, Double> trueAverageJmxValue = new HashMap<String, Double>();
    private Map<String, Double> trueMaximumJmxValue = new HashMap<String, Double>();

    public JmxMetricsAnalyzer(Map<Integer, String> metricsFilenames, AdminClient adminClient, String topic, int numPartitions, boolean debug) throws IOException {
        this.metricsFilenames = metricsFilenames;
        this.adminClient = adminClient;
        this.topic = topic;
        this.numPartitions = numPartitions;
        this.logger = new Logger(debug);
        for (Map.Entry<Integer, String> entry : metricsFilenames.entrySet()) {
            int nodeIdx = entry.getKey();
            String fileName = entry.getValue();
            Map<String, List<Double>> contents = this.readCsvAndPopulateAttributes(fileName);
            if (contents.isEmpty()) {
                throw new IllegalArgumentException(String.format("Empty CSV file %s provided by user.", fileName));
            }
            this.metricsCsv.put(nodeIdx, contents);
        }
        this.populateJmxOutputAllNodes();
    }

    public Map<Integer, Long> checkTierSize() {
        HashMap<Integer, Long> partitionToTierSize = new HashMap<Integer, Long>();
        for (int partition = 0; partition < this.numPartitions; ++partition) {
            String tierSizeMetric = String.format("kafka.log:type=Log,name=TierSize,topic=%s,partition=%d:Value", this.topic, partition);
            long tierSize = this.maximumJmxValue.getOrDefault(tierSizeMetric, -1.0).longValue();
            if (tierSize < 0L) {
                this.logger.debug("Tier size metric is absent for partition " + partition);
                return new HashMap<Integer, Long>();
            }
            partitionToTierSize.put(partition, tierSize);
        }
        return partitionToTierSize;
    }

    public boolean checkZeroErrorPartitions() {
        for (Integer nodeIdx : this.metricsFilenames.keySet()) {
            Map<String, Double> lastJmxEntry = this.lastJmxItem(nodeIdx);
            long partitionsInError = lastJmxEntry.getOrDefault("kafka.tier.tasks:type=TierTasks,name=NumPartitionsInError:Value", -1.0).longValue();
            if (partitionsInError == 0L) continue;
            this.logger.debug(String.format("Archiver at node %d has %d partitions in error, but expected 0.", partitionsInError, nodeIdx));
            return false;
        }
        return this.checkZeroFencedPartitions();
    }

    private boolean checkZeroFencedPartitions() {
        for (Integer nodeIdx : this.metricsFilenames.keySet()) {
            Map<String, Double> lastJmxEntry = this.lastJmxItem(nodeIdx);
            long numFencedPartitions = lastJmxEntry.getOrDefault("kafka.server:type=TierTopicConsumer:ErrorPartitions", -1.0).longValue();
            if (numFencedPartitions == 0L) continue;
            this.logger.debug(String.format("Found %d fenced partitions in node %d, but expected 0.", numFencedPartitions, nodeIdx));
            return false;
        }
        return true;
    }

    public boolean checkBytesFetchedFromLocalLog(int thresholdBytes) {
        long totalThresholdBytes;
        long tierBytesFetched = this.maximumJmxValue.getOrDefault("kafka.server:type=TierFetcher:BytesFetchedTotal", -1.0).longValue();
        if (tierBytesFetched < 0L) {
            throw new IllegalStateException(String.format("Tier Bytes Fetched: %d is negative.", tierBytesFetched));
        }
        long totalLogSize = 0L;
        for (int partition = 0; partition < this.numPartitions; ++partition) {
            String logSizeMetric = String.format("kafka.log:type=Log,name=TotalSize,topic=%s,partition=%d:Value", this.topic, partition);
            long logSize = this.trueMaximumJmxValue.getOrDefault(logSizeMetric, -1.0).longValue();
            if (logSize < 0L) {
                throw new IllegalStateException(String.format("Log size metric cannot be absent for partition: %d", partition));
            }
            totalLogSize += logSize;
            this.logger.debug(String.format("Partition: %d, Max Log Size: %d", partition, logSize));
        }
        long totalBytesFetchedFromLocalLog = Math.max(totalLogSize - tierBytesFetched, 0L);
        boolean thresholdCheckPassed = totalBytesFetchedFromLocalLog <= (totalThresholdBytes = (long)thresholdBytes * (long)this.numPartitions);
        this.logger.debug(String.format("Total bytes fetched from local log %d is %s threshold bytes: %d. Total log size: %d, tier bytes fetched: %d.", totalBytesFetchedFromLocalLog, thresholdCheckPassed ? "within" : "larger than", totalThresholdBytes, totalLogSize, tierBytesFetched));
        return thresholdCheckPassed;
    }

    public boolean tieringCompleted() throws ExecutionException, InterruptedException {
        if (!this.tieringStarted()) {
            return false;
        }
        Map<Integer, Integer> leaders = this.getLeaders();
        for (int partition = 0; partition < this.numPartitions; ++partition) {
            String numLogSegmentsMetric = String.format("kafka.log:type=Log,name=NumLogSegments,topic=%s,partition=%d:Value", this.topic, partition);
            Integer leaderIdx = leaders.get(partition);
            if (leaderIdx == null) {
                throw new IllegalStateException(String.format("Leader is absent for topic %s and partition %d", this.topic, partition));
            }
            int leaderNumLogSegments = this.lastJmxItem(leaderIdx).getOrDefault(numLogSegmentsMetric, -1.0).intValue();
            if (leaderNumLogSegments != 1) {
                this.logger.debug(String.format("Archiving not complete as leader %d has %d local log segments for partition %d", leaderIdx, leaderNumLogSegments, partition));
                return false;
            }
            for (Integer nodeIdx : this.metricsFilenames.keySet()) {
                Map<String, Double> lastJmxEntry = this.lastJmxItem(nodeIdx);
                long archiverLag = lastJmxEntry.getOrDefault("kafka.tier.tasks.archive:type=TierArchiver,name=TotalLag:Value", -1.0).longValue();
                int numLogSegments = lastJmxEntry.getOrDefault(numLogSegmentsMetric, -1.0).intValue();
                if (archiverLag == 0L && numLogSegments <= 2) continue;
                this.logger.debug(String.format("Archiving not complete for partition %d. lag: %d numLogSegments: %d", partition, archiverLag, numLogSegments));
                return false;
            }
        }
        return true;
    }

    public boolean tieringStarted() {
        for (int partition = 0; partition < this.numPartitions; ++partition) {
            Object[] objectArray = new Object[]{this.topic, partition};
            String metric = String.format("kafka.log:type=Log,name=TierSize,topic=%s,partition=%d:Value", objectArray);
            Double tierSize = this.maximumJmxValue.getOrDefault(metric, -1.0);
            if (!(tierSize <= 0.0)) continue;
            this.logger.debug(String.format("Tiering has not started since partition %d has TierSize: %f", partition, tierSize));
            return false;
        }
        return true;
    }

    private Map<Integer, Integer> getLeaders() throws ExecutionException, InterruptedException {
        DescribeTopicsResult res = this.adminClient.describeTopics(Collections.singletonList(this.topic));
        KafkaFuture topicDescription = (KafkaFuture)res.values().get(this.topic);
        List topicPartitionInfos = ((TopicDescription)topicDescription.get()).partitions();
        return topicPartitionInfos.stream().collect(Collectors.toMap(topicPartitionInfo -> topicPartitionInfo.partition(), topicPartitionInfo -> {
            if (topicPartitionInfo.leader() == null) {
                throw new IllegalStateException(String.format("No leader for topic %s at partition %d", this.topic, topicPartitionInfo.partition()));
            }
            return topicPartitionInfo.leader().id();
        }));
    }

    private void populateJmxOutputAllNodes() {
        for (Integer nodeIdx : this.metricsFilenames.keySet()) {
            this.populatePerNodeJmxOutput(nodeIdx);
        }
        this.populateAggregates();
    }

    private void populatePerNodeJmxOutput(int nodeIdx) {
        Map<String, List<Double>> nodeMetricsCsv = this.metricsCsv.get(nodeIdx);
        List<Double> times = nodeMetricsCsv.get("time");
        for (int timeIdx = 0; timeIdx < times.size(); ++timeIdx) {
            int timeSec = (int)(times.get(timeIdx) / 1000.0);
            HashMap<String, Double> attributesAndValues = new HashMap<String, Double>();
            for (String attribute : nodeMetricsCsv.keySet()) {
                if (attribute.equals("time")) continue;
                attributesAndValues.put(attribute, nodeMetricsCsv.get(attribute).get(timeIdx));
            }
            this.jmxStats.put(nodeIdx, Collections.singletonMap(timeSec, attributesAndValues));
        }
    }

    private void populateAggregates() {
        List allTimes = this.metricsCsv.keySet().stream().map(k -> this.metricsCsv.get(k).get("time")).flatMap(Collection::stream).collect(Collectors.toList());
        int startTimeSec = (int)((Double)Collections.min(allTimes) / 1000.0);
        int endTimeSec = (int)((Double)Collections.max(allTimes) / 1000.0);
        for (String attribute : this.attributes) {
            ArrayList<Double> aggregatesPerTime = new ArrayList<Double>();
            ArrayList<Double> maxPerTime = new ArrayList<Double>();
            for (int timeSec = startTimeSec; timeSec <= endTimeSec; ++timeSec) {
                ArrayList<Double> valuesPerNode = new ArrayList<Double>();
                for (Integer nodeIdx : this.metricsFilenames.keySet()) {
                    Double value;
                    if (this.jmxStats.get(nodeIdx).get(timeSec) == null || (value = this.jmxStats.get(nodeIdx).get(timeSec).get(attribute)) == null) continue;
                    valuesPerNode.add(value);
                }
                DoubleSummaryStatistics stats = valuesPerNode.stream().collect(Collectors.summarizingDouble(Double::doubleValue));
                aggregatesPerTime.add(stats.getSum());
                maxPerTime.add(stats.getMax());
            }
            DoubleSummaryStatistics aggregateStats = aggregatesPerTime.stream().collect(Collectors.summarizingDouble(Double::doubleValue));
            this.averageJmxValue.put(attribute, aggregateStats.getAverage());
            this.maximumJmxValue.put(attribute, aggregateStats.getMax());
            DoubleSummaryStatistics maxStats = maxPerTime.stream().collect(Collectors.summarizingDouble(Double::doubleValue));
            this.trueMaximumJmxValue.put(attribute, maxStats.getMax());
        }
    }

    private Map<String, Double> lastJmxItem(int idx) {
        return this.jmxStats.get(idx).get(Collections.max(this.jmxStats.get(idx).keySet()));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Map<String, List<Double>> readCsvAndPopulateAttributes(String filename) throws IOException {
        try (BufferedReader csvReader = new BufferedReader(new FileReader(filename));){
            String row;
            HashMap<String, List<Double>> csv = new HashMap<String, List<Double>>();
            Map header = null;
            int lineNum = 0;
            while ((row = csvReader.readLine()) != null) {
                List<String> line = new ArrayList<String>(Arrays.asList(row.split(",")));
                ++lineNum;
                if (line.isEmpty()) {
                    this.logger.warn(String.format("JMX metrics output is empty at line # %d in file: %s", lineNum, filename));
                }
                if (header == null) {
                    line = new ArrayList<String>(Arrays.asList(row.split("\",\""))).stream().map(word -> word.replaceAll("\"", "")).collect(Collectors.toList());
                    this.attributes.addAll(line);
                    header = IntStream.range(0, line.size()).boxed().collect(Collectors.toMap(Function.identity(), line::get));
                    continue;
                }
                for (int colIdx = 0; colIdx < line.size(); ++colIdx) {
                    String column = (String)header.get(colIdx);
                    csv.computeIfAbsent(column, k -> new ArrayList()).add(Double.parseDouble(line.get(colIdx)));
                }
            }
            HashMap<String, List<Double>> hashMap = csv;
            return hashMap;
        }
        catch (Exception e) {
            this.logger.error("Could not populate attributes from CSV file: " + filename);
            throw e;
        }
    }
}

