package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CachingKeyValueStoreTest;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link CachingWindowStore}.
 *
 * <p>Each test runs against a fresh {@link CachingWindowStore} that wraps a
 * {@link RocksDBWindowStore} backed by a {@link RocksDBSegmentedBytesStore}, and that is
 * initialised with a {@link ThreadCache} limited to {@link #MAX_CACHE_SIZE_BYTES} bytes.
 *
 * <p>Every iterator obtained from a store is closed via try-with-resources so that no
 * underlying store resources are leaked between tests.
 */
public class CachingWindowStoreTest {
    /** Capacity of the thread cache; {@link #addItemsToCache()} writes until this is exceeded. */
    private static final int MAX_CACHE_SIZE_BYTES = 150;
    /** Timestamp of the record context installed in {@link #setUp()}. */
    private static final long DEFAULT_TIMESTAMP = 10L;
    /** Size of each window, i.e. the distance between a window's start and end. */
    private static final long WINDOW_SIZE = 10000L;
    // NOTE(review): the retention period reuses the integration-test timeout constant;
    // a dedicated literal would be clearer, but the value is kept unchanged here.
    private static final long RETENTION_PERIOD = IntegrationTestUtils.DEFAULT_TIMEOUT;
    /** Number of segments of the underlying segmented store. */
    private static final int NUM_SEGMENTS = 3;

    private MockProcessorContext context;
    private RocksDBSegmentedBytesStore underlying;
    private CachingWindowStore<String, String> cachingStore;
    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener;
    private ThreadCache cache;
    private String topic;
    private WindowKeySchema keySchema;

    /** Builds the store stack, the cache and the processor context, then initialises the store. */
    @Before
    public void setUp() {
        keySchema = new WindowKeySchema();
        underlying = new RocksDBSegmentedBytesStore("test", RETENTION_PERIOD, NUM_SEGMENTS, keySchema);
        final RocksDBWindowStore<Bytes, byte[]> windowStore =
            new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
        cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
        cachingStore = new CachingWindowStore<>(
            windowStore,
            Serdes.String(),
            Serdes.String(),
            WINDOW_SIZE,
            Segments.segmentInterval(RETENTION_PERIOD, NUM_SEGMENTS));
        cachingStore.setFlushListener(cacheListener, false);
        // Use the shared constant so the cache limit and addItemsToCache() cannot drift apart.
        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
        topic = "topic";
        context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0L, 0, topic));
        cachingStore.init(context, cachingStore);
    }

    /** Closes the context and the store after every test. */
    @After
    public void closeStore() {
        context.close();
        cachingStore.close();
    }

    /** Single-key fetches return the values that were just put, and both entries are cached. */
    @Test
    public void shouldPutFetchFromCache() {
        cachingStore.put(bytesKey("a"), bytesValue("a"));
        cachingStore.put(bytesKey("b"), bytesValue("b"));

        try (WindowStoreIterator<byte[]> a = cachingStore.fetch(bytesKey("a"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
             WindowStoreIterator<byte[]> b = cachingStore.fetch(bytesKey("b"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP)) {
            verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
            verifyKeyValue(b.next(), DEFAULT_TIMESTAMP, "b");
            Assert.assertFalse(a.hasNext());
            Assert.assertFalse(b.hasNext());
            Assert.assertEquals(2L, cache.size());
        }
    }

    /** A key-range fetch returns both cached entries in key order. */
    @Test
    public void shouldPutFetchRangeFromCache() {
        cachingStore.put(bytesKey("a"), bytesValue("a"));
        cachingStore.put(bytesKey("b"), bytesValue("b"));

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(bytesKey("a"), bytesKey("b"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP)) {
            StreamsTestUtils.verifyWindowedKeyValue(iterator.next(), windowedKey("a", DEFAULT_TIMESTAMP), "a");
            StreamsTestUtils.verifyWindowedKeyValue(iterator.next(), windowedKey("b", DEFAULT_TIMESTAMP), "b");
            Assert.assertFalse(iterator.hasNext());
            Assert.assertEquals(2L, cache.size());
        }
    }

    /** {@code all()} returns every entry that was put, in key order. */
    @Test
    public void shouldGetAllFromCache() {
        final String[] keys = {"a", "b", "c", "d", "e", "f", "g", "h"};
        for (final String key : keys) {
            cachingStore.put(bytesKey(key), bytesValue(key));
        }

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all()) {
            for (final String key : keys) {
                StreamsTestUtils.verifyWindowedKeyValue(iterator.next(), windowedKey(key, DEFAULT_TIMESTAMP), key);
            }
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /** {@code fetchAll(from, to)} returns exactly the entries whose timestamps lie in the range. */
    @Test
    public void shouldFetchAllWithinTimestampRange() {
        final String[] keys = {"a", "b", "c", "d", "e", "f", "g", "h"};
        // Entry i is written while the context time is i, so index and timestamp coincide.
        for (int i = 0; i < keys.length; i++) {
            context.setTime(i);
            cachingStore.put(bytesKey(keys[i]), bytesValue(keys[i]));
        }

        verifyFetchAll(keys, 0, 7);
        verifyFetchAll(keys, 2, 4);
        verifyFetchAll(keys, 5, 7);
    }

    /**
     * After overfilling the cache, the first entry written ("0") is present in the
     * underlying store and the cache holds one entry fewer than was written.
     */
    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws IOException {
        final int added = addItemsToCache();

        try (KeyValueIterator<Bytes, byte[]> iterator =
                 underlying.fetch(bytesKey("0"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP)) {
            final KeyValue<Bytes, byte[]> first = iterator.next();
            Assert.assertEquals(DEFAULT_TIMESTAMP, keySchema.segmentTimestamp(first.key));
            Assert.assertArrayEquals(bytesValue("0"), first.value);
            Assert.assertFalse(iterator.hasNext());
            Assert.assertEquals(added - 1, cache.size());
        }
    }

    /** A flush forwards the new value to the listener with a {@code null} old value. */
    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() {
        final Windowed<String> windowedKey = new Windowed<>("1", windowOf(DEFAULT_TIMESTAMP));
        cachingStore.put(bytesKey("1"), bytesValue("a"));
        cachingStore.flush();

        Assert.assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
        Assert.assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
    }

    /** With old-value forwarding enabled, the second flush reports the previous value as old. */
    @Test
    public void shouldForwardOldValuesWhenEnabled() {
        cachingStore.setFlushListener(cacheListener, true);
        final Windowed<String> windowedKey = new Windowed<>("1", windowOf(DEFAULT_TIMESTAMP));
        cachingStore.put(bytesKey("1"), bytesValue("a"));
        cachingStore.flush();
        cachingStore.put(bytesKey("1"), bytesValue("b"));
        cachingStore.flush();

        Assert.assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
        Assert.assertEquals("a", cacheListener.forwarded.get(windowedKey).oldValue);
    }

    /**
     * With old-value forwarding left disabled (as configured in {@link #setUp()}), the second
     * flush reports a {@code null} old value. Despite its name, this test asserts that old
     * values are <em>not</em> forwarded.
     */
    @Test
    public void shouldForwardOldValuesWhenDisabled() {
        final Windowed<String> windowedKey = new Windowed<>("1", windowOf(DEFAULT_TIMESTAMP));
        cachingStore.put(bytesKey("1"), bytesValue("a"));
        cachingStore.flush();
        cachingStore.put(bytesKey("1"), bytesValue("b"));
        cachingStore.flush();

        Assert.assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
        Assert.assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
    }

    /** After overfilling the cache, the listener has received as many entries as were written. */
    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
        final int added = addItemsToCache();
        Assert.assertEquals(added, cacheListener.forwarded.size());
    }

    /** A cached value shadows an already flushed value with the same key and timestamp. */
    @Test
    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() {
        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.flush();
        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);

        try (WindowStoreIterator<byte[]> iterator =
                 cachingStore.fetch(bytesKey("1"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP)) {
            verifyKeyValue(iterator.next(), DEFAULT_TIMESTAMP, "b");
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /** A fetch spanning two window start times returns both entries in timestamp order. */
    @Test
    public void shouldIterateAcrossWindows() {
        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);

        try (WindowStoreIterator<byte[]> iterator =
                 cachingStore.fetch(bytesKey("1"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) {
            verifyKeyValue(iterator.next(), DEFAULT_TIMESTAMP, "a");
            verifyKeyValue(iterator.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /** A single-key fetch merges an entry written directly to the underlying store with a cached one. */
    @Test
    public void shouldIterateCacheAndStore() {
        final Bytes key = bytesKey("1");
        // Write straight to the underlying store, bypassing the cache.
        underlying.put(
            WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")),
            bytesValue("a"));
        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);

        try (WindowStoreIterator<byte[]> iterator =
                 cachingStore.fetch(bytesKey("1"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) {
            verifyKeyValue(iterator.next(), DEFAULT_TIMESTAMP, "a");
            verifyKeyValue(iterator.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /** A key-range fetch merges an entry written directly to the underlying store with a cached one. */
    @Test
    public void shouldIterateCacheAndStoreKeyRange() {
        final Bytes key = bytesKey("1");
        // Write straight to the underlying store, bypassing the cache.
        underlying.put(
            WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")),
            bytesValue("a"));
        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(key, bytesKey("2"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) {
            StreamsTestUtils.verifyWindowedKeyValue(
                iterator.next(), new Windowed<>(key, windowOf(DEFAULT_TIMESTAMP)), "a");
            StreamsTestUtils.verifyWindowedKeyValue(
                iterator.next(), new Windowed<>(key, windowOf(DEFAULT_TIMESTAMP + WINDOW_SIZE)), "b");
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /** Closing the store leaves the thread cache empty. */
    @Test
    public void shouldClearNamespaceCacheOnClose() {
        cachingStore.put(bytesKey("a"), bytesValue("a"));
        Assert.assertEquals(1L, cache.size());
        cachingStore.close();
        Assert.assertEquals(0L, cache.size());
    }

    /** A single-key fetch on a closed store is rejected. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
        cachingStore.close();
        cachingStore.fetch(bytesKey("a"), 0L, DEFAULT_TIMESTAMP);
    }

    /** A key-range fetch on a closed store is rejected. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() {
        cachingStore.close();
        cachingStore.fetch(bytesKey("a"), bytesKey("b"), 0L, DEFAULT_TIMESTAMP);
    }

    /** A put on a closed store is rejected. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToWriteToClosedCachingStore() {
        cachingStore.close();
        cachingStore.put(bytesKey("a"), bytesValue("a"));
    }

    /** Fetching key "a" must not return entries of key "aa", which shares its prefix. */
    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        putPrefixSharingEntries();

        final List<KeyValue<Long, byte[]>> expected = Utils.mkList(
            KeyValue.pair(0L, bytesValue("0001")),
            KeyValue.pair(1L, bytesValue("0003")),
            KeyValue.pair(60000L, bytesValue("0005")));
        try (WindowStoreIterator<byte[]> iterator = cachingStore.fetch(bytesKey("a"), 0L, Long.MAX_VALUE)) {
            StreamsTestUtils.verifyKeyValueList(expected, StreamsTestUtils.toList(iterator));
        }
    }

    /** Key-range fetches return exactly the entries of the keys inside the requested range. */
    @Test
    public void shouldFetchAndIterateOverKeyRange() {
        putPrefixSharingEntries();

        final List<KeyValue<Windowed<Bytes>, byte[]>> onlyA = Utils.mkList(
            windowedPair("a", "0001", 0L),
            windowedPair("a", "0003", 1L),
            windowedPair("a", "0005", 60000L));
        verifyKeyRange("a", "a", onlyA);

        final List<KeyValue<Windowed<Bytes>, byte[]>> onlyAa = Utils.mkList(
            windowedPair("aa", "0002", 0L),
            windowedPair("aa", "0004", 1L));
        verifyKeyRange("aa", "aa", onlyAa);

        final List<KeyValue<Windowed<Bytes>, byte[]>> both = Utils.mkList(
            windowedPair("a", "0001", 0L),
            windowedPair("a", "0003", 1L),
            windowedPair("aa", "0002", 0L),
            windowedPair("aa", "0004", 1L),
            windowedPair("a", "0005", 60000L));
        verifyKeyRange("a", "aa", both);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        cachingStore.put((Bytes) null, bytesValue("anyValue"));
    }

    @Test
    public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
        cachingStore.put(bytesKey("a"), (byte[]) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFetchNullKey() {
        cachingStore.fetch((Bytes) null, 1L, 2L);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
        cachingStore.fetch((Bytes) null, bytesKey("anyTo"), 1L, 2L);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
        cachingStore.fetch(bytesKey("anyFrom"), (Bytes) null, 1L, 2L);
    }

    /** Asserts that a fetched entry has the expected timestamp key and value. */
    private void verifyKeyValue(final KeyValue<Long, byte[]> actual, final long expectedKey, final String expectedValue) {
        MatcherAssert.assertThat(actual.key, CoreMatchers.equalTo(Long.valueOf(expectedKey)));
        MatcherAssert.assertThat(actual.value, CoreMatchers.equalTo(bytesValue(expectedValue)));
    }

    /**
     * Asserts that {@code fetchAll(firstIndex, lastIndex)} yields exactly
     * {@code keys[firstIndex..lastIndex]}, where entry {@code i} has window start {@code i}.
     */
    private void verifyFetchAll(final String[] keys, final int firstIndex, final int lastIndex) {
        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetchAll(firstIndex, lastIndex)) {
            for (int i = firstIndex; i <= lastIndex; i++) {
                StreamsTestUtils.verifyWindowedKeyValue(iterator.next(), windowedKey(keys[i], i), keys[i]);
            }
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /** Asserts that a key-range fetch over all timestamps from 0 yields exactly {@code expected}. */
    private void verifyKeyRange(final String from, final String to, final List<KeyValue<Windowed<Bytes>, byte[]>> expected) {
        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(bytesKey(from), bytesKey(to), 0L, Long.MAX_VALUE)) {
            StreamsTestUtils.verifyKeyValueList(expected, StreamsTestUtils.toList(iterator));
        }
    }

    /** Writes five entries for the keys "a" and "aa" at timestamps 0, 1 and 60000. */
    private void putPrefixSharingEntries() {
        cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0L);
        cachingStore.put(bytesKey("a"), bytesValue("0003"), 1L);
        cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1L);
        cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000L);
    }

    /**
     * Writes entries "0", "1", "2", ... until their accumulated estimated size reaches
     * {@link #MAX_CACHE_SIZE_BYTES}.
     *
     * @return the number of entries written
     */
    private int addItemsToCache() throws IOException {
        int cachedSize = 0;
        int count = 0;
        while (cachedSize < MAX_CACHE_SIZE_BYTES) {
            final String kv = String.valueOf(count++);
            cachingStore.put(bytesKey(kv), bytesValue(kv));
            // The extra 8 + 4 bytes presumably account for the timestamp and sequence number
            // that are part of the binary window key -- TODO confirm.
            cachedSize += ThreadCacheTest.memoryCacheEntrySize(bytesValue(kv), bytesValue(kv), topic) + 8 + 4;
        }
        return count;
    }

    /** Returns the window {@code [start, start + WINDOW_SIZE)} as constructed by {@link TimeWindow}. */
    private static TimeWindow windowOf(final long start) {
        return new TimeWindow(start, start + WINDOW_SIZE);
    }

    /** Returns the windowed form of {@code key} for the window starting at {@code windowStart}. */
    private static Windowed<Bytes> windowedKey(final String key, final long windowStart) {
        return new Windowed<>(bytesKey(key), windowOf(windowStart));
    }

    /** Builds the expected windowed key/value pair for a key-range fetch. */
    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String key, final String value, final long timestamp) {
        return KeyValue.pair(windowedKey(key, timestamp), bytesValue(value));
    }

    /** Encodes {@code value} as UTF-8 bytes. */
    private static byte[] bytesValue(final String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Encodes {@code key} as UTF-8 and wraps it in {@link Bytes}. */
    private static Bytes bytesKey(final String key) {
        return Bytes.wrap(bytesValue(key));
    }
}
