/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.common.runtime;

import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.IntStream;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl;
import org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class CoordinatorRuntimeMetricsImplTest {
    private static final String METRICS_GROUP = "test-runtime-metrics";

    @Test
    public void testMetricNames() {
        Metrics metrics = new Metrics();
        HashSet<MetricName> expectedMetrics = new HashSet<MetricName>(Arrays.asList(CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "num-partitions", "state", "loading"), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "num-partitions", "state", "active"), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "num-partitions", "state", "failed"), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-queue-size", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "partition-load-time-max", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "partition-load-time-avg", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "thread-idle-ratio-avg", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-queue-time-ms-max", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-queue-time-ms-p50", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-queue-time-ms-p95", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-queue-time-ms-p99", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-queue-time-ms-p999", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-processing-time-ms-max", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-processing-time-ms-p50", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-processing-time-ms-p95", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-processing-time-ms-p99", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-processing-time-ms-p999", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-purgatory-time-ms-max", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-purgatory-time-ms-p50", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-purgatory-time-ms-p95", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-purgatory-time-ms-p99", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-purgatory-time-ms-p999", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "batch-flush-time-ms-max", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "batch-flush-time-ms-p50", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "batch-flush-time-ms-p95", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "batch-flush-time-ms-p99", new String[0]), CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "batch-flush-time-ms-p999", new String[0])));
        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);){
            runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
            expectedMetrics.forEach(metricName -> Assertions.assertTrue((boolean)metrics.metrics().containsKey(metricName)));
        }
        expectedMetrics.forEach(metricName -> Assertions.assertFalse((boolean)metrics.metrics().containsKey(metricName), (String)("metrics did not expect to contain metricName: " + String.valueOf(metricName) + " after closing.")));
    }

    @Test
    public void testUpdateNumPartitionsMetrics() {
        Metrics metrics = new Metrics();
        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);){
            IntStream.range(0, 10).forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.INITIAL, CoordinatorRuntime.CoordinatorState.LOADING));
            IntStream.range(0, 8).forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.LOADING, CoordinatorRuntime.CoordinatorState.ACTIVE));
            IntStream.range(0, 8).forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.ACTIVE, CoordinatorRuntime.CoordinatorState.FAILED));
            IntStream.range(0, 2).forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorRuntime.CoordinatorState.FAILED, CoordinatorRuntime.CoordinatorState.CLOSED));
            CoordinatorRuntimeMetricsImplTest.assertMetricGauge(metrics, CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "num-partitions", "state", "loading"), 2L);
            CoordinatorRuntimeMetricsImplTest.assertMetricGauge(metrics, CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "num-partitions", "state", "active"), 0L);
            CoordinatorRuntimeMetricsImplTest.assertMetricGauge(metrics, CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "num-partitions", "state", "failed"), 6L);
        }
    }

    @Test
    public void testPartitionLoadSensorMetrics() {
        MockTime time = new MockTime();
        Metrics metrics = new Metrics((Time)time);
        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);){
            long startTimeMs = time.milliseconds();
            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000L);
            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 2000L);
            MetricName metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "partition-load-time-avg", new String[0]);
            KafkaMetric metric = (KafkaMetric)metrics.metrics().get(metricName);
            Assertions.assertEquals((Object)1500.0, (Object)metric.metricValue());
            metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "partition-load-time-max", new String[0]);
            metric = (KafkaMetric)metrics.metrics().get(metricName);
            Assertions.assertEquals((Object)2000.0, (Object)metric.metricValue());
        }
    }

    @Test
    public void testThreadIdleSensor() {
        MockTime time = new MockTime();
        Metrics metrics = new Metrics((Time)time);
        CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
        IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleTime((double)(i + 1) * 1000.0));
        MetricName metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "thread-idle-ratio-avg", new String[0]);
        KafkaMetric metric = (KafkaMetric)metrics.metrics().get(metricName);
        Assertions.assertEquals((Object)0.2, (Object)metric.metricValue());
    }

    @Test
    public void testEventQueueSize() {
        MockTime time = new MockTime();
        Metrics metrics = new Metrics((Time)time);
        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);){
            runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
            CoordinatorRuntimeMetricsImplTest.assertMetricGauge(metrics, CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-queue-size", new String[0]), 5L);
        }
    }

    @ParameterizedTest
    @ValueSource(strings={"event-queue-time-ms", "event-processing-time-ms", "event-purgatory-time-ms", "batch-flush-time-ms"})
    public void testHistogramMetrics(String metricNamePrefix) {
        MockTime time = new MockTime();
        Metrics metrics = new Metrics((Time)time);
        CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
        IntStream.range(1, 1001).forEach(i -> {
            switch (metricNamePrefix) {
                case "event-queue-time-ms": {
                    runtimeMetrics.recordEventQueueTime((long)i);
                    break;
                }
                case "event-processing-time-ms": {
                    runtimeMetrics.recordEventProcessingTime((long)i);
                    break;
                }
                case "event-purgatory-time-ms": {
                    runtimeMetrics.recordEventPurgatoryTime((long)i);
                    break;
                }
                case "batch-flush-time-ms": {
                    runtimeMetrics.recordFlushTime((long)i);
                }
            }
        });
        MetricName metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, metricNamePrefix + "-max", new String[0]);
        KafkaMetric metric = (KafkaMetric)metrics.metrics().get(metricName);
        Assertions.assertEquals((Object)1000.0, (Object)metric.metricValue());
        metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, metricNamePrefix + "-p50", new String[0]);
        metric = (KafkaMetric)metrics.metrics().get(metricName);
        Assertions.assertEquals((Object)500.0, (Object)metric.metricValue());
        metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, metricNamePrefix + "-p95", new String[0]);
        metric = (KafkaMetric)metrics.metrics().get(metricName);
        Assertions.assertEquals((Object)950.0, (Object)metric.metricValue());
        metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, metricNamePrefix + "-p99", new String[0]);
        metric = (KafkaMetric)metrics.metrics().get(metricName);
        Assertions.assertEquals((Object)990.0, (Object)metric.metricValue());
        metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, metricNamePrefix + "-p999", new String[0]);
        metric = (KafkaMetric)metrics.metrics().get(metricName);
        Assertions.assertEquals((Object)999.0, (Object)metric.metricValue());
    }

    @Test
    public void testRecordEventPurgatoryTimeLimit() {
        MockTime time = new MockTime();
        Metrics metrics = new Metrics((Time)time);
        CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
        IntStream.range(1, 1001).forEach(__ -> runtimeMetrics.recordEventPurgatoryTime(KafkaMetricHistogram.MAX_LATENCY_MS + 1000L));
        MetricName metricName = CoordinatorRuntimeMetricsImplTest.kafkaMetricName(metrics, "event-purgatory-time-ms-max", new String[0]);
        KafkaMetric metric = (KafkaMetric)metrics.metrics().get(metricName);
        long value = ((Double)metric.metricValue()).longValue();
        Assertions.assertTrue((value >= KafkaMetricHistogram.MAX_LATENCY_MS && value < KafkaMetricHistogram.MAX_LATENCY_MS + 1000L ? 1 : 0) != 0);
    }

    private static void assertMetricGauge(Metrics metrics, MetricName metricName, long count) {
        Assertions.assertEquals((long)count, (long)((Long)metrics.metric(metricName).metricValue()));
    }

    private static MetricName kafkaMetricName(Metrics metrics, String name, String ... keyValue) {
        return metrics.metricName(name, METRICS_GROUP, "", keyValue);
    }
}

