package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.class */
public class CachingKeyValueStoreTest {
    private CachingKeyValueStore<String, String> store;
    private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
    private ThreadCache cache;
    private int maxCacheSizeBytes;
    private CacheFlushListenerStub<String> cacheFlushListener;
    private String topic;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest$CacheFlushListenerStub.class */
    public static class CacheFlushListenerStub<K> implements CacheFlushListener<K, String> {
        public final Map<K, Change<String>> forwarded = new HashMap();

        public void apply(K k, String str, String str2) {
            this.forwarded.put(k, new Change<>(str, str2));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void apply(Object obj, Object obj2, Object obj3) {
            apply((CacheFlushListenerStub<K>) obj, (String) obj2, (String) obj3);
        }
    }

    @Before
    public void setUp() throws Exception {
        this.underlyingStore = new InMemoryKeyValueStore<>("store");
        this.cacheFlushListener = new CacheFlushListenerStub<>();
        this.store = new CachingKeyValueStore<>(this.underlyingStore, Serdes.String(), Serdes.String());
        this.store.setFlushListener(this.cacheFlushListener);
        this.maxCacheSizeBytes = 150;
        this.cache = new ThreadCache(this.maxCacheSizeBytes);
        MockProcessorContext mockProcessorContext = new MockProcessorContext((KStreamTestDriver) null, (File) null, (Serde<?>) null, (Serde<?>) null, (RecordCollector) null, this.cache);
        this.topic = "topic";
        mockProcessorContext.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, this.topic));
        this.store.init(mockProcessorContext, (StateStore) null);
    }

    @Test
    public void shouldPutGetToFromCache() throws Exception {
        this.store.put("key", "value");
        this.store.put("key2", "value2");
        Assert.assertEquals("value", this.store.get("key"));
        Assert.assertEquals("value2", this.store.get("key2"));
        Assert.assertEquals(2L, this.cache.size());
        Assert.assertEquals(0L, this.underlyingStore.approximateNumEntries());
    }

    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
        int addItemsToCache = addItemsToCache();
        Assert.assertEquals(addItemsToCache, this.underlyingStore.approximateNumEntries());
        Assert.assertEquals(addItemsToCache, this.store.approximateNumEntries());
        Assert.assertNotNull(this.underlyingStore.get(Bytes.wrap("0".getBytes())));
    }

    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() throws Exception {
        Assert.assertEquals(addItemsToCache(), this.cacheFlushListener.forwarded.size());
    }

    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
        this.store.put("1", "a");
        this.store.flush();
        Assert.assertEquals("a", this.cacheFlushListener.forwarded.get("1").newValue);
        Assert.assertNull(this.cacheFlushListener.forwarded.get("1").oldValue);
    }

    @Test
    public void shouldForwardOldValuesWhenEnabled() throws Exception {
        this.store.put("1", "a");
        this.store.flush();
        this.store.put("1", "b");
        this.store.flush();
        Assert.assertEquals("b", this.cacheFlushListener.forwarded.get("1").newValue);
        Assert.assertEquals("a", this.cacheFlushListener.forwarded.get("1").oldValue);
    }

    @Test
    public void shouldIterateAllStoredItems() throws Exception {
        int addItemsToCache = addItemsToCache();
        KeyValueIterator all = this.store.all();
        ArrayList arrayList = new ArrayList();
        while (all.hasNext()) {
            arrayList.add(((KeyValue) all.next()).key);
        }
        Assert.assertEquals(addItemsToCache, arrayList.size());
    }

    @Test
    public void shouldIterateOverRange() throws Exception {
        int addItemsToCache = addItemsToCache();
        KeyValueIterator range = this.store.range(String.valueOf(0), String.valueOf(addItemsToCache));
        ArrayList arrayList = new ArrayList();
        while (range.hasNext()) {
            arrayList.add(((KeyValue) range.next()).key);
        }
        Assert.assertEquals(addItemsToCache, arrayList.size());
    }

    @Test
    public void shouldDeleteItemsFromCache() throws Exception {
        this.store.put("a", "a");
        this.store.delete("a");
        Assert.assertNull(this.store.get("a"));
        Assert.assertFalse(this.store.range("a", "b").hasNext());
        Assert.assertFalse(this.store.all().hasNext());
    }

    @Test
    public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() throws Exception {
        this.store.put("a", "a");
        this.store.flush();
        this.store.delete("a");
        Assert.assertNull(this.store.get("a"));
        Assert.assertFalse(this.store.range("a", "b").hasNext());
        Assert.assertFalse(this.store.all().hasNext());
    }

    private int addItemsToCache() throws IOException {
        int i = 0;
        int i2 = 0;
        while (i < this.maxCacheSizeBytes) {
            int i3 = i2;
            i2++;
            String valueOf = String.valueOf(i3);
            this.store.put(valueOf, valueOf);
            i += ThreadCacheTest.memoryCacheEntrySize(valueOf.getBytes(), valueOf.getBytes(), this.topic);
        }
        return i2;
    }
}
