package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Unit tests for {@link MeteredTimestampedKeyValueStore}.
 *
 * <p>Each test wraps a mocked byte-level {@link KeyValueStore} in a metered store configured with
 * String serdes, drives one operation through the metered store, and then checks that the call
 * reached the inner store with the expected bytes and that the matching rate metric was recorded.
 */
@RunWith(EasyMockRunner.class)
public class MeteredTimestampedKeyValueStoreTest {

    private static final String SCOPE = "scope";
    private static final String STORE_NAME = "metered";
    private static final String METRICS_GROUP = "stream-scope-state-metrics";
    private static final String MBEAN_NAME_FORMAT =
        "kafka.streams:type=stream-%s-state-metrics,thread-id=%s,task-id=%s,%s-state-id=%s";

    @Mock(type = MockType.NICE)
    private KeyValueStore<Bytes, byte[]> inner;

    @Mock(type = MockType.NICE)
    private ProcessorContext context;

    private MeteredTimestampedKeyValueStore<String, String> metered;
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    private final Map<String, String> tags = Utils.mkMap(
        Utils.mkEntry("thread-id", threadId),
        Utils.mkEntry("task-id", taskId.toString()),
        Utils.mkEntry("scope-state-id", STORE_NAME)
    );
    private final String key = "key";
    // Explicit charset so the fixture does not depend on the platform default encoding.
    private final Bytes keyBytes = Bytes.wrap(key.getBytes(StandardCharsets.UTF_8));
    private final String value = "value";
    private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(value, 97L);
    // Expected serialized form: the timestamp as an 8-byte long followed by the value bytes.
    // Timestamp 97 is 0x61, i.e. ASCII 'a', hence seven NUL bytes and then "avalue".
    private final byte[] valueAndTimestampBytes =
        "\0\0\0\0\0\0\0avalue".getBytes(StandardCharsets.UTF_8);
    private final KeyValue<Bytes, byte[]> byteKeyValueTimestampPair =
        KeyValue.pair(keyBytes, valueAndTimestampBytes);
    private final Metrics metrics = new Metrics();

    /** Inner store type that is both a byte-level key-value store and a cached state store. */
    private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
    }

    /**
     * Builds the store under test and records the expectations shared by all tests.
     * The mocks are not replayed here so individual tests can add their own expectations first.
     */
    @Before
    public void before() {
        metered = new MeteredTimestampedKeyValueStore<>(
            inner,
            SCOPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(Serdes.String())
        );
        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        EasyMock.expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
        EasyMock.expect(context.taskId()).andReturn(taskId);
        EasyMock.expect(inner.name()).andReturn(STORE_NAME).anyTimes();
    }

    /** Switches the mocks to replay mode and initializes the store under test. */
    private void init() {
        EasyMock.replay(inner, context);
        metered.init(context, metered);
    }

    /** Looks up a metric of this store's metrics group by its short name. */
    private KafkaMetric metric(final String name) {
        return metrics.metric(new MetricName(name, METRICS_GROUP, "", tags));
    }

    /** Looks up a metric by its fully specified name. */
    private KafkaMetric metric(final MetricName metricName) {
        return metrics.metric(metricName);
    }

    /** Asserts that the named rate metric has a value greater than zero. */
    private void assertRateRecorded(final String metricName) {
        Assert.assertTrue((Double) metric(metricName).metricValue() > 0);
    }

    /** Formats the JMX MBean name expected for the given state-id tag value. */
    private String mbeanName(final String stateId) {
        return String.format(MBEAN_NAME_FORMAT, SCOPE, threadId, taskId.toString(), SCOPE, stateId);
    }

    @Test
    public void testMetrics() {
        init();
        final JmxReporter reporter = new JmxReporter("kafka.streams");
        metrics.addReporter(reporter);
        Assert.assertTrue(reporter.containsMbean(mbeanName(STORE_NAME)));
        Assert.assertTrue(reporter.containsMbean(mbeanName("all")));
    }

    @Test
    public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
        inner.put(EasyMock.eq(keyBytes), EasyMock.aryEq(valueAndTimestampBytes));
        EasyMock.expectLastCall();
        init();

        metered.put(key, valueAndTimestamp);

        assertRateRecorded("put-rate");
        EasyMock.verify(inner);
    }

    @Test
    public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
        EasyMock.expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes);
        init();

        MatcherAssert.assertThat(metered.get(key), CoreMatchers.equalTo(valueAndTimestamp));

        assertRateRecorded("get-rate");
        EasyMock.verify(inner);
    }

    @Test
    public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
        EasyMock.expect(inner.putIfAbsent(EasyMock.eq(keyBytes), EasyMock.aryEq(valueAndTimestampBytes)))
            .andReturn(null);
        init();

        metered.putIfAbsent(key, valueAndTimestamp);

        assertRateRecorded("put-if-absent-rate");
        EasyMock.verify(inner);
    }

    // anyObject(List.class) yields a raw List; the matcher accepts any list, so the conversion is harmless.
    @SuppressWarnings("unchecked")
    @Test
    public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
        inner.putAll(EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        init();

        metered.putAll(Collections.singletonList(KeyValue.pair(key, valueAndTimestamp)));

        assertRateRecorded("put-all-rate");
        EasyMock.verify(inner);
    }

    @Test
    public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
        EasyMock.expect(inner.delete(keyBytes)).andReturn(valueAndTimestampBytes);
        init();

        metered.delete(key);

        assertRateRecorded("delete-rate");
        EasyMock.verify(inner);
    }

    @Test
    public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
        EasyMock.expect(inner.range(keyBytes, keyBytes))
            .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampPair).iterator()));
        init();

        final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.range(key, key);
        MatcherAssert.assertThat(iterator.next().value, CoreMatchers.equalTo(valueAndTimestamp));
        Assert.assertFalse(iterator.hasNext());
        iterator.close();

        assertRateRecorded("range-rate");
        EasyMock.verify(inner);
    }

    @Test
    public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
        EasyMock.expect(inner.all())
            .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(byteKeyValueTimestampPair).iterator()));
        init();

        final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.all();
        MatcherAssert.assertThat(iterator.next().value, CoreMatchers.equalTo(valueAndTimestamp));
        Assert.assertFalse(iterator.hasNext());
        iterator.close();

        final KafkaMetric allRate = metric(new MetricName("all-rate", METRICS_GROUP, "", tags));
        Assert.assertTrue((Double) allRate.metricValue() > 0);
        EasyMock.verify(inner);
    }

    @Test
    public void shouldFlushInnerWhenFlushTimeRecords() {
        inner.flush();
        EasyMock.expectLastCall().once();
        init();

        metered.flush();

        assertRateRecorded("flush-rate");
        EasyMock.verify(inner);
    }

    // anyObject(CacheFlushListener.class) yields a raw listener; the matcher accepts any listener.
    @SuppressWarnings("unchecked")
    @Test
    public void shouldSetFlushListenerOnWrappedCachingStore() {
        final CachedKeyValueStore cachedInner = EasyMock.mock(CachedKeyValueStore.class);
        EasyMock.expect(cachedInner.setFlushListener(EasyMock.anyObject(CacheFlushListener.class), EasyMock.eq(false)))
            .andReturn(true);
        EasyMock.replay(cachedInner);

        metered = new MeteredTimestampedKeyValueStore<>(
            cachedInner,
            SCOPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(Serdes.String())
        );

        Assert.assertTrue(metered.setFlushListener(null, false));
        EasyMock.verify(cachedInner);
    }

    @Test
    public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
        Assert.assertFalse(metered.setFlushListener(null, false));
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
        EasyMock.expect(context.keySerde()).andStubReturn(Serdes.String());
        EasyMock.expect(context.valueSerde()).andStubReturn(Serdes.Long());
        // No serdes are passed to the constructor; the context is stubbed to supply them instead.
        final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>(
            inner,
            SCOPE,
            new MockTime(),
            null,
            null
        );
        EasyMock.replay(inner, context);
        store.init(context, inner);

        putLongFailingOnClassCast(store, "Serdes are not correctly set from processor context.");
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
        EasyMock.expect(context.keySerde()).andStubReturn(Serdes.String());
        EasyMock.expect(context.valueSerde()).andStubReturn(Serdes.Long());
        final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>(
            inner,
            SCOPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(Serdes.Long())
        );
        EasyMock.replay(inner, context);
        store.init(context, inner);

        putLongFailingOnClassCast(store, "Serdes are not correctly set from constructor parameters.");
    }

    /**
     * Puts a Long value into the given store and fails the test with {@code failureMessage} when
     * the put throws a {@link StreamsException} caused by a {@link ClassCastException}.
     * Any other {@link StreamsException} is rethrown unchanged.
     */
    private void putLongFailingOnClassCast(final MeteredTimestampedKeyValueStore<String, Long> store,
                                           final String failureMessage) {
        try {
            // NOTE(review): the timestamp looks arbitrary for this check; the constant is reused as-is.
            store.put(key, ValueAndTimestamp.make(42L, IntegrationTestUtils.DEFAULT_TIMEOUT));
        } catch (final StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail(failureMessage);
            }
            throw exception;
        }
    }
}
