/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CachingKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.CachingWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCacheTest;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CachingWindowStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 150;
    private static final Long WINDOW_SIZE = 10000L;
    private RocksDBWindowStore<Bytes, byte[]> underlying;
    private CachingWindowStore<String, String> cachingStore;
    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
    private ThreadCache cache;
    private String topic;
    private static final Long DEFAULT_TIMESTAMP = 10L;

    @Before
    public void setUp() throws Exception {
        this.underlying = new RocksDBWindowStore("test", 30000L, 3, false, Serdes.Bytes(), Serdes.ByteArray());
        this.cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub();
        this.cachingStore = new CachingWindowStore(this.underlying, Serdes.String(), Serdes.String(), WINDOW_SIZE.longValue());
        this.cachingStore.setFlushListener(this.cacheListener);
        this.cache = new ThreadCache(150L);
        this.topic = "topic";
        MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector)null, this.cache);
        context.setRecordContext((RecordContext)new ProcessorRecordContext(DEFAULT_TIMESTAMP.longValue(), 0L, 0, this.topic));
        this.cachingStore.init((ProcessorContext)context, this.cachingStore);
    }

    @Test
    public void shouldPutFetchFromCache() throws Exception {
        this.cachingStore.put((Object)"a", (Object)"a");
        this.cachingStore.put((Object)"b", (Object)"b");
        WindowStoreIterator a = this.cachingStore.fetch((Object)"a", 10L, 10L);
        WindowStoreIterator b = this.cachingStore.fetch((Object)"b", 10L, 10L);
        Assert.assertEquals((Object)KeyValue.pair((Object)DEFAULT_TIMESTAMP, (Object)"a"), (Object)a.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)DEFAULT_TIMESTAMP, (Object)"b"), (Object)b.next());
        Assert.assertFalse((boolean)a.hasNext());
        Assert.assertFalse((boolean)b.hasNext());
        Assert.assertEquals((long)2L, (long)this.cache.size());
    }

    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
        int added = this.addItemsToCache();
        WindowStoreIterator iter = this.underlying.fetch((Object)Bytes.wrap((byte[])"0".getBytes()), DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue());
        KeyValue next = (KeyValue)iter.next();
        Assert.assertEquals((Object)DEFAULT_TIMESTAMP, (Object)next.key);
        Assert.assertArrayEquals((byte[])"0".getBytes(), (byte[])((byte[])next.value));
        Assert.assertFalse((boolean)iter.hasNext());
        Assert.assertEquals((long)(added - 1), (long)this.cache.size());
    }

    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
        Windowed windowedKey = new Windowed((Object)"1", (Window)new TimeWindow(DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP + WINDOW_SIZE));
        this.cachingStore.put((Object)"1", (Object)"a");
        this.cachingStore.flush();
        Assert.assertEquals((Object)"a", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertNull((Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
    }

    @Test
    public void shouldForwardOldValuesWhenEnabled() throws Exception {
        Windowed windowedKey = new Windowed((Object)"1", (Window)new TimeWindow(DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP + WINDOW_SIZE));
        this.cachingStore.put((Object)"1", (Object)"a");
        this.cachingStore.flush();
        this.cachingStore.put((Object)"1", (Object)"b");
        this.cachingStore.flush();
        Assert.assertEquals((Object)"b", (Object)this.cacheListener.forwarded.get((Object)windowedKey).newValue);
        Assert.assertEquals((Object)"a", (Object)this.cacheListener.forwarded.get((Object)windowedKey).oldValue);
    }

    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() throws Exception {
        int numRecords = this.addItemsToCache();
        Assert.assertEquals((long)numRecords, (long)this.cacheListener.forwarded.size());
    }

    @Test
    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() throws Exception {
        this.cachingStore.put((Object)"1", (Object)"a", DEFAULT_TIMESTAMP.longValue());
        this.cachingStore.flush();
        this.cachingStore.put((Object)"1", (Object)"b", DEFAULT_TIMESTAMP.longValue());
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)"1", DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP.longValue());
        Assert.assertEquals((Object)KeyValue.pair((Object)DEFAULT_TIMESTAMP, (Object)"b"), (Object)fetch.next());
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldIterateAcrossWindows() throws Exception {
        this.cachingStore.put((Object)"1", (Object)"a", DEFAULT_TIMESTAMP.longValue());
        this.cachingStore.put((Object)"1", (Object)"b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)"1", DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP + WINDOW_SIZE);
        Assert.assertEquals((Object)KeyValue.pair((Object)DEFAULT_TIMESTAMP, (Object)"a"), (Object)fetch.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)(DEFAULT_TIMESTAMP + WINDOW_SIZE), (Object)"b"), (Object)fetch.next());
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    @Test
    public void shouldIterateCacheAndStore() throws Exception {
        this.underlying.put((Object)Bytes.wrap((byte[])"1".getBytes()), (Object)"a".getBytes());
        this.cachingStore.put((Object)"1", (Object)"b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
        WindowStoreIterator fetch = this.cachingStore.fetch((Object)"1", DEFAULT_TIMESTAMP.longValue(), DEFAULT_TIMESTAMP + WINDOW_SIZE);
        Assert.assertEquals((Object)KeyValue.pair((Object)DEFAULT_TIMESTAMP, (Object)"a"), (Object)fetch.next());
        Assert.assertEquals((Object)KeyValue.pair((Object)(DEFAULT_TIMESTAMP + WINDOW_SIZE), (Object)"b"), (Object)fetch.next());
        Assert.assertFalse((boolean)fetch.hasNext());
    }

    private int addItemsToCache() throws IOException {
        String kv;
        int i = 0;
        for (int cachedSize = 0; cachedSize < 150; cachedSize += ThreadCacheTest.memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), this.topic) + 8 + 4) {
            kv = String.valueOf(i++);
            this.cachingStore.put((Object)kv, (Object)kv);
        }
        return i;
    }
}

