package org.apache.kafka.streams.processor.internals.metrics;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
import org.easymock.IArgumentMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({Sensor.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.class */
public class StreamsMetricsImplTest {
    private static final String SENSOR_PREFIX_DELIMITER = ".";
    private static final String SENSOR_NAME_DELIMITER = ".s.";
    private static final String INTERNAL_PREFIX = "internal";
    private static final String VERSION = "latest";
    private static final String CLIENT_ID = "test-client";
    private static final String THREAD_ID = "test-thread";
    private static final String TASK_ID = "test-task";
    private static final String METRIC_NAME1 = "test-metric1";
    private static final String METRIC_NAME2 = "test-metric2";
    private static final String THREAD_ID_TAG = "thread-id";
    private static final String THREAD_ID_TAG_0100_TO_24 = "client-id";
    private static final String TASK_ID_TAG = "task-id";
    private static final String STORE_ID_TAG = "state-id";
    private static final String RECORD_CACHE_ID_TAG = "record-cache-id";
    private static final String SCOPE_NAME = "test-scope";
    private static final String ENTITY_NAME = "test-entity";
    private static final String OPERATION_NAME = "test-operation";
    private static final String CUSTOM_TAG_KEY1 = "test-key1";
    private static final String CUSTOM_TAG_VALUE1 = "test-value1";
    private static final String CUSTOM_TAG_KEY2 = "test-key2";
    private static final String CUSTOM_TAG_VALUE2 = "test-value2";
    private final Metrics metrics = new Metrics();
    private final Sensor sensor = this.metrics.sensor("dummy");
    private final String storeName = "store";
    private final String sensorName1 = "sensor1";
    private final String sensorName2 = "sensor2";
    private final String metricNamePrefix = "metric";
    private final String group = "group";
    private final Map<String, String> tags = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("tag", "value")});
    private final String description1 = "description number one";
    private final String description2 = "description number two";
    private final String description3 = "description number three";
    private final String description4 = "description number four";
    private final Map<String, String> clientLevelTags = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(THREAD_ID_TAG_0100_TO_24, CLIENT_ID)});
    private final MetricName metricName1 = new MetricName(METRIC_NAME1, "stream-metrics", "description number one", this.clientLevelTags);
    private final MetricName metricName2 = new MetricName(METRIC_NAME1, "stream-metrics", "description number two", this.clientLevelTags);
    private final MockTime time = new MockTime(0);
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, VERSION);

    private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) {
        EasyMock.reportMatcher(new IArgumentMatcher() { // from class: org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImplTest.1
            private final StringBuffer message = new StringBuffer();

            public boolean matches(Object obj) {
                if (obj instanceof MetricConfig) {
                    MetricConfig metricConfig2 = (MetricConfig) obj;
                    boolean z = (metricConfig2.quota() == metricConfig.quota() || metricConfig2.quota().equals(metricConfig.quota())) && metricConfig2.tags().equals(metricConfig.tags());
                    if (metricConfig2.eventWindow() == metricConfig.eventWindow() && metricConfig2.recordLevel() == metricConfig.recordLevel() && z && metricConfig2.samples() == metricConfig.samples() && metricConfig2.timeWindowMs() == metricConfig.timeWindowMs()) {
                        return true;
                    }
                    this.message.append("{ ");
                    this.message.append("eventWindow=");
                    this.message.append(metricConfig2.eventWindow());
                    this.message.append(", ");
                    this.message.append("recordLevel=");
                    this.message.append(metricConfig2.recordLevel());
                    this.message.append(", ");
                    this.message.append("quota=");
                    this.message.append(metricConfig2.quota().toString());
                    this.message.append(", ");
                    this.message.append("samples=");
                    this.message.append(metricConfig2.samples());
                    this.message.append(", ");
                    this.message.append("tags=");
                    this.message.append(metricConfig2.tags().toString());
                    this.message.append(", ");
                    this.message.append("timeWindowMs=");
                    this.message.append(metricConfig2.timeWindowMs());
                    this.message.append(" }");
                }
                this.message.append("not a MetricConfig object");
                return false;
            }

            public void appendTo(StringBuffer stringBuffer) {
                stringBuffer.append(this.message);
            }
        });
        return null;
    }

    private void addSensorsOnAllLevels(Metrics metrics, StreamsMetricsImpl streamsMetricsImpl) {
        EasyMock.expect(metrics.sensor(EasyMock.anyString(), (Sensor.RecordingLevel) EasyMock.anyObject(Sensor.RecordingLevel.class), (Sensor[]) EasyMock.anyObject(Sensor[].class))).andStubReturn(this.sensor);
        EasyMock.expect(metrics.metricName(METRIC_NAME1, "stream-metrics", "description number one", this.clientLevelTags)).andReturn(this.metricName1);
        EasyMock.expect(metrics.metricName(METRIC_NAME2, "stream-metrics", "description number two", this.clientLevelTags)).andReturn(this.metricName2);
        EasyMock.replay(new Object[]{metrics});
        streamsMetricsImpl.addClientLevelImmutableMetric(METRIC_NAME1, "description number one", Sensor.RecordingLevel.INFO, "value");
        streamsMetricsImpl.addClientLevelImmutableMetric(METRIC_NAME2, "description number two", Sensor.RecordingLevel.INFO, "value");
        streamsMetricsImpl.threadLevelSensor(THREAD_ID, "sensor1", Sensor.RecordingLevel.INFO, new Sensor[0]);
        streamsMetricsImpl.threadLevelSensor(THREAD_ID, "sensor2", Sensor.RecordingLevel.INFO, new Sensor[0]);
        streamsMetricsImpl.taskLevelSensor(THREAD_ID, TASK_ID, "sensor1", Sensor.RecordingLevel.INFO, new Sensor[0]);
        streamsMetricsImpl.taskLevelSensor(THREAD_ID, TASK_ID, "sensor2", Sensor.RecordingLevel.INFO, new Sensor[0]);
        streamsMetricsImpl.storeLevelSensor(THREAD_ID, TASK_ID, "store", "sensor1", Sensor.RecordingLevel.INFO, new Sensor[0]);
        streamsMetricsImpl.storeLevelSensor(THREAD_ID, TASK_ID, "store", "sensor2", Sensor.RecordingLevel.INFO, new Sensor[0]);
    }

    private void setupGetNewSensorTest(Metrics metrics, String str, Sensor.RecordingLevel recordingLevel) {
        String fullSensorName = fullSensorName(str);
        EasyMock.expect(metrics.getSensor(fullSensorName)).andStubReturn((Object) null);
        EasyMock.expect(metrics.sensor(fullSensorName, recordingLevel, new Sensor[0])).andReturn(this.sensor);
        EasyMock.replay(new Object[]{metrics});
    }

    private void setupGetExistingSensorTest(Metrics metrics, String str) {
        EasyMock.expect(metrics.getSensor(fullSensorName(str))).andStubReturn(this.sensor);
        EasyMock.replay(new Object[]{metrics});
    }

    private String fullSensorName(String str) {
        return "internal." + str + SENSOR_NAME_DELIMITER + "sensor1";
    }

    @Test
    public void shouldGetNewThreadLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(metrics, THREAD_ID, recordingLevel);
        Sensor threadLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).threadLevelSensor(THREAD_ID, "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(threadLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetExistingThreadLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(metrics, THREAD_ID);
        Sensor threadLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).threadLevelSensor(THREAD_ID, "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(threadLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetNewTaskLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(metrics, "test-thread.task.test-task", recordingLevel);
        Sensor taskLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).taskLevelSensor(THREAD_ID, TASK_ID, "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(taskLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetExistingTaskLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(metrics, "test-thread.task.test-task");
        Sensor taskLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).taskLevelSensor(THREAD_ID, TASK_ID, "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(taskLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetNewStoreLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(metrics, "test-thread.task.store.store.test-task", recordingLevel);
        Sensor storeLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).storeLevelSensor(THREAD_ID, "store", TASK_ID, "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(storeLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetExistingStoreLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(metrics, "test-thread.task.store.store.test-task");
        Sensor storeLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).storeLevelSensor(THREAD_ID, "store", TASK_ID, "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(storeLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetNewNodeLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(metrics, "test-thread.task.test-task.node.processorNodeName", recordingLevel);
        Sensor nodeLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).nodeLevelSensor(THREAD_ID, TASK_ID, "processorNodeName", "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(nodeLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetExistingNodeLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(metrics, "test-thread.task.test-task.node.processorNodeName");
        Sensor nodeLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).nodeLevelSensor(THREAD_ID, TASK_ID, "processorNodeName", "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(nodeLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetNewCacheLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(metrics, "test-thread.task.test-task.cache.processorNodeName", recordingLevel);
        Sensor cacheLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).cacheLevelSensor(THREAD_ID, TASK_ID, "processorNodeName", "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(cacheLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldGetExistingCacheLevelSensor() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(metrics, "test-thread.task.test-task.cache.processorNodeName");
        Sensor cacheLevelSensor = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).cacheLevelSensor(THREAD_ID, TASK_ID, "processorNodeName", "sensor1", recordingLevel, new Sensor[0]);
        EasyMock.verify(new Object[]{metrics});
        MatcherAssert.assertThat(cacheLevelSensor, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    @Test
    public void shouldAddClientLevelImmutableMetric() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        MetricConfig recordLevel = new MetricConfig().recordLevel(recordingLevel);
        StreamsMetricsImpl.ImmutableMetricValue immutableMetricValue = new StreamsMetricsImpl.ImmutableMetricValue("immutable-value");
        EasyMock.expect(metrics.metricName(METRIC_NAME1, "stream-metrics", "description number one", this.clientLevelTags)).andReturn(this.metricName1);
        metrics.addMetric((MetricName) EasyMock.eq(this.metricName1), eqMetricConfig(recordLevel), (MetricValueProvider) EasyMock.eq(immutableMetricValue));
        EasyMock.replay(new Object[]{metrics});
        new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).addClientLevelImmutableMetric(METRIC_NAME1, "description number one", recordingLevel, "immutable-value");
        EasyMock.verify(new Object[]{metrics});
    }

    @Test
    public void shouldAddClientLevelMutableMetric() {
        Metrics metrics = (Metrics) EasyMock.mock(Metrics.class);
        Sensor.RecordingLevel recordingLevel = Sensor.RecordingLevel.INFO;
        MetricConfig recordLevel = new MetricConfig().recordLevel(recordingLevel);
        Gauge gauge = (metricConfig, j) -> {
            return "mutable-value";
        };
        EasyMock.expect(metrics.metricName(METRIC_NAME1, "stream-metrics", "description number one", this.clientLevelTags)).andReturn(this.metricName1);
        metrics.addMetric((MetricName) EasyMock.eq(this.metricName1), eqMetricConfig(recordLevel), (MetricValueProvider) EasyMock.eq(gauge));
        EasyMock.replay(new Object[]{metrics});
        new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION).addClientLevelMutableMetric(METRIC_NAME1, "description number one", recordingLevel, gauge);
        EasyMock.verify(new Object[]{metrics});
    }

    @Test
    public void shouldProvideCorrectStrings() {
        MatcherAssert.assertThat("-latency", CoreMatchers.is("-latency"));
        MatcherAssert.assertThat("all", CoreMatchers.is("all"));
    }

    private void setupRemoveSensorsTest(Metrics metrics, String str, Sensor.RecordingLevel recordingLevel) {
        String str2 = "internal." + str + SENSOR_NAME_DELIMITER;
        EasyMock.resetToDefault(new Object[]{metrics});
        metrics.removeSensor(str2 + "sensor1");
        metrics.removeSensor(str2 + "sensor2");
        EasyMock.replay(new Object[]{metrics});
    }

    @Test
    public void shouldRemoveClientLevelMetrics() {
        Metrics metrics = (Metrics) EasyMock.niceMock(Metrics.class);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
        addSensorsOnAllLevels(metrics, streamsMetricsImpl);
        EasyMock.resetToDefault(new Object[]{metrics});
        EasyMock.expect(metrics.removeMetric(this.metricName1)).andStubReturn((Object) null);
        EasyMock.expect(metrics.removeMetric(this.metricName2)).andStubReturn((Object) null);
        EasyMock.replay(new Object[]{metrics});
        streamsMetricsImpl.removeAllClientLevelMetrics();
        EasyMock.verify(new Object[]{metrics});
    }

    @Test
    public void shouldRemoveThreadLevelSensors() {
        Metrics metrics = (Metrics) EasyMock.niceMock(Metrics.class);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
        addSensorsOnAllLevels(metrics, streamsMetricsImpl);
        setupRemoveSensorsTest(metrics, THREAD_ID, Sensor.RecordingLevel.INFO);
        streamsMetricsImpl.removeAllThreadLevelSensors(THREAD_ID);
        EasyMock.verify(new Object[]{metrics});
    }

    @Test(expected = NullPointerException.class)
    public void testNullMetrics() {
        new StreamsMetricsImpl((Metrics) null, "", VERSION);
    }

    @Test(expected = NullPointerException.class)
    public void testRemoveNullSensor() {
        this.streamsMetrics.removeSensor((Sensor) null);
    }

    @Test
    public void testRemoveSensor() {
        Sensor addSensor = this.streamsMetrics.addSensor("sensor1", Sensor.RecordingLevel.DEBUG);
        this.streamsMetrics.removeSensor(addSensor);
        this.streamsMetrics.removeSensor(this.streamsMetrics.addSensor("sensor1", Sensor.RecordingLevel.DEBUG, new Sensor[]{addSensor}));
        this.streamsMetrics.removeSensor(this.streamsMetrics.addLatencyRateTotalSensor("scope", "entity", "put", Sensor.RecordingLevel.DEBUG, new String[0]));
        this.streamsMetrics.removeSensor(this.streamsMetrics.addRateTotalSensor("scope", "entity", "put", Sensor.RecordingLevel.DEBUG, new String[0]));
        Assert.assertEquals(Collections.emptyMap(), this.streamsMetrics.parentSensors());
    }

    @Test
    public void testMultiLevelSensorRemoval() {
        Metrics metrics = new Metrics();
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(metrics, THREAD_ID, VERSION);
        Iterator it = metrics.metrics().keySet().iterator();
        while (it.hasNext()) {
            metrics.removeMetric((MetricName) it.next());
        }
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("tkey", "value")});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("nkey", "value")});
        Sensor taskLevelSensor = streamsMetricsImpl.taskLevelSensor(THREAD_ID, "taskName", "operation", Sensor.RecordingLevel.DEBUG, new Sensor[0]);
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(taskLevelSensor, "stream-processor-node-metrics", mkMap, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(taskLevelSensor, "stream-processor-node-metrics", mkMap, "operation", "", "");
        int size = metrics.metrics().size();
        Sensor nodeLevelSensor = streamsMetricsImpl.nodeLevelSensor(THREAD_ID, "taskName", "processorNodeName", "operation", Sensor.RecordingLevel.DEBUG, new Sensor[]{taskLevelSensor});
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(nodeLevelSensor, "stream-processor-node-metrics", mkMap2, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(nodeLevelSensor, "stream-processor-node-metrics", mkMap2, "operation", "", "");
        MatcherAssert.assertThat(Integer.valueOf(metrics.metrics().size()), Matchers.greaterThan(Integer.valueOf(size)));
        streamsMetricsImpl.removeAllNodeLevelSensors(THREAD_ID, "taskName", "processorNodeName");
        MatcherAssert.assertThat(Integer.valueOf(metrics.metrics().size()), Matchers.equalTo(Integer.valueOf(size)));
        Sensor taskLevelSensor2 = streamsMetricsImpl.taskLevelSensor(THREAD_ID, "taskName", "operation", Sensor.RecordingLevel.DEBUG, new Sensor[0]);
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(taskLevelSensor2, "stream-processor-node-metrics", mkMap, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(taskLevelSensor2, "stream-processor-node-metrics", mkMap, "operation", "", "");
        MatcherAssert.assertThat(Integer.valueOf(metrics.metrics().size()), Matchers.equalTo(Integer.valueOf(size)));
        Sensor nodeLevelSensor2 = streamsMetricsImpl.nodeLevelSensor(THREAD_ID, "taskName", "processorNodeName", "operation", Sensor.RecordingLevel.DEBUG, new Sensor[]{taskLevelSensor2});
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(nodeLevelSensor2, "stream-processor-node-metrics", mkMap2, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(nodeLevelSensor2, "stream-processor-node-metrics", mkMap2, "operation", "", "");
        MatcherAssert.assertThat(Integer.valueOf(metrics.metrics().size()), Matchers.greaterThan(Integer.valueOf(size)));
        streamsMetricsImpl.removeAllNodeLevelSensors(THREAD_ID, "taskName", "processorNodeName");
        MatcherAssert.assertThat(Integer.valueOf(metrics.metrics().size()), Matchers.equalTo(Integer.valueOf(size)));
        streamsMetricsImpl.removeAllTaskLevelSensors(THREAD_ID, "taskName");
        MatcherAssert.assertThat(Integer.valueOf(metrics.metrics().size()), Matchers.equalTo(0));
    }

    @Test
    public void testLatencyMetrics() {
        int size = this.streamsMetrics.metrics().size();
        Sensor addLatencyAndThroughputSensor = this.streamsMetrics.addLatencyAndThroughputSensor("scope", "entity", "put", Sensor.RecordingLevel.DEBUG, new String[0]);
        Assert.assertEquals(size + 4 + 4, this.streamsMetrics.metrics().size());
        this.streamsMetrics.removeSensor(addLatencyAndThroughputSensor);
        Assert.assertEquals(size, this.streamsMetrics.metrics().size());
    }

    @Test
    public void testThroughputMetrics() {
        int size = this.streamsMetrics.metrics().size();
        Sensor addThroughputSensor = this.streamsMetrics.addThroughputSensor("scope", "entity", "put", Sensor.RecordingLevel.DEBUG, new String[0]);
        Assert.assertEquals(size + 4, this.streamsMetrics.metrics().size());
        this.streamsMetrics.removeSensor(addThroughputSensor);
        Assert.assertEquals(size, this.streamsMetrics.metrics().size());
    }

    @Test
    public void testTotalMetricDoesntDecrease() {
        MockTime mockTime = new MockTime(1L);
        MetricConfig timeWindow = new MetricConfig().timeWindow(1L, TimeUnit.MILLISECONDS);
        Metrics metrics = new Metrics(timeWindow, mockTime);
        Sensor addLatencyRateTotalSensor = new StreamsMetricsImpl(metrics, "", VERSION).addLatencyRateTotalSensor("scope", "entity", "op", Sensor.RecordingLevel.INFO, new String[0]);
        KafkaMetric metric = metrics.metric(metrics.metricName("op-total", "stream-scope-metrics", "", new String[]{THREAD_ID_TAG, Thread.currentThread().getName(), "scope-id", "entity"}));
        for (int i = 0; i < 10; i++) {
            Assert.assertEquals(i, Math.round(metric.measurable().measure(timeWindow, mockTime.milliseconds())));
            addLatencyRateTotalSensor.record(100.0d, mockTime.milliseconds());
        }
    }

    @Test
    public void shouldAddLatencyRateTotalSensorWithBuiltInMetricsVersionLatest() {
        shouldAddLatencyRateTotalSensor(VERSION);
    }

    @Test
    public void shouldAddLatencyRateTotalSensorWithBuiltInMetricsVersion0100To24() {
        shouldAddLatencyRateTotalSensor("0.10.0-2.4");
    }

    private void shouldAddLatencyRateTotalSensor(String str) {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, str);
        shouldAddCustomSensor(streamsMetricsImpl.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[0]), streamsMetricsImpl, Arrays.asList("test-operation-latency-avg", "test-operation-latency-max", "test-operation-total", "test-operation-rate"));
    }

    @Test
    public void shouldAddRateTotalSensorWithBuiltInMetricsVersionLatest() {
        shouldAddRateTotalSensor(VERSION);
    }

    @Test
    public void shouldAddRateTotalSensorWithBuiltInMetricsVersion0100To24() {
        shouldAddRateTotalSensor("0.10.0-2.4");
    }

    private void shouldAddRateTotalSensor(String str) {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, str);
        shouldAddCustomSensor(streamsMetricsImpl.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[0]), streamsMetricsImpl, Arrays.asList("test-operation-total", "test-operation-rate"));
    }

    @Test
    public void shouldAddLatencyRateTotalSensorWithCustomTags() {
        shouldAddCustomSensorWithTags(this.streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[]{CUSTOM_TAG_KEY1, CUSTOM_TAG_VALUE1, CUSTOM_TAG_KEY2, CUSTOM_TAG_VALUE2}), Arrays.asList("test-operation-latency-avg", "test-operation-latency-max", "test-operation-total", "test-operation-rate"), customTags(this.streamsMetrics));
    }

    @Test
    public void shouldAddRateTotalSensorWithCustomTags() {
        shouldAddCustomSensorWithTags(this.streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[]{CUSTOM_TAG_KEY1, CUSTOM_TAG_VALUE1, CUSTOM_TAG_KEY2, CUSTOM_TAG_VALUE2}), Arrays.asList("test-operation-total", "test-operation-rate"), customTags(this.streamsMetrics));
    }

    private void shouldAddCustomSensor(Sensor sensor, StreamsMetricsImpl streamsMetricsImpl, List<String> list) {
        shouldAddCustomSensorWithTags(sensor, list, tags(streamsMetricsImpl));
    }

    private void shouldAddCustomSensorWithTags(Sensor sensor, List<String> list, Map<String, String> map) {
        Assert.assertTrue(sensor.hasMetrics());
        MatcherAssert.assertThat(sensor.name(), CoreMatchers.is("external." + Thread.currentThread().getName() + ".entity." + ENTITY_NAME + SENSOR_NAME_DELIMITER + OPERATION_NAME));
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(StreamsTestUtils.containsMetric(this.metrics, it.next(), "stream-test-scope-metrics", map));
        }
    }

    private Map<String, String> tags(StreamsMetricsImpl streamsMetricsImpl) {
        Map.Entry[] entryArr = new Map.Entry[2];
        entryArr[0] = Utils.mkEntry(streamsMetricsImpl.version() == StreamsMetricsImpl.Version.LATEST ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24, Thread.currentThread().getName());
        entryArr[1] = Utils.mkEntry("test-scope-id", ENTITY_NAME);
        return Utils.mkMap(entryArr);
    }

    private Map<String, String> customTags(StreamsMetricsImpl streamsMetricsImpl) {
        Map<String, String> tags = tags(streamsMetricsImpl);
        tags.put(CUSTOM_TAG_KEY1, CUSTOM_TAG_VALUE1);
        tags.put(CUSTOM_TAG_KEY2, CUSTOM_TAG_VALUE2);
        return tags;
    }

    @Test
    public void shouldThrowIfLatencyRateTotalSensorIsAddedWithOddTags() {
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[]{"bad-tag"});
        })).getMessage(), CoreMatchers.is("Tags needs to be specified in key-value pairs"));
    }

    @Test
    public void shouldThrowIfRateTotalSensorIsAddedWithOddTags() {
        MatcherAssert.assertThat(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[]{"bad-tag"});
        })).getMessage(), CoreMatchers.is("Tags needs to be specified in key-value pairs"));
    }

    @Test
    public void shouldGetClientLevelTagMap() {
        Map clientLevelTagMap = this.streamsMetrics.clientLevelTagMap();
        MatcherAssert.assertThat(Integer.valueOf(clientLevelTagMap.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat(clientLevelTagMap.get(THREAD_ID_TAG_0100_TO_24), Matchers.equalTo(CLIENT_ID));
    }

    @Test
    public void shouldGetStoreLevelTagMapForBuiltInMetricsLatestVersion() {
        shouldGetStoreLevelTagMap(VERSION);
    }

    @Test
    public void shouldGetStoreLevelTagMapForBuiltInMetricsVersion0100To24() {
        shouldGetStoreLevelTagMap("0.10.0-2.4");
    }

    private void shouldGetStoreLevelTagMap(String str) {
        Map storeLevelTagMap = new StreamsMetricsImpl(this.metrics, THREAD_ID, str).storeLevelTagMap(THREAD_ID, TASK_ID, "remote-window", "window-keeper");
        MatcherAssert.assertThat(Integer.valueOf(storeLevelTagMap.size()), Matchers.equalTo(3));
        MatcherAssert.assertThat(storeLevelTagMap.get(str.equals(VERSION) ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24), Matchers.equalTo(THREAD_ID));
        MatcherAssert.assertThat(storeLevelTagMap.get(TASK_ID_TAG), Matchers.equalTo(TASK_ID));
        MatcherAssert.assertThat(storeLevelTagMap.get("remote-window-state-id"), Matchers.equalTo("window-keeper"));
    }

    @Test
    public void shouldGetCacheLevelTagMapForBuiltInMetricsLatestVersion() {
        shouldGetCacheLevelTagMap(VERSION);
    }

    @Test
    public void shouldGetCacheLevelTagMapForBuiltInMetricsVersion0100To24() {
        shouldGetCacheLevelTagMap("0.10.0-2.4");
    }

    private void shouldGetCacheLevelTagMap(String str) {
        Map cacheLevelTagMap = new StreamsMetricsImpl(this.metrics, THREAD_ID, str).cacheLevelTagMap(THREAD_ID, "taskName", "storeName");
        MatcherAssert.assertThat(Integer.valueOf(cacheLevelTagMap.size()), Matchers.equalTo(3));
        MatcherAssert.assertThat(cacheLevelTagMap.get(str.equals(VERSION) ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24), Matchers.equalTo(THREAD_ID));
        MatcherAssert.assertThat(cacheLevelTagMap.get(TASK_ID_TAG), Matchers.equalTo("taskName"));
        MatcherAssert.assertThat(cacheLevelTagMap.get(RECORD_CACHE_ID_TAG), Matchers.equalTo("storeName"));
    }

    @Test
    public void shouldGetThreadLevelTagMapForBuiltInMetricsLatestVersion() {
        shouldGetThreadLevelTagMap(VERSION);
    }

    @Test
    public void shouldGetThreadLevelTagMapForBuiltInMetricsVersion0100To24() {
        shouldGetThreadLevelTagMap("0.10.0-2.4");
    }

    private void shouldGetThreadLevelTagMap(String str) {
        Map threadLevelTagMap = new StreamsMetricsImpl(this.metrics, THREAD_ID, str).threadLevelTagMap(THREAD_ID);
        MatcherAssert.assertThat(Integer.valueOf(threadLevelTagMap.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat(threadLevelTagMap.get(str.equals(VERSION) ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24), Matchers.equalTo(THREAD_ID));
    }

    @Test
    public void shouldAddInvocationRateToSensor() {
        Sensor sensor = (Sensor) PowerMock.createMock(Sensor.class);
        EasyMock.expect(Boolean.valueOf(sensor.add((MetricName) EasyMock.eq(new MetricName("test-metric1-rate", "group", "description number one", this.tags)), (MeasurableStat) EasyMock.anyObject(Rate.class)))).andReturn(true);
        EasyMock.replay(new Object[]{sensor});
        StreamsMetricsImpl.addInvocationRateToSensor(sensor, "group", this.tags, METRIC_NAME1, "description number one");
        EasyMock.verify(new Object[]{sensor});
    }

    @Test
    public void shouldAddAmountRateAndSum() {
        StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(this.sensor, "group", this.tags, "metric", "description number one", "description number two");
        verifyMetric("metric-rate", "description number one", 18.0d, 72.0d, 90.0d / Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds());
        verifyMetric("metric-total", "description number two", 18.0d, 72.0d, 180.0d);
        MatcherAssert.assertThat(Integer.valueOf(this.metrics.metrics().size()), Matchers.equalTo(3));
    }

    @Test
    public void shouldAddSum() {
        StreamsMetricsImpl.addSumMetricToSensor(this.sensor, "group", this.tags, "metric", "description number one");
        verifyMetric("metric-total", "description number one", 18.0d, 42.0d, 60.0d);
        MatcherAssert.assertThat(Integer.valueOf(this.metrics.metrics().size()), Matchers.equalTo(2));
    }

    @Test
    public void shouldAddAmountRate() {
        StreamsMetricsImpl.addRateOfSumMetricToSensor(this.sensor, "group", this.tags, "metric", "description number one");
        verifyMetric("metric-rate", "description number one", 18.0d, 72.0d, 90.0d / Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds());
        MatcherAssert.assertThat(Integer.valueOf(this.metrics.metrics().size()), Matchers.equalTo(2));
    }

    @Test
    public void shouldAddValue() {
        StreamsMetricsImpl.addValueMetricToSensor(this.sensor, "group", this.tags, "metric", "description number one");
        KafkaMetric metric = this.metrics.metric(new MetricName("metric", "group", "description number one", this.tags));
        MatcherAssert.assertThat(metric, CoreMatchers.is(CoreMatchers.notNullValue()));
        MetricConfig metricConfig = new MetricConfig();
        this.sensor.record(42.0d);
        MatcherAssert.assertThat(Double.valueOf(metric.measurable().measure(metricConfig, this.time.milliseconds())), Matchers.equalTo(Double.valueOf(42.0d)));
        this.sensor.record(18.0d);
        MatcherAssert.assertThat(Double.valueOf(metric.measurable().measure(metricConfig, this.time.milliseconds())), Matchers.equalTo(Double.valueOf(18.0d)));
        MatcherAssert.assertThat(Integer.valueOf(this.metrics.metrics().size()), Matchers.equalTo(2));
    }

    @Test
    public void shouldAddAvgAndTotalMetricsToSensor() {
        StreamsMetricsImpl.addAvgAndSumMetricsToSensor(this.sensor, "group", this.tags, "metric", "description number one", "description number two");
        verifyMetric("metric-avg", "description number one", 18.0d, 42.0d, 30.0d);
        verifyMetric("metric-total", "description number two", 18.0d, 42.0d, 120.0d);
        MatcherAssert.assertThat(Integer.valueOf(this.metrics.metrics().size()), Matchers.equalTo(3));
    }

    @Test
    public void shouldAddAvgAndMinAndMaxMetricsToSensor() {
        StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(this.sensor, "group", this.tags, "metric", "description number one", "description number two", "description number three");
        verifyMetric("metric-avg", "description number one", 18.0d, 42.0d, 30.0d);
        verifyMetric("metric-min", "description number two", 18.0d, 42.0d, 18.0d);
        verifyMetric("metric-max", "description number three", 18.0d, 42.0d, 42.0d);
        MatcherAssert.assertThat(Integer.valueOf(this.metrics.metrics().size()), Matchers.equalTo(4));
    }

    @Test
    public void shouldAddMinAndMaxMetricsToSensor() {
        StreamsMetricsImpl.addMinAndMaxToSensor(this.sensor, "group", this.tags, "metric", "description number one", "description number two");
        verifyMetric("metric-min", "description number one", 18.0d, 42.0d, 18.0d);
        verifyMetric("metric-max", "description number two", 18.0d, 42.0d, 42.0d);
        MatcherAssert.assertThat(Integer.valueOf(this.metrics.metrics().size()), Matchers.equalTo(3));
    }

    @Test
    public void shouldReturnMetricsVersionCurrent() {
        MatcherAssert.assertThat(new StreamsMetricsImpl(this.metrics, THREAD_ID, VERSION).version(), Matchers.equalTo(StreamsMetricsImpl.Version.LATEST));
    }

    @Test
    public void shouldReturnMetricsVersionFrom100To23() {
        MatcherAssert.assertThat(new StreamsMetricsImpl(this.metrics, THREAD_ID, "0.10.0-2.4").version(), Matchers.equalTo(StreamsMetricsImpl.Version.FROM_0100_TO_24));
    }

    private void verifyMetric(String str, String str2, double d, double d2, double d3) {
        KafkaMetric metric = this.metrics.metric(new MetricName(str, "group", str2, this.tags));
        MatcherAssert.assertThat(metric, CoreMatchers.is(CoreMatchers.notNullValue()));
        MatcherAssert.assertThat(metric.metricName().description(), Matchers.equalTo(str2));
        this.sensor.record(d, this.time.milliseconds());
        this.sensor.record(d2, this.time.milliseconds());
        MatcherAssert.assertThat(Double.valueOf(metric.measurable().measure(new MetricConfig(), this.time.milliseconds())), Matchers.equalTo(Double.valueOf(d3)));
    }

    private void verifyMetricWithinError(String str, String str2, double d, double d2, double d3, double d4) {
        KafkaMetric metric = this.metrics.metric(new MetricName(str, "group", str2, this.tags));
        MatcherAssert.assertThat(metric, CoreMatchers.is(CoreMatchers.notNullValue()));
        MatcherAssert.assertThat(metric.metricName().description(), Matchers.equalTo(str2));
        this.sensor.record(d, this.time.milliseconds());
        this.sensor.record(d2, this.time.milliseconds());
        Assert.assertEquals(d3, metric.measurable().measure(new MetricConfig(), this.time.milliseconds()), 1.0d);
    }

    @Test
    public void shouldMeasureLatency() {
        Sensor sensor = (Sensor) PowerMock.createMock(Sensor.class);
        EasyMock.expect(Boolean.valueOf(sensor.shouldRecord())).andReturn(true);
        EasyMock.expect(Boolean.valueOf(sensor.hasMetrics())).andReturn(true);
        sensor.record(4.0d);
        Time time = (Time) EasyMock.mock(Time.class);
        EasyMock.expect(Long.valueOf(time.nanoseconds())).andReturn(6L);
        EasyMock.expect(Long.valueOf(time.nanoseconds())).andReturn(10L);
        EasyMock.replay(new Object[]{sensor, time});
        StreamsMetricsImpl.maybeMeasureLatency(() -> {
        }, time, sensor);
        EasyMock.verify(new Object[]{sensor, time});
    }

    @Test
    public void shouldNotMeasureLatencyDueToRecordingLevel() {
        Sensor sensor = (Sensor) PowerMock.createMock(Sensor.class);
        EasyMock.expect(Boolean.valueOf(sensor.shouldRecord())).andReturn(false);
        Time time = (Time) EasyMock.mock(Time.class);
        EasyMock.replay(new Object[]{sensor});
        StreamsMetricsImpl.maybeMeasureLatency(() -> {
        }, time, sensor);
        EasyMock.verify(new Object[]{sensor});
    }

    @Test
    public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() {
        Sensor sensor = (Sensor) PowerMock.createMock(Sensor.class);
        EasyMock.expect(Boolean.valueOf(sensor.shouldRecord())).andReturn(true);
        EasyMock.expect(Boolean.valueOf(sensor.hasMetrics())).andReturn(false);
        Time time = (Time) EasyMock.mock(Time.class);
        EasyMock.replay(new Object[]{sensor});
        StreamsMetricsImpl.maybeMeasureLatency(() -> {
        }, time, sensor);
        EasyMock.verify(new Object[]{sensor});
    }
}
