package dev.responsive.kafka.api.async.internals.metrics;

import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.streams.processor.TaskId;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/metrics/AsyncProcessorMetricsRecorder.class */
public class AsyncProcessorMetricsRecorder {
    public static final String GROUP_NAME = "async-processor-metrics";
    public static final String PENDING_EVENTS = "pending-events";
    public static final String PENDING_EVENTS_DESC = "The current number of events pending in processor";
    public static final String PROCESS_TIME = "event-process-duration-ns";
    public static final String PROCESS_TIME_MAX = "event-process-duration-ns-max";
    public static final String PROCESS_TIME_AVG = "event-process-duration-ns-avg";
    public static final String PROCESS_TIME_DESC = "time to process a record";
    public static final String PROCESS_TIME_MAX_DESC = "The maximum time to process a record";
    public static final String PROCESS_TIME_AVG_DESC = "The average time to process a record";
    public static final String TRANSITION_TIME = "event-transition-duration-ns";
    public static final String TRANSITION_TIME_MAX = "event-transition-duration-ns-max";
    public static final String TRANSITION_TIME_AVG = "event-transition-duration-ns-avg";
    public static final String TRANSITION_TIME_DESC = "time to transition between states";
    public static final String TRANSITION_TIME_MAX_DESC = "The maximum time to transition between states";
    public static final String TRANSITION_TIME_AVG_DESC = "The average time to transition between states";
    public static final String SCHEDULING_QUEUE_SIZE = "scheduling-queue-size";
    public static final String SCHEDULING_QUEUE_SIZE_DESC = "entries in the scheduling queue";
    public static final String SCHEDULING_QUEUE_LONGEST_SIZE = "scheduling-queue-longest-size";
    public static final String SCHEDULING_QUEUE_LONGEST_SIZE_DESC = "entries in the queue for key w/ most entries in scheduling queue";
    public static final String FROM_STATE = "from-state";
    public static final String TO_STATE = "to-state";
    private final ResponsiveMetrics.MetricScope scope;
    private final ResponsiveMetrics metrics;
    private final MetricName pendingEventsGauge;
    private final Sensor eventProcessSensor;
    private final Sensor schedulingQueueSizeSensor;
    private final Sensor schedulingQueueLongestQueueSizeSensor;
    private final ConcurrentMap<String, Sensor> stateTransitionSensors = new ConcurrentHashMap();

    public AsyncProcessorMetricsRecorder(String str, TaskId taskId, String str2, ResponsiveMetrics responsiveMetrics, Supplier<Integer> supplier) {
        this.metrics = responsiveMetrics;
        this.scope = responsiveMetrics.processorLevelMetric(GROUP_NAME, str, taskId, str2);
        this.pendingEventsGauge = responsiveMetrics.metricName(PENDING_EVENTS, PENDING_EVENTS_DESC, this.scope);
        responsiveMetrics.addMetric(this.pendingEventsGauge, (metricConfig, j) -> {
            return (Integer) supplier.get();
        });
        this.eventProcessSensor = responsiveMetrics.addSensor(this.scope.sensorName(PROCESS_TIME));
        this.eventProcessSensor.add(responsiveMetrics.metricName(PROCESS_TIME_AVG, PROCESS_TIME_AVG_DESC, this.scope), new Avg());
        this.eventProcessSensor.add(responsiveMetrics.metricName(PROCESS_TIME_MAX, PROCESS_TIME_MAX_DESC, this.scope), new Max());
        this.schedulingQueueSizeSensor = responsiveMetrics.addSensor(this.scope.sensorName(SCHEDULING_QUEUE_SIZE));
        this.schedulingQueueSizeSensor.add(responsiveMetrics.metricName(SCHEDULING_QUEUE_SIZE, SCHEDULING_QUEUE_SIZE_DESC, this.scope), new Value());
        this.schedulingQueueLongestQueueSizeSensor = responsiveMetrics.addSensor(this.scope.sensorName(SCHEDULING_QUEUE_LONGEST_SIZE));
        this.schedulingQueueLongestQueueSizeSensor.add(responsiveMetrics.metricName(SCHEDULING_QUEUE_LONGEST_SIZE, SCHEDULING_QUEUE_LONGEST_SIZE_DESC, this.scope), new Value());
    }

    public void recordEventProcess(long j) {
        this.eventProcessSensor.record(j);
    }

    public void recordSchedulingQueueSize(int i) {
        this.schedulingQueueSizeSensor.record(i);
    }

    public void recordSchedulingQueueLongestSize(int i) {
        this.schedulingQueueLongestQueueSizeSensor.record(i);
    }

    public void recordStateTransition(AsyncEvent.State state, long j, AsyncEvent.State state2, long j2) {
        ResponsiveMetrics.MetricScope withTags = this.scope.withTags(FROM_STATE, state.name()).withTags(TO_STATE, state2.name());
        this.stateTransitionSensors.computeIfAbsent(withTags.sensorName(TRANSITION_TIME), str -> {
            Sensor addSensor = this.metrics.addSensor(str);
            addSensor.add(this.metrics.metricName(TRANSITION_TIME_AVG, TRANSITION_TIME_AVG_DESC, withTags), new Avg());
            addSensor.add(this.metrics.metricName(TRANSITION_TIME_MAX, TRANSITION_TIME_MAX_DESC, withTags), new Max());
            return addSensor;
        }).record(j2 - j);
    }

    public void close() {
        this.metrics.removeMetric(this.pendingEventsGauge);
        this.metrics.removeSensor(this.eventProcessSensor.name());
        Set<String> keySet = this.stateTransitionSensors.keySet();
        ResponsiveMetrics responsiveMetrics = this.metrics;
        Objects.requireNonNull(responsiveMetrics);
        keySet.forEach(responsiveMetrics::removeSensor);
    }
}
