package dev.responsive.kafka.internal.metrics;

import dev.responsive.kafka.internal.utils.Utils;
import java.io.Closeable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/metrics/ResponsiveRestoreListener.class */
public class ResponsiveRestoreListener implements StateRestoreListener, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ResponsiveRestoreListener.class);
    private final ResponsiveMetrics metrics;
    private final MetricName numRestoringMetricName;
    private final Sensor numInterruptedSensor;
    private final ConcurrentMap<MetricName, Long> storeMetricToStartMs = new ConcurrentHashMap();
    private StateRestoreListener userRestoreListener;

    private MetricName timeRestoringMetricName(TopicPartition topicPartition, String str) {
        return this.metrics.metricName(StoreMetrics.TIME_RESTORING, StoreMetrics.TIME_RESTORING_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, Utils.extractThreadIdFromThreadName(Thread.currentThread().getName()), topicPartition, str));
    }

    public ResponsiveRestoreListener(ResponsiveMetrics responsiveMetrics) {
        this.metrics = responsiveMetrics;
        this.numRestoringMetricName = responsiveMetrics.metricName(ApplicationMetrics.NUM_RESTORING_CHANGELOGS, ApplicationMetrics.NUM_RESTORING_CHANGELOGS_DESCRIPTION, responsiveMetrics.applicationLevelMetric(ApplicationMetrics.APPLICATION_METRIC_GROUP));
        responsiveMetrics.addMetric(this.numRestoringMetricName, (metricConfig, j) -> {
            return Integer.valueOf(this.storeMetricToStartMs.size());
        });
        this.numInterruptedSensor = responsiveMetrics.addSensor(ApplicationMetrics.NUM_INTERRUPTED_CHANGELOGS);
        this.numInterruptedSensor.add(responsiveMetrics.metricName(ApplicationMetrics.NUM_INTERRUPTED_CHANGELOGS, ApplicationMetrics.NUM_INTERRUPTED_CHANGELOGS_DESCRIPTION, responsiveMetrics.applicationLevelMetric(ApplicationMetrics.APPLICATION_METRIC_GROUP)), new CumulativeCount());
    }

    public void registerUserRestoreListener(StateRestoreListener stateRestoreListener) {
        this.userRestoreListener = stateRestoreListener;
    }

    public StateRestoreListener userRestoreListener() {
        return this.userRestoreListener;
    }

    public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
        long currentTimeMillis = System.currentTimeMillis();
        MetricName timeRestoringMetricName = timeRestoringMetricName(topicPartition, str);
        this.storeMetricToStartMs.put(timeRestoringMetricName, Long.valueOf(currentTimeMillis));
        this.metrics.addMetric(timeRestoringMetricName, (metricConfig, j3) -> {
            return Long.valueOf(j3 - currentTimeMillis);
        });
        LOG.info("Beginning restoration from offset {} to {} at {}ms (epoch time) for partition {} of state store {}", new Object[]{Long.valueOf(j), Long.valueOf(j2), Long.valueOf(currentTimeMillis), topicPartition, str});
        if (this.userRestoreListener != null) {
            this.userRestoreListener.onRestoreStart(topicPartition, str, j, j2);
        }
    }

    public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
        LOG.debug("Restored {} more records up to offset {} for partition {} of state store {}", new Object[]{Long.valueOf(j2), Long.valueOf(j), topicPartition, str});
        if (this.userRestoreListener != null) {
            this.userRestoreListener.onBatchRestored(topicPartition, str, j, j2);
        }
    }

    public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
        LOG.info("Finished restoration of {} total records after {}ms for partition {} of state store {}", new Object[]{Long.valueOf(j), Long.valueOf(removeStateStore(timeRestoringMetricName(topicPartition, str), System.currentTimeMillis())), topicPartition, str});
        if (this.userRestoreListener != null) {
            this.userRestoreListener.onRestoreEnd(topicPartition, str, j);
        }
    }

    public void onRestoreSuspended(TopicPartition topicPartition, String str, long j) {
        MetricName timeRestoringMetricName = timeRestoringMetricName(topicPartition, str);
        long currentTimeMillis = System.currentTimeMillis();
        long removeStateStore = removeStateStore(timeRestoringMetricName, currentTimeMillis);
        this.numInterruptedSensor.record(1.0d, currentTimeMillis);
        LOG.info("Suspended restoration of {} total records after {}ms for partition {} of state store {}", new Object[]{Long.valueOf(j), Long.valueOf(removeStateStore), topicPartition, str});
    }

    public void onStoreClosed(TopicPartition topicPartition, String str) {
        if (this.storeMetricToStartMs.containsKey(timeRestoringMetricName(topicPartition, str))) {
            onRestoreSuspended(topicPartition, str, -1L);
        }
    }

    private long removeStateStore(MetricName metricName, long j) {
        this.metrics.removeMetric(metricName);
        return j - this.storeMetricToStartMs.remove(metricName).longValue();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.storeMetricToStartMs.isEmpty()) {
            LOG.warn("Not all state stores being restored were closed before ending shutdown");
        }
        this.metrics.removeMetric(this.numRestoringMetricName);
        this.metrics.removeSensor(ApplicationMetrics.NUM_INTERRUPTED_CHANGELOGS);
    }
}
