package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CachingKeyValueStoreTest;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingWindowStoreTest.class */
public class CachingWindowStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 150;
    private RocksDBWindowStore<Bytes, byte[]> underlying;
    private CachingWindowStore<String, String> cachingStore;
    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
    private ThreadCache cache;
    private String topic;
    private static final Long WINDOW_SIZE = 10000L;
    private static final Long DEFAULT_TIMESTAMP = 10L;

    @Before
    public void setUp() throws Exception {
        this.underlying = new RocksDBWindowStore<>("test", IntegrationTestUtils.DEFAULT_TIMEOUT, 3, false, Serdes.Bytes(), Serdes.ByteArray());
        this.cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
        this.cachingStore = new CachingWindowStore<>(this.underlying, Serdes.String(), Serdes.String(), WINDOW_SIZE.longValue());
        this.cachingStore.setFlushListener(this.cacheListener);
        this.cache = new ThreadCache(150L);
        this.topic = "topic";
        MockProcessorContext mockProcessorContext = new MockProcessorContext((KStreamTestDriver) null, TestUtils.tempDirectory(), (Serde<?>) null, (Serde<?>) null, (RecordCollector) null, this.cache);
        mockProcessorContext.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP.longValue(), 0L, 0, this.topic));
        this.cachingStore.init(mockProcessorContext, this.cachingStore);
    }

    @Test
    public void shouldPutFetchFromCache() throws Exception {
        this.cachingStore.put("a", "a");
        this.cachingStore.put("b", "b");
        WindowStoreIterator fetch = this.cachingStore.fetch("a", 10L, 10L);
        WindowStoreIterator fetch2 = this.cachingStore.fetch("b", 10L, 10L);
        Assert.assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());
        Assert.assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "b"), fetch2.next());
        Assert.assertFalse(fetch.hasNext());
        Assert.assertFalse(fetch2.hasNext());
        Assert.assertEquals(2L, this.cache.size());
    }

    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
        int addItemsToCache = addItemsToCache();
        WindowStoreIterator fetch = this.underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue());
        KeyValue keyValue = (KeyValue) fetch.next();
        Assert.assertEquals(DEFAULT_TIMESTAMP, keyValue.key);
        Assert.assertArrayEquals("0".getBytes(), (byte[]) keyValue.value);
        Assert.assertFalse(fetch.hasNext());
        Assert.assertEquals(addItemsToCache - 1, this.cache.size());
    }

    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
        Windowed windowed = new Windowed("1", new TimeWindow(DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue()));
        this.cachingStore.put("1", "a");
        this.cachingStore.flush();
        Assert.assertEquals("a", this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).oldValue);
    }

    @Test
    public void shouldForwardOldValuesWhenEnabled() throws Exception {
        Windowed windowed = new Windowed("1", new TimeWindow(DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue()));
        this.cachingStore.put("1", "a");
        this.cachingStore.flush();
        this.cachingStore.put("1", "b");
        this.cachingStore.flush();
        Assert.assertEquals("b", this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertEquals("a", this.cacheListener.forwarded.get(windowed).oldValue);
    }

    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() throws Exception {
        Assert.assertEquals(addItemsToCache(), this.cacheListener.forwarded.size());
    }

    @Test
    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() throws Exception {
        this.cachingStore.put("1", "a", DEFAULT_TIMESTAMP.longValue());
        this.cachingStore.flush();
        this.cachingStore.put("1", "b", DEFAULT_TIMESTAMP.longValue());
        WindowStoreIterator fetch = this.cachingStore.fetch("1", DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue());
        Assert.assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "b"), fetch.next());
        Assert.assertFalse(fetch.hasNext());
    }

    @Test
    public void shouldIterateAcrossWindows() throws Exception {
        this.cachingStore.put("1", "a", DEFAULT_TIMESTAMP.longValue());
        this.cachingStore.put("1", "b", DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue());
        WindowStoreIterator fetch = this.cachingStore.fetch("1", DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue());
        Assert.assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());
        Assert.assertEquals(KeyValue.pair(Long.valueOf(DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue()), "b"), fetch.next());
        Assert.assertFalse(fetch.hasNext());
    }

    @Test
    public void shouldIterateCacheAndStore() throws Exception {
        this.underlying.put(Bytes.wrap("1".getBytes()), "a".getBytes());
        this.cachingStore.put("1", "b", DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue());
        WindowStoreIterator fetch = this.cachingStore.fetch("1", DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue());
        Assert.assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());
        Assert.assertEquals(KeyValue.pair(Long.valueOf(DEFAULT_TIMESTAMP.longValue() + WINDOW_SIZE.longValue()), "b"), fetch.next());
        Assert.assertFalse(fetch.hasNext());
    }

    private int addItemsToCache() throws IOException {
        int i = 0;
        int i2 = 0;
        while (i < MAX_CACHE_SIZE_BYTES) {
            int i3 = i2;
            i2++;
            String valueOf = String.valueOf(i3);
            this.cachingStore.put(valueOf, valueOf);
            i += ThreadCacheTest.memoryCacheEntrySize(valueOf.getBytes(), valueOf.getBytes(), this.topic) + 8 + 4;
        }
        return i2;
    }
}
