package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.class */
public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
    /** Topic name used for the record context and for the cache-entry size estimate in addItemsToCache. */
    private static final String TOPIC = "topic";
    /**
     * Namespace the close tests expect to be flushed/closed on the mocked cache.
     * NOTE(review): presumably "<taskId>-<storeName>" for the "store-name" mock - confirm against CachingKeyValueStore.
     */
    private static final String CACHE_NAMESPACE = "0_0-store-name";
    /** Cache capacity in bytes for these tests; the ThreadCache built in setUp is given the same 150-byte limit. */
    private final int maxCacheSizeBytes = 150;
    /** Processor context the store under test is initialised with. */
    private InternalMockProcessorContext context;
    /** Store under test; wraps {@link #underlyingStore}. */
    private CachingKeyValueStore store;
    /** Store wrapped by {@link #store}: in-memory by default, an EasyMock mock in the close tests. */
    private KeyValueStore<Bytes, byte[]> underlyingStore;
    /** Cache handed to the processor context: a real ThreadCache by default, a mock in the close tests. */
    private ThreadCache cache;
    /** Listener registered in setUp that records every forwarded change. */
    private CacheFlushListenerStub<String, String> cacheFlushListener;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest$CacheFlushListenerStub.class */
    public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> {
        final Deserializer<K> keyDeserializer;
        final Deserializer<V> valueDesializer;
        final Map<K, Change<V>> forwarded = new HashMap();

        /* JADX INFO: Access modifiers changed from: package-private */
        public CacheFlushListenerStub(Deserializer<K> deserializer, Deserializer<V> deserializer2) {
            this.keyDeserializer = deserializer;
            this.valueDesializer = deserializer2;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void apply(byte[] bArr, byte[] bArr2, byte[] bArr3, long j) {
            this.forwarded.put(this.keyDeserializer.deserialize((String) null, bArr), new Change(this.valueDesializer.deserialize((String) null, bArr2), this.valueDesializer.deserialize((String) null, bArr3)));
        }
    }

    /**
     * Builds the default fixture: a caching store over an in-memory store, with a recording
     * flush listener (old values disabled) and a real {@link ThreadCache} limited to
     * {@link #maxCacheSizeBytes} bytes.
     */
    @Before
    public void setUp() {
        this.underlyingStore = new InMemoryKeyValueStore("store");
        this.cacheFlushListener = new CacheFlushListenerStub<>(new StringDeserializer(), new StringDeserializer());
        this.store = new CachingKeyValueStore(this.underlyingStore);
        this.store.setFlushListener(this.cacheFlushListener, false);
        // Size the cache from the shared field so addItemsToCache and the cache cannot drift apart.
        this.cache = new ThreadCache(
            new LogContext("testCache "),
            this.maxCacheSizeBytes,
            new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(null, null, null, null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, TOPIC, (Headers) null));
        this.store.init(this.context, (StateStore) null);
    }

    /**
     * Per-test teardown; only delegates to the {@code after()} inherited from AbstractKeyValueStoreTest.
     */
    @Override // org.apache.kafka.streams.state.internals.AbstractKeyValueStoreTest
    @After
    public void after() {
        super.after();
    }

    /**
     * Creates the store used by the inherited tests: a persistent key-value store named
     * "cache-store" with caching enabled, built with the context's serdes and already initialised.
     *
     * @param processorContext context supplying the serdes and used to initialise the store
     * @return the initialised caching store, viewed with the caller's key and value types
     */
    @Override
    @SuppressWarnings("unchecked")
    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext processorContext) {
        // The serdes handed out by the context do not carry K/V, so the builder's result
        // cannot be assigned to KeyValueStore<K, V> directly; narrow it explicitly instead.
        final StateStore built = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("cache-store"),
                processorContext.keySerde(),
                processorContext.valueSerde())
            .withCachingEnabled()
            .build();
        final KeyValueStore<K, V> cachingStore = (KeyValueStore<K, V>) built;
        cachingStore.init(processorContext, cachingStore);
        return cachingStore;
    }

    /** setFlushListener reports {@code true} for either sendOldValues setting, even with a null listener. */
    @Test
    public void shouldSetFlushListener() {
        final CacheFlushListener<byte[], byte[]> noListener = null;
        Assert.assertTrue(this.store.setFlushListener(noListener, true));
        Assert.assertTrue(this.store.setFlushListener(noListener, false));
    }

    /**
     * A key that is put and then deleted before any flush must reach neither the underlying
     * store nor the flush listener.
     */
    @Test
    public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() {
        final int added = addItemsToCache();
        final Bytes key = bytesKey("key");

        // Baseline after filling the cache.
        Assert.assertEquals(added, this.underlyingStore.approximateNumEntries());
        Assert.assertEquals(added, this.cacheFlushListener.forwarded.size());

        // The new entry stays in the cache only.
        this.store.put(key, bytesValue("value"));
        Assert.assertEquals(added, this.underlyingStore.approximateNumEntries());
        Assert.assertEquals(added, this.cacheFlushListener.forwarded.size());

        // Deleting it before the flush leaves both counts untouched.
        this.store.put(key, (byte[]) null);
        this.store.flush();
        Assert.assertEquals(added, this.underlyingStore.approximateNumEntries());
        Assert.assertEquals(added, this.cacheFlushListener.forwarded.size());
    }

    /**
     * When flushing the cache fails, {@code close()} must surface the error and still close
     * both the cache namespace and the wrapped store.
     */
    @Test
    public void shouldCloseWrappedStoreAndCacheAfterErrorDuringCacheFlush() {
        setUpCloseTests();

        // Cache: flush fails, but the namespace must be closed regardless. Without recording
        // the close() expectation, verify() on the nice mock would not check it at all.
        EasyMock.reset(this.cache);
        this.cache.flush(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.replay(this.cache);

        // Wrapped store: close() is expected.
        EasyMock.reset(this.underlyingStore);
        this.underlyingStore.close();
        EasyMock.replay(this.underlyingStore);

        Assert.assertThrows(RuntimeException.class, this.store::close);
        EasyMock.verify(this.cache, this.underlyingStore);
    }

    /**
     * When closing the cache namespace fails, {@code close()} must surface the error and the
     * wrapped store must still be closed.
     */
    @Test
    public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
        setUpCloseTests();

        // Cache: flush is expected, closing the namespace throws.
        EasyMock.reset(this.cache);
        this.cache.flush(CACHE_NAMESPACE);
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(this.cache);

        // Wrapped store: close() is expected.
        EasyMock.reset(this.underlyingStore);
        this.underlyingStore.close();
        EasyMock.replay(this.underlyingStore);

        Assert.assertThrows(RuntimeException.class, this.store::close);
        EasyMock.verify(this.cache, this.underlyingStore);
    }

    /**
     * When closing the wrapped store fails, {@code close()} must surface the error and the
     * cache namespace must still have been flushed and closed.
     */
    @Test
    public void shouldCloseCacheAfterErrorDuringStateStoreClose() {
        setUpCloseTests();

        // Cache: flush and close of the namespace are both expected.
        EasyMock.reset(this.cache);
        this.cache.flush(CACHE_NAMESPACE);
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.replay(this.cache);

        // Wrapped store: close() throws.
        EasyMock.reset(this.underlyingStore);
        this.underlyingStore.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(this.underlyingStore);

        Assert.assertThrows(RuntimeException.class, this.store::close);
        EasyMock.verify(this.cache, this.underlyingStore);
    }

    /**
     * Replaces the default fixture with mocks for the close tests: a nice-mock wrapped store
     * (named "store-name", reporting itself open) and a nice-mock cache, then initialises a
     * fresh caching store against them.
     */
    private void setUpCloseTests() {
        // Wrapped store mock with the stubs needed during init.
        this.underlyingStore = EasyMock.createNiceMock(KeyValueStore.class);
        EasyMock.expect(this.underlyingStore.name()).andStubReturn("store-name");
        EasyMock.expect(this.underlyingStore.isOpen()).andStubReturn(true);
        EasyMock.replay(this.underlyingStore);

        this.store = new CachingKeyValueStore(this.underlyingStore);

        // Mocked cache, exposed to the store through the processor context.
        this.cache = EasyMock.niceMock(ThreadCache.class);
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, TOPIC, (Headers) null));
        this.store.init(this.context, this.store);
    }

    /** Entries that fit in the cache are readable through the store without reaching the underlying store. */
    @Test
    public void shouldPutGetToFromCache() {
        final Bytes firstKey = bytesKey("key");
        final Bytes secondKey = bytesKey("key2");

        this.store.put(firstKey, bytesValue("value"));
        this.store.put(secondKey, bytesValue("value2"));

        MatcherAssert.assertThat(this.store.get(firstKey), CoreMatchers.equalTo(bytesValue("value")));
        MatcherAssert.assertThat(this.store.get(secondKey), CoreMatchers.equalTo(bytesValue("value2")));

        // Both entries are cached and nothing has been written through.
        Assert.assertEquals(2L, this.cache.size());
        Assert.assertEquals(0L, this.underlyingStore.approximateNumEntries());
    }

    /**
     * Encodes a test value as UTF-8 bytes.
     *
     * <p>The charset is pinned so the encoding does not vary with the platform default.
     *
     * @param str text to encode
     * @return the UTF-8 bytes of {@code str}
     */
    private byte[] bytesValue(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Encodes a test key as UTF-8 and wraps it in {@link Bytes}.
     *
     * <p>The charset is pinned so the encoding does not vary with the platform default.
     *
     * @param str text to encode
     * @return the wrapped UTF-8 bytes of {@code str}
     */
    private Bytes bytesKey(String str) {
        return Bytes.wrap(str.getBytes(StandardCharsets.UTF_8));
    }

    /** After filling the cache, every added entry is counted by both stores and key "0" is in the underlying store. */
    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
        final int added = addItemsToCache();

        Assert.assertEquals(added, this.underlyingStore.approximateNumEntries());
        Assert.assertEquals(added, this.store.approximateNumEntries());

        final byte[] firstEntry = this.underlyingStore.get(bytesKey("0"));
        Assert.assertNotNull(firstEntry);
    }

    /** Every entry added while filling the cache is forwarded to the flush listener. */
    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() {
        final int added = addItemsToCache();
        final int forwardedCount = this.cacheFlushListener.forwarded.size();
        Assert.assertEquals(added, forwardedCount);
    }

    /** An explicit flush forwards the dirty entry with its new value and no old value. */
    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() {
        this.store.put(bytesKey("1"), bytesValue("a"));
        this.store.flush();

        final Change<String> change = this.cacheFlushListener.forwarded.get("1");
        Assert.assertEquals("a", change.newValue);
        Assert.assertNull(change.oldValue);
    }

    /**
     * With sendOldValues enabled, each flush forwards the last value written since the previous
     * flush together with the previously flushed value; a key created and deleted within one
     * flush interval is not forwarded at all.
     */
    @Test
    public void shouldForwardOldValuesWhenEnabled() {
        final Bytes key = bytesKey("1");
        final Map<String, Change<String>> forwarded = this.cacheFlushListener.forwarded;
        this.store.setFlushListener(this.cacheFlushListener, true);

        // First write: nothing to report as old value.
        this.store.put(key, bytesValue("a"));
        this.store.flush();
        Change<String> change = forwarded.get("1");
        Assert.assertEquals("a", change.newValue);
        Assert.assertNull(change.oldValue);

        // Two writes in one interval: only the last is forwarded, old value is the flushed one.
        this.store.put(key, bytesValue("b"));
        this.store.put(key, bytesValue("c"));
        this.store.flush();
        change = forwarded.get("1");
        Assert.assertEquals("c", change.newValue);
        Assert.assertEquals("a", change.oldValue);

        // Delete: null new value, old value is the last flushed one.
        this.store.put(key, (byte[]) null);
        this.store.flush();
        change = forwarded.get("1");
        Assert.assertNull(change.newValue);
        Assert.assertEquals("c", change.oldValue);
        forwarded.clear();

        // Created and deleted between flushes: nothing is forwarded.
        this.store.put(key, bytesValue("a"));
        this.store.put(key, bytesValue("b"));
        this.store.put(key, (byte[]) null);
        this.store.flush();
        Assert.assertNull(forwarded.get("1"));
        forwarded.clear();
    }

    /**
     * With sendOldValues disabled (the setUp default), forwarded changes never carry an old
     * value; a key created and deleted within one flush interval is not forwarded at all.
     */
    @Test
    public void shouldNotForwardOldValuesWhenDisabled() {
        final Bytes key = bytesKey("1");
        final Map<String, Change<String>> forwarded = this.cacheFlushListener.forwarded;

        // Initial write.
        this.store.put(key, bytesValue("a"));
        this.store.flush();
        Change<String> change = forwarded.get("1");
        Assert.assertEquals("a", change.newValue);
        Assert.assertNull(change.oldValue);

        // Overwrite: still no old value.
        this.store.put(key, bytesValue("b"));
        this.store.flush();
        change = forwarded.get("1");
        Assert.assertEquals("b", change.newValue);
        Assert.assertNull(change.oldValue);

        // Delete: both sides null.
        this.store.put(key, (byte[]) null);
        this.store.flush();
        change = forwarded.get("1");
        Assert.assertNull(change.newValue);
        Assert.assertNull(change.oldValue);
        forwarded.clear();

        // Created and deleted between flushes: nothing is forwarded.
        this.store.put(key, bytesValue("a"));
        this.store.put(key, bytesValue("b"));
        this.store.put(key, (byte[]) null);
        this.store.flush();
        Assert.assertNull(forwarded.get("1"));
        forwarded.clear();
    }

    /** {@code all()} returns exactly as many entries as were added. */
    @Test
    public void shouldIterateAllStoredItems() {
        final int added = addItemsToCache();
        final List<Bytes> keys = new ArrayList<>();
        // Close the iterator once drained so it does not stay open on the store.
        try (KeyValueIterator<Bytes, byte[]> all = this.store.all()) {
            while (all.hasNext()) {
                keys.add(all.next().key);
            }
        }
        Assert.assertEquals(added, keys.size());
    }

    /** A range query from key "0" to the key for the number of added items returns every added entry. */
    @Test
    public void shouldIterateOverRange() {
        final int added = addItemsToCache();
        final Bytes from = bytesKey(String.valueOf(0));
        final Bytes to = bytesKey(String.valueOf(added));
        final List<Bytes> keys = new ArrayList<>();
        // Close the iterator once drained so it does not stay open on the store.
        try (KeyValueIterator<Bytes, byte[]> range = this.store.range(from, to)) {
            while (range.hasNext()) {
                keys.add(range.next().key);
            }
        }
        Assert.assertEquals(added, keys.size());
    }

    /** An entry deleted while still only in the cache is invisible to get, range and all. */
    @Test
    public void shouldDeleteItemsFromCache() {
        this.store.put(bytesKey("a"), bytesValue("a"));
        this.store.delete(bytesKey("a"));

        Assert.assertNull(this.store.get(bytesKey("a")));
        // Iterators are closed after the check instead of being left open.
        try (KeyValueIterator<Bytes, byte[]> range = this.store.range(bytesKey("a"), bytesKey("b"))) {
            Assert.assertFalse(range.hasNext());
        }
        try (KeyValueIterator<Bytes, byte[]> all = this.store.all()) {
            Assert.assertFalse(all.hasNext());
        }
    }

    /** An entry that was flushed and then deleted is invisible to get, range and all. */
    @Test
    public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() {
        this.store.put(bytesKey("a"), bytesValue("a"));
        this.store.flush();
        this.store.delete(bytesKey("a"));

        Assert.assertNull(this.store.get(bytesKey("a")));
        // Iterators are closed after the check instead of being left open.
        try (KeyValueIterator<Bytes, byte[]> range = this.store.range(bytesKey("a"), bytesKey("b"))) {
            Assert.assertFalse(range.hasNext());
        }
        try (KeyValueIterator<Bytes, byte[]> all = this.store.all()) {
            Assert.assertFalse(all.hasNext());
        }
    }

    /** Closing the store empties the cache. */
    @Test
    public void shouldClearNamespaceCacheOnClose() {
        this.store.put(bytesKey("a"), bytesValue("a"));
        final long sizeBeforeClose = this.cache.size();
        Assert.assertEquals(1L, sizeBeforeClose);

        this.store.close();
        final long sizeAfterClose = this.cache.size();
        Assert.assertEquals(0L, sizeAfterClose);
    }

    /** {@code get} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToGetFromClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(InvalidStateStoreException.class, () -> this.store.get(bytesKey("a")));
    }

    /** {@code put} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToWriteToClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> this.store.put(bytesKey("a"), bytesValue("a")));
    }

    /** {@code range} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> this.store.range(bytesKey("a"), bytesKey("b")));
    }

    /** {@code all} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToDoAllQueryOnClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(InvalidStateStoreException.class, () -> this.store.all());
    }

    /** {@code approximateNumEntries} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToDoGetApproxSizeOnClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(InvalidStateStoreException.class, () -> this.store.approximateNumEntries());
    }

    /** {@code putAll} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToDoPutAllClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> this.store.putAll(Collections.singletonList(KeyValue.pair(bytesKey("a"), bytesValue("a")))));
    }

    /** {@code putIfAbsent} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToDoPutIfAbsentClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> this.store.putIfAbsent(bytesKey("b"), bytesValue("c")));
    }

    /** {@code put} with a null key is rejected with a NullPointerException. */
    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutWithNullKey() {
        final Bytes nullKey = null;
        this.store.put(nullKey, bytesValue("c"));
    }

    /** {@code putIfAbsent} with a null key is rejected with a NullPointerException. */
    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutIfAbsentWithNullKey() {
        final Bytes nullKey = null;
        this.store.putIfAbsent(nullKey, bytesValue("c"));
    }

    /** {@code putAll} with an entry whose key is null is rejected with a NullPointerException. */
    @Test
    public void shouldThrowNullPointerExceptionOnPutAllWithNullKey() {
        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
        entries.add(new KeyValue<>(null, bytesValue("a")));
        // assertThrows replaces the try/fail/empty-catch idiom and matches the close tests.
        Assert.assertThrows(
            "Should have thrown NullPointerException while putAll null key",
            NullPointerException.class,
            () -> this.store.putAll(entries));
    }

    /** {@code putIfAbsent} stores the first value and leaves it untouched on a second call. */
    @Test
    public void shouldPutIfAbsent() {
        final Bytes key = bytesKey("b");

        this.store.putIfAbsent(key, bytesValue("2"));
        MatcherAssert.assertThat(this.store.get(key), CoreMatchers.equalTo(bytesValue("2")));

        // The key already exists, so the second value must be ignored.
        this.store.putIfAbsent(key, bytesValue("3"));
        MatcherAssert.assertThat(this.store.get(key), CoreMatchers.equalTo(bytesValue("2")));
    }

    /** {@code putAll} makes every supplied entry readable through the store. */
    @Override
    @Test
    public void shouldPutAll() {
        final ArrayList<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
        entries.add(KeyValue.pair(bytesKey("a"), bytesValue("1")));
        entries.add(KeyValue.pair(bytesKey("b"), bytesValue("2")));

        this.store.putAll(entries);

        MatcherAssert.assertThat(this.store.get(bytesKey("a")), CoreMatchers.equalTo(bytesValue("1")));
        MatcherAssert.assertThat(this.store.get(bytesKey("b")), CoreMatchers.equalTo(bytesValue("2")));
    }

    /** {@code wrapped()} returns the store handed to the constructor. */
    @Test
    public void shouldReturnUnderlying() {
        final Object expectedInner = this.underlyingStore;
        final Object actualInner = this.store.wrapped();
        Assert.assertEquals(expectedInner, actualInner);
    }

    /** {@code delete} on a closed store throws InvalidStateStoreException. */
    @Test
    public void shouldThrowIfTryingToDeleteFromClosedCachingStore() {
        this.store.close();
        // Only the call under test may throw; an exception from close() must fail the test.
        Assert.assertThrows(InvalidStateStoreException.class, () -> this.store.delete(bytesKey("key")));
    }

    /**
     * Puts entries "0", "1", "2", ... (key and value are the same text) into the store until
     * their summed size estimate reaches {@link #maxCacheSizeBytes}.
     *
     * @return the number of entries that were put
     */
    private int addItemsToCache() {
        int estimatedBytes = 0;
        int nextKey = 0;
        // Bound the loop by the shared field rather than a repeated literal.
        while (estimatedBytes < this.maxCacheSizeBytes) {
            final String kv = String.valueOf(nextKey++);
            this.store.put(bytesKey(kv), bytesValue(kv));
            final byte[] encoded = bytesValue(kv);
            estimatedBytes += ThreadCacheTest.memoryCacheEntrySize(encoded, encoded, TOPIC);
        }
        return nextKey;
    }
}
