package org.apache.kafka.streams.processor.internals.metrics;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;

/**
 * Unit tests for the static sensor factory methods of {@link ProcessorNodeMetrics}.
 *
 * <p>Each test records the calls a factory method is expected to make on a mocked
 * {@link StreamsMetricsImpl} (instance methods as well as its mocked static helpers), invokes the
 * factory, and finally verifies the recorded interactions and the returned sensor. The class runs
 * once per entry of {@link #data()}; the entry is injected into {@link #builtInMetricsVersion}.
 */
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(Parameterized.class)
@PrepareForTest({StreamsMetricsImpl.class, Sensor.class})
public class ProcessorNodeMetricsTest {
    private static final String THREAD_ID = "test-thread";
    private static final String TASK_ID = "test-task";
    private static final String PROCESSOR_NODE_ID = "test-processor";

    // Literals shared by several expectations below.
    private static final String NODE_LEVEL_GROUP = "stream-processor-node-metrics";
    private static final String TASK_LEVEL_GROUP = "stream-task-metrics";
    private static final String ROLLUP_NODE_ID = "all";
    private static final String LATENCY_SUFFIX = "-latency";

    private final Sensor expectedSensor = EasyMock.mock(Sensor.class);
    private final Sensor expectedParentSensor = EasyMock.mock(Sensor.class);
    private final StreamsMetricsImpl streamsMetrics = PowerMock.createMock(StreamsMetricsImpl.class);
    private final Map<String, String> tagMap = Collections.singletonMap("hello", "world");
    private final Map<String, String> parentTagMap = Collections.singletonMap("hi", "universe");

    /** Built-in metrics version the current parameterized run is executed with. */
    @Parameterized.Parameter
    public StreamsMetricsImpl.Version builtInMetricsVersion;

    /**
     * Supplies the built-in metrics versions to run the tests against.
     *
     * @return one single-element parameter array per version
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] {
            {StreamsMetricsImpl.Version.LATEST},
            {StreamsMetricsImpl.Version.FROM_0100_TO_24}
        });
    }

    /**
     * Makes the mocked metrics report the parameterized version (any number of times) and mocks
     * the static methods of {@link StreamsMetricsImpl}.
     */
    @Before
    public void setUp() {
        EasyMock.expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes();
        PowerMock.mockStatic(StreamsMetricsImpl.class);
    }

    /** Expects a parentless DEBUG node-level throughput sensor named {@code suppression-emit}. */
    @Test
    public void shouldGetSuppressionEmitSensor() {
        setUpThroughputSensor(
            "suppression-emit",
            "The average number of emitted records from the suppression buffer per second",
            "The total number of emitted records from the suppression buffer",
            Sensor.RecordingLevel.DEBUG
        );

        verifySensor(() -> ProcessorNodeMetrics.suppressionEmitSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
    }

    /** Expects a parentless DEBUG node-level throughput sensor named {@code idempotent-update-skip}. */
    @Test
    public void shouldGetIdempotentUpdateSkipSensor() {
        setUpThroughputSensor(
            "idempotent-update-skip",
            "The average number of skipped idempotent updates per second",
            "The total number of skipped idempotent updates",
            Sensor.RecordingLevel.DEBUG
        );

        verifySensor(() -> ProcessorNodeMetrics.skippedIdempotentUpdatesSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
    }

    @Test
    public void shouldGetProcessSensor() {
        shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(
            "process",
            "The average number of calls to process per second",
            "The total number of calls to process",
            "The average latency of calls to process",
            "The maximum latency of calls to process",
            () -> ProcessorNodeMetrics.processSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics)
        );
    }

    /**
     * Expects a DEBUG node-level {@code process} throughput sensor whose parent is a task-level
     * {@code process} sensor registered under the task-level group with task-level tags.
     * Also invoked from {@link #shouldGetProcessAtSourceSensorOrForwardSensor()}.
     */
    @Test
    public void shouldGetProcessAtSourceSensor() {
        final String metricNamePrefix = "process";
        final String rateDescription = "The average number of calls to process per second";
        final String totalDescription = "The total number of calls to process";

        // Parent: task-level sensor with task-level tags.
        EasyMock.expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, metricNamePrefix, Sensor.RecordingLevel.DEBUG))
            .andReturn(expectedParentSensor);
        EasyMock.expect(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).andReturn(parentTagMap);
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            expectedParentSensor, TASK_LEVEL_GROUP, parentTagMap, metricNamePrefix, rateDescription, totalDescription);

        // Child: node-level sensor attached to the parent above.
        setUpThroughputSensor(metricNamePrefix, rateDescription, totalDescription, Sensor.RecordingLevel.DEBUG, expectedParentSensor);

        verifySensor(() -> ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
    }

    @Test
    public void shouldGetPunctuateSensor() {
        shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(
            "punctuate",
            "The average number of calls to punctuate per second",
            "The total number of calls to punctuate",
            "The average latency of calls to punctuate",
            "The maximum latency of calls to punctuate",
            () -> ProcessorNodeMetrics.punctuateSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics)
        );
    }

    @Test
    public void shouldGetCreateSensor() {
        shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(
            "create",
            "The average number of processor nodes created per second",
            "The total number of processor nodes created",
            "The average latency of creations of processor nodes",
            "The maximum latency of creations of processor nodes",
            () -> ProcessorNodeMetrics.createSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics)
        );
    }

    @Test
    public void shouldGetDestroySensor() {
        shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(
            "destroy",
            "The average number of destructions of processor nodes per second",
            "The total number of destructions of processor nodes",
            "The average latency of destructions of processor nodes",
            "The maximum latency of destructions of processor nodes",
            () -> ProcessorNodeMetrics.destroySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics)
        );
    }

    /**
     * Expects a DEBUG node-level {@code forward} throughput sensor whose parent is the task-level
     * sensor set up by {@link #setUpThroughputParentSensor(String, String, String)}.
     * Also invoked from {@link #shouldGetProcessAtSourceSensorOrForwardSensor()}.
     */
    @Test
    public void shouldGetForwardSensor() {
        final String metricNamePrefix = "forward";
        final String rateDescription = "The average number of calls to forward per second";
        final String totalDescription = "The total number of calls to forward";

        setUpThroughputParentSensor(metricNamePrefix, rateDescription, totalDescription);
        setUpThroughputSensor(metricNamePrefix, rateDescription, totalDescription, Sensor.RecordingLevel.DEBUG, expectedParentSensor);

        verifySensor(() -> ProcessorNodeMetrics.forwardSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
    }

    /** Expects a parentless INFO node-level throughput sensor named {@code late-record-drop}. */
    @Test
    public void shouldGetLateRecordDropSensor() {
        setUpThroughputSensor(
            "late-record-drop",
            "The average number of dropped late records per second",
            "The total number of dropped late records",
            Sensor.RecordingLevel.INFO
        );

        verifySensor(() -> ProcessorNodeMetrics.lateRecordDropSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
    }

    /**
     * Runs the forward-sensor check for version {@code FROM_0100_TO_24} and the
     * process-at-source check for every other version.
     */
    @Test
    public void shouldGetProcessAtSourceSensorOrForwardSensor() {
        if (builtInMetricsVersion != StreamsMetricsImpl.Version.FROM_0100_TO_24) {
            shouldGetProcessAtSourceSensor();
        } else {
            shouldGetForwardSensor();
        }
    }

    /**
     * Records version-dependent expectations for a throughput-and-latency sensor and verifies the
     * sensor produced by {@code sensorSupplier}.
     *
     * <p>For {@code FROM_0100_TO_24} a parent sensor plus a fully populated node-level sensor are
     * expected; otherwise only the lookup of a parentless DEBUG node-level sensor is expected.
     *
     * @param metricNamePrefix   sensor name, also used as metric name prefix
     * @param rateDescription    description passed for the rate metric
     * @param totalDescription   description passed for the total metric
     * @param avgLatencyDescription description passed for the average-latency metric
     * @param maxLatencyDescription description passed for the maximum-latency metric
     * @param sensorSupplier     invokes the factory method under test
     */
    private void shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(final String metricNamePrefix,
                                                                            final String rateDescription,
                                                                            final String totalDescription,
                                                                            final String avgLatencyDescription,
                                                                            final String maxLatencyDescription,
                                                                            final Supplier<Sensor> sensorSupplier) {
        final boolean oldMetricsVersion = builtInMetricsVersion == StreamsMetricsImpl.Version.FROM_0100_TO_24;
        if (oldMetricsVersion) {
            setUpThroughputAndLatencySensorWithParent(
                metricNamePrefix, rateDescription, totalDescription, avgLatencyDescription, maxLatencyDescription);
        } else {
            EasyMock.expect(streamsMetrics.nodeLevelSensor(
                    THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, metricNamePrefix, Sensor.RecordingLevel.DEBUG))
                .andReturn(expectedSensor);
        }
        verifySensor(sensorSupplier);
    }

    /** Records the parent-sensor expectations followed by those of the node-level child sensor. */
    private void setUpThroughputAndLatencySensorWithParent(final String metricNamePrefix,
                                                           final String rateDescription,
                                                           final String totalDescription,
                                                           final String avgLatencyDescription,
                                                           final String maxLatencyDescription) {
        setUpThroughputAndLatencyParentSensor(
            metricNamePrefix, rateDescription, totalDescription, avgLatencyDescription, maxLatencyDescription);
        setUpThroughputAndLatencySensor(
            metricNamePrefix, rateDescription, totalDescription, avgLatencyDescription, maxLatencyDescription,
            expectedParentSensor);
    }

    /**
     * Records the throughput expectations of the parent sensor and, in addition, the registration
     * of its average/maximum latency metrics.
     */
    private void setUpThroughputAndLatencyParentSensor(final String metricNamePrefix,
                                                       final String rateDescription,
                                                       final String totalDescription,
                                                       final String avgLatencyDescription,
                                                       final String maxLatencyDescription) {
        setUpThroughputParentSensor(metricNamePrefix, rateDescription, totalDescription);
        StreamsMetricsImpl.addAvgAndMaxToSensor(
            expectedParentSensor,
            NODE_LEVEL_GROUP,
            parentTagMap,
            metricNamePrefix + LATENCY_SUFFIX,
            avgLatencyDescription,
            maxLatencyDescription);
    }

    /**
     * Records the expectations for a DEBUG task-level parent sensor whose tags are looked up via
     * {@code nodeLevelTagMap} with node id {@code "all"} and whose rate/count metrics are
     * registered under the node-level group.
     */
    private void setUpThroughputParentSensor(final String metricNamePrefix,
                                             final String rateDescription,
                                             final String totalDescription) {
        EasyMock.expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, metricNamePrefix, Sensor.RecordingLevel.DEBUG))
            .andReturn(expectedParentSensor);
        EasyMock.expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, ROLLUP_NODE_ID)).andReturn(parentTagMap);
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            expectedParentSensor, NODE_LEVEL_GROUP, parentTagMap, metricNamePrefix, rateDescription, totalDescription);
    }

    /**
     * Records the expectations of a DEBUG node-level throughput sensor with the given parents and,
     * in addition, the registration of its average/maximum latency metrics.
     */
    private void setUpThroughputAndLatencySensor(final String metricNamePrefix,
                                                 final String rateDescription,
                                                 final String totalDescription,
                                                 final String avgLatencyDescription,
                                                 final String maxLatencyDescription,
                                                 final Sensor... parentSensors) {
        setUpThroughputSensor(metricNamePrefix, rateDescription, totalDescription, Sensor.RecordingLevel.DEBUG, parentSensors);
        StreamsMetricsImpl.addAvgAndMaxToSensor(
            expectedSensor,
            NODE_LEVEL_GROUP,
            tagMap,
            metricNamePrefix + LATENCY_SUFFIX,
            avgLatencyDescription,
            maxLatencyDescription);
    }

    /**
     * Records the expectations of a node-level throughput sensor: the sensor lookup with the given
     * recording level and parents, the node-level tag lookup, and the registration of the
     * rate/count metrics under the node-level group.
     */
    private void setUpThroughputSensor(final String metricNamePrefix,
                                       final String rateDescription,
                                       final String totalDescription,
                                       final Sensor.RecordingLevel recordingLevel,
                                       final Sensor... parentSensors) {
        EasyMock.expect(streamsMetrics.nodeLevelSensor(
                THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, metricNamePrefix, recordingLevel, parentSensors))
            .andReturn(expectedSensor);
        EasyMock.expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            expectedSensor, NODE_LEVEL_GROUP, tagMap, metricNamePrefix, rateDescription, totalDescription);
    }

    /**
     * Switches the mocks to replay mode, obtains the sensor from {@code sensorSupplier}, verifies
     * the recorded expectations and asserts that the expected sensor was returned.
     */
    private void verifySensor(final Supplier<Sensor> sensorSupplier) {
        PowerMock.replay(StreamsMetricsImpl.class, streamsMetrics);

        final Sensor actualSensor = sensorSupplier.get();

        PowerMock.verify(StreamsMetricsImpl.class, streamsMetrics);
        MatcherAssert.assertThat(actualSensor, CoreMatchers.is(expectedSensor));
    }
}
