/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals.metrics;

import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Checks that the block-cache gauges registered under the {@code stream-state-metrics} group
 * report the expected values for both {@link RocksDBStore} and {@link RocksDBTimestampedStore}.
 */
public class RocksDBBlockCacheMetricsTest {
    private static final String STORE_NAME = "test";
    private static final String METRICS_SCOPE = "test-scope";
    private static final String STATE_METRICS_GROUP = "stream-state-metrics";
    private static final TaskId TASK_ID = new TaskId(0, 0);

    /** Capacity asserted by {@link #shouldRecordCorrectBlockCacheCapacity}: 50 MiB (0x3200000 bytes). */
    private static final long EXPECTED_BLOCK_CACHE_CAPACITY = 50 * 1024 * 1024L;

    /**
     * Supplies the (store, context) pairs every parameterized test runs against.
     *
     * <p>Each pair gets its own temporary state directory, because {@link #withStore} deletes the
     * context's state directory once the test body has finished.
     *
     * @return one argument set per store implementation under test
     */
    public static Stream<Arguments> stores() {
        return Stream.of(
            argumentsFor(new RocksDBStore(STORE_NAME, METRICS_SCOPE)),
            argumentsFor(new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE))
        );
    }

    /** Pairs {@code store} with a fresh mock context backed by a dedicated temporary directory. */
    private static Arguments argumentsFor(final RocksDBStore store) {
        final File stateDir = TestUtils.tempDirectory("state");
        return Arguments.of(store, new MockInternalProcessorContext(new Properties(), TASK_ID, stateDir));
    }

    /**
     * Initialises {@code store} against {@code context}, runs {@code function}, and then always
     * closes the store and removes the context's state directory.
     *
     * @param store    the store to initialise and close
     * @param context  the context the store is initialised with; its state directory is deleted afterwards
     * @param function the test body to execute while the store is open
     */
    static void withStore(final RocksDBStore store, final StateStoreContext context, final Runnable function) {
        store.init(context, store);
        try {
            function.run();
        } finally {
            try {
                store.close();
            } finally {
                // Nested so the directory is still removed when close() itself throws.
                try {
                    Utils.delete(context.stateDir());
                } catch (final IOException ignored) {
                    // Best-effort cleanup: failing to delete the directory must not mask the test outcome.
                }
            }
        }
    }

    @ParameterizedTest
    @MethodSource("stores")
    public void shouldRecordCorrectBlockCacheCapacity(final RocksDBStore store, final StateStoreContext ctx) {
        withStore(store, ctx, () ->
            assertMetric(ctx, STATE_METRICS_GROUP, "block-cache-capacity", BigInteger.valueOf(EXPECTED_BLOCK_CACHE_CAPACITY)));
    }

    @ParameterizedTest
    @MethodSource("stores")
    public void shouldRecordCorrectBlockCacheUsage(final RocksDBStore store, final StateStoreContext ctx) {
        withStore(store, ctx, () -> {
            // Read the expected value straight from the store's own block cache.
            final BlockBasedTableConfigWithAccessibleCache tableFormatConfig =
                (BlockBasedTableConfigWithAccessibleCache) store.getOptions().tableFormatConfig();
            final long usage = tableFormatConfig.blockCache().getUsage();
            assertMetric(ctx, STATE_METRICS_GROUP, "block-cache-usage", BigInteger.valueOf(usage));
        });
    }

    @ParameterizedTest
    @MethodSource("stores")
    public void shouldRecordCorrectBlockCachePinnedUsage(final RocksDBStore store, final StateStoreContext ctx) {
        withStore(store, ctx, () -> {
            // Read the expected value straight from the store's own block cache.
            final BlockBasedTableConfigWithAccessibleCache tableFormatConfig =
                (BlockBasedTableConfigWithAccessibleCache) store.getOptions().tableFormatConfig();
            final long pinnedUsage = tableFormatConfig.blockCache().getPinnedUsage();
            assertMetric(ctx, STATE_METRICS_GROUP, "block-cache-pinned-usage", BigInteger.valueOf(pinnedUsage));
        });
    }

    /**
     * Looks up a store-level metric in the context's metrics registry and asserts its current value.
     *
     * <p>Fails with a descriptive message, rather than a {@link NullPointerException}, when no
     * metric is registered under the computed name.
     *
     * @param context    the context whose metrics implementation is queried
     * @param group      the metric group
     * @param metricName the metric name within {@code group}
     * @param expected   the value the metric is expected to report
     * @param <T>        the type of the expected value
     */
    public <T> void assertMetric(final StateStoreContext context, final String group, final String metricName, final T expected) {
        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
        final MetricName name = metrics.metricsRegistry().metricName(
            metricName,
            group,
            "Ignored",
            storeLevelTagMap(TASK_ID.toString(), METRICS_SCOPE, STORE_NAME)
        );
        final KafkaMetric metric = (KafkaMetric) metrics.metrics().get(name);
        Assertions.assertNotNull(metric, String.format("Metric '%s-%s' was not registered", group, metricName));
        Assertions.assertEquals(expected, metric.metricValue(), String.format("Value for metric '%s-%s' was incorrect", group, metricName));
    }

    /**
     * Builds the tag map for a thread-level metric.
     *
     * @param threadId value stored under the {@code thread-id} tag
     * @return a mutable, insertion-ordered tag map
     */
    public Map<String, String> threadLevelTagMap(final String threadId) {
        final Map<String, String> tagMap = new LinkedHashMap<>();
        tagMap.put("thread-id", threadId);
        return tagMap;
    }

    /**
     * Builds the tag map for a task-level metric: the thread-level tags plus {@code task-id}.
     *
     * @param threadId value stored under the {@code thread-id} tag
     * @param taskId   value stored under the {@code task-id} tag
     * @return a mutable, insertion-ordered tag map
     */
    public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
        final Map<String, String> tagMap = threadLevelTagMap(threadId);
        tagMap.put("task-id", taskId);
        return tagMap;
    }

    /**
     * Builds the tag map for a store-level metric: the task-level tags for the current thread plus
     * a {@code <storeType>-state-id} tag.
     *
     * @param taskName  value stored under the {@code task-id} tag
     * @param storeType prefix of the store tag key
     * @param storeName value stored under the store tag
     * @return a mutable, insertion-ordered tag map
     */
    public Map<String, String> storeLevelTagMap(final String taskName, final String storeType, final String storeName) {
        final Map<String, String> tagMap = taskLevelTagMap(Thread.currentThread().getName(), taskName);
        tagMap.put(storeType + "-" + "state-id", storeName);
        return tagMap;
    }
}

