package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.class */
public class RocksDBMetricsIntegrationTest {
    private static final String STREAM_INPUT_ONE = "STREAM_INPUT_ONE";
    private static final String STREAM_OUTPUT_ONE = "STREAM_OUTPUT_ONE";
    private static final String STREAM_INPUT_TWO = "STREAM_INPUT_TWO";
    private static final String STREAM_OUTPUT_TWO = "STREAM_OUTPUT_TWO";
    private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
    private static final long TIMEOUT = 60000;
    private static final String METRICS_GROUP = "stream-state-metrics";
    private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
    private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
    private static final String BYTES_READ_RATE = "bytes-read-rate";
    private static final String BYTES_READ_TOTAL = "bytes-read-total";
    private static final String MEMTABLE_BYTES_FLUSHED_RATE = "memtable-bytes-flushed-rate";
    private static final String MEMTABLE_BYTES_FLUSHED_TOTAL = "memtable-bytes-flushed-total";
    private static final String MEMTABLE_HIT_RATIO = "memtable-hit-ratio";
    private static final String WRITE_STALL_DURATION_AVG = "write-stall-duration-avg";
    private static final String WRITE_STALL_DURATION_TOTAL = "write-stall-duration-total";
    private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit-ratio";
    private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit-ratio";
    private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit-ratio";
    private static final String BYTES_READ_DURING_COMPACTION_RATE = "bytes-read-compaction-rate";
    private static final String BYTES_WRITTEN_DURING_COMPACTION_RATE = "bytes-written-compaction-rate";
    private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
    private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
    private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
    private static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLE = "num-deletes-active-mem-table";
    private static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES = "num-entries-imm-mem-tables";
    private static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES = "num-deletes-imm-mem-tables";
    private static final String NUMBER_OF_IMMUTABLE_MEMTABLES = "num-immutable-mem-table";
    private static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE = "cur-size-active-mem-table";
    private static final String CURRENT_SIZE_OF_ALL_MEMTABLES = "cur-size-all-mem-tables";
    private static final String SIZE_OF_ALL_MEMTABLES = "size-all-mem-tables";
    private static final String MEMTABLE_FLUSH_PENDING = "mem-table-flush-pending";
    private static final String NUMBER_OF_RUNNING_FLUSHES = "num-running-flushes";
    private static final String COMPACTION_PENDING = "compaction-pending";
    private static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
    private static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION = "estimate-pending-compaction-bytes";
    private static final String TOTAL_SST_FILES_SIZE = "total-sst-files-size";
    private static final String LIVE_SST_FILES_SIZE = "live-sst-files-size";
    private static final String NUMBER_OF_LIVE_VERSIONS = "num-live-versions";
    private static final String CAPACITY_OF_BLOCK_CACHE = "block-cache-capacity";
    private static final String USAGE_OF_BLOCK_CACHE = "block-cache-usage";
    private static final String PINNED_USAGE_OF_BLOCK_CACHE = "block-cache-pinned-usage";
    private static final String ESTIMATED_NUMBER_OF_KEYS = "estimate-num-keys";
    private static final String ESTIMATED_MEMORY_OF_TABLE_READERS = "estimate-table-readers-mem";
    private static final String NUMBER_OF_BACKGROUND_ERRORS = "background-errors";

    @Parameterized.Parameter
    public String processingGuarantee;

    @Rule
    public TestName testName = new TestName();
    private static final int NUM_BROKERS = 3;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final Duration WINDOW_SIZE = Duration.ofMillis(50);

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest$MetricsVerifier.class */
    public interface MetricsVerifier {
        void verify(KafkaStreams kafkaStreams, String str) throws Exception;
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{"at_least_once"}, new Object[]{"exactly_once"}, new Object[]{"exactly_once_beta"});
    }

    @Before
    public void before() throws Exception {
        CLUSTER.createTopic(STREAM_INPUT_ONE, 1, NUM_BROKERS);
        CLUSTER.createTopic(STREAM_INPUT_TWO, 1, NUM_BROKERS);
    }

    @After
    public void after() throws Exception {
        CLUSTER.deleteTopicsAndWait(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
    }

    @Test
    public void shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
        Properties streamsConfig = streamsConfig();
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
        StreamsBuilder builderForStateStores = builderForStateStores();
        cleanUpStateRunVerifyAndClose(builderForStateStores, streamsConfig, this::verifyThatRocksDBMetricsAreExposed);
        cleanUpStateRunVerifyAndClose(builderForStateStores, streamsConfig, this::verifyThatRocksDBMetricsAreExposed);
    }

    private Properties streamsConfig() {
        Properties properties = new Properties();
        properties.put("application.id", "test-application-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName));
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("default.key.serde", Serdes.Integer().getClass());
        properties.put("default.value.serde", Serdes.String().getClass());
        properties.put("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name);
        properties.put("processing.guarantee", this.processingGuarantee);
        properties.put("state.dir", TestUtils.tempDirectory().getPath());
        properties.put("cache.max.bytes.buffering", 0);
        return properties;
    }

    private StreamsBuilder builderForStateStores() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(STREAM_INPUT_ONE, Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()).toStream().to(STREAM_OUTPUT_ONE);
        streamsBuilder.stream(STREAM_INPUT_TWO, Consumed.with(Serdes.Integer(), Serdes.String())).groupByKey().windowedBy(TimeWindows.of(WINDOW_SIZE).grace(Duration.ZERO)).aggregate(() -> {
            return 0L;
        }, (num, str, l) -> {
            return l;
        }, Materialized.as("time-windowed-aggregated-stream-store").withValueSerde(Serdes.Long()).withRetention(WINDOW_SIZE)).toStream().map((windowed, l2) -> {
            return KeyValue.pair(l2, l2);
        }).to(STREAM_OUTPUT_TWO, Produced.with(Serdes.Long(), Serdes.Long()));
        return streamsBuilder;
    }

    private void cleanUpStateRunVerifyAndClose(StreamsBuilder streamsBuilder, Properties properties, MetricsVerifier metricsVerifier) throws Exception {
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.cleanUp();
        produceRecords();
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, 60000L);
        metricsVerifier.verify(kafkaStreams, "rocksdb-state-id");
        metricsVerifier.verify(kafkaStreams, "rocksdb-window-state-id");
        kafkaStreams.close();
    }

    private void produceRecords() {
        MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT_ONE, Utils.mkSet(new KeyValue[]{new KeyValue(1, "A"), new KeyValue(1, "B"), new KeyValue(1, "C")}), producerConfig, Long.valueOf(mockTime.milliseconds()));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT_TWO, Collections.singleton(new KeyValue(1, "A")), producerConfig, Long.valueOf(mockTime.milliseconds()));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT_TWO, Collections.singleton(new KeyValue(1, "B")), producerConfig, Long.valueOf(mockTime.milliseconds()));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(STREAM_INPUT_TWO, Collections.singleton(new KeyValue(1, "C")), producerConfig, Long.valueOf(mockTime.milliseconds()));
    }

    private void verifyThatRocksDBMetricsAreExposed(KafkaStreams kafkaStreams, String str) {
        List<Metric> rocksDBMetrics = getRocksDBMetrics(kafkaStreams, str);
        checkMetricByName(rocksDBMetrics, BYTES_WRITTEN_RATE, 1);
        checkMetricByName(rocksDBMetrics, BYTES_WRITTEN_TOTAL, 1);
        checkMetricByName(rocksDBMetrics, BYTES_READ_RATE, 1);
        checkMetricByName(rocksDBMetrics, BYTES_READ_TOTAL, 1);
        checkMetricByName(rocksDBMetrics, MEMTABLE_BYTES_FLUSHED_RATE, 1);
        checkMetricByName(rocksDBMetrics, MEMTABLE_BYTES_FLUSHED_TOTAL, 1);
        checkMetricByName(rocksDBMetrics, MEMTABLE_HIT_RATIO, 1);
        checkMetricByName(rocksDBMetrics, WRITE_STALL_DURATION_AVG, 1);
        checkMetricByName(rocksDBMetrics, WRITE_STALL_DURATION_TOTAL, 1);
        checkMetricByName(rocksDBMetrics, BLOCK_CACHE_DATA_HIT_RATIO, 1);
        checkMetricByName(rocksDBMetrics, BLOCK_CACHE_INDEX_HIT_RATIO, 1);
        checkMetricByName(rocksDBMetrics, BLOCK_CACHE_FILTER_HIT_RATIO, 1);
        checkMetricByName(rocksDBMetrics, BYTES_READ_DURING_COMPACTION_RATE, 1);
        checkMetricByName(rocksDBMetrics, BYTES_WRITTEN_DURING_COMPACTION_RATE, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_OPEN_FILES, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_FILE_ERRORS, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_DELETES_ACTIVE_MEMTABLE, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_IMMUTABLE_MEMTABLES, 1);
        checkMetricByName(rocksDBMetrics, CURRENT_SIZE_OF_ACTIVE_MEMTABLE, 1);
        checkMetricByName(rocksDBMetrics, CURRENT_SIZE_OF_ALL_MEMTABLES, 1);
        checkMetricByName(rocksDBMetrics, SIZE_OF_ALL_MEMTABLES, 1);
        checkMetricByName(rocksDBMetrics, MEMTABLE_FLUSH_PENDING, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_RUNNING_FLUSHES, 1);
        checkMetricByName(rocksDBMetrics, COMPACTION_PENDING, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_RUNNING_COMPACTIONS, 1);
        checkMetricByName(rocksDBMetrics, ESTIMATED_BYTES_OF_PENDING_COMPACTION, 1);
        checkMetricByName(rocksDBMetrics, TOTAL_SST_FILES_SIZE, 1);
        checkMetricByName(rocksDBMetrics, LIVE_SST_FILES_SIZE, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_LIVE_VERSIONS, 1);
        checkMetricByName(rocksDBMetrics, CAPACITY_OF_BLOCK_CACHE, 1);
        checkMetricByName(rocksDBMetrics, USAGE_OF_BLOCK_CACHE, 1);
        checkMetricByName(rocksDBMetrics, PINNED_USAGE_OF_BLOCK_CACHE, 1);
        checkMetricByName(rocksDBMetrics, ESTIMATED_NUMBER_OF_KEYS, 1);
        checkMetricByName(rocksDBMetrics, ESTIMATED_MEMORY_OF_TABLE_READERS, 1);
        checkMetricByName(rocksDBMetrics, NUMBER_OF_BACKGROUND_ERRORS, 1);
    }

    private void checkMetricByName(List<Metric> list, String str, int i) {
        List<Metric> list2 = (List) list.stream().filter(metric -> {
            return metric.metricName().name().equals(str);
        }).collect(Collectors.toList());
        MatcherAssert.assertThat("Size of metrics of type:'" + str + "' must be equal to " + i + " but it's equal to " + list2.size(), Integer.valueOf(list2.size()), Matchers.is(Integer.valueOf(i)));
        for (Metric metric2 : list2) {
            MatcherAssert.assertThat("Metric:'" + metric2.metricName() + "' must be not null", metric2.metricValue(), Matchers.is(Matchers.notNullValue()));
        }
    }

    private List<Metric> getRocksDBMetrics(KafkaStreams kafkaStreams, String str) {
        return (List) new ArrayList(kafkaStreams.metrics().values()).stream().filter(metric -> {
            return metric.metricName().group().equals(METRICS_GROUP) && metric.metricName().tags().containsKey(str);
        }).collect(Collectors.toList());
    }
}
