/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CacheFlushListenerStub;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedWindowSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBTimestampedWindowStore;
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SegmentedCacheFunction;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCacheTest;
import org.apache.kafka.streams.state.internals.TimeOrderedCachingWindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TimeOrderedCachingPersistentWindowStoreTest {
    // Shared constants for the tests in this class.
    private static final int MAX_CACHE_SIZE_BYTES = 300;
    private static final long DEFAULT_TIMESTAMP = 10L;
    // Boxed on purpose or by decompilation — callers unbox it via longValue() or arithmetic.
    private static final Long WINDOW_SIZE = 10L;
    private static final long SEGMENT_INTERVAL = 100L;
    private static final String TOPIC = "topic";
    private static final String CACHE_NAMESPACE = "0_0-store-name";
    // Per-test fixtures; setUp() assigns every one of them before each test.
    private ThreadCache cache;
    private InternalMockProcessorContext context;
    private PrefixedWindowKeySchemas.TimeFirstWindowKeySchema baseKeySchema;
    private WindowStore<Bytes, byte[]> underlyingStore;
    // Store under test; wraps underlyingStore and is closed again in closeStore().
    private TimeOrderedCachingWindowStore cachingStore;
    private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore;
    private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
    // Injected by the Parameterized runner from data(); setUp() passes it to the bytes store constructor.
    @Parameterized.Parameter
    public boolean hasIndex;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({true}, {false});
    }

    /**
     * Rebuilds the whole fixture before each test: the RocksDB-backed bytes store, the
     * time-ordered window store on top of it, the caching store under test (with a flush
     * listener stub attached), the thread cache and the mock processor context. The caching
     * store is initialized last, once the context carries a record context.
     */
    @Before
    public void setUp() {
        baseKeySchema = new PrefixedWindowKeySchemas.TimeFirstWindowKeySchema();

        // Persistent layers, innermost first.
        bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore("test", "metrics-scope", 100L, SEGMENT_INTERVAL, hasIndex);
        underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, WINDOW_SIZE.longValue());

        // Listener stub whose key deserializer is flagged as reading from a changelog topic.
        final TimeWindowedDeserializer windowedKeyDeserializer = new TimeWindowedDeserializer((Deserializer) new StringDeserializer(), WINDOW_SIZE);
        windowedKeyDeserializer.setIsChangelogTopic(true);
        cacheListener = new CacheFlushListenerStub(windowedKeyDeserializer, new StringDeserializer());

        // Caching layer under test.
        cachingStore = new TimeOrderedCachingWindowStore(underlyingStore, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);
        cachingStore.setFlushListener(cacheListener, false);

        // Thread cache plus the processor context that exposes it to the store.
        final LogContext cacheLogContext = new LogContext("testCache ");
        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
        cache = new ThreadCache(cacheLogContext, MAX_CACHE_SIZE_BYTES, streamsMetrics);
        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0L, 0, TOPIC, new RecordHeaders()));

        // The cast selects the StateStoreContext overload of init.
        cachingStore.init((StateStoreContext) context, cachingStore);
    }

    /** Closes the caching store that {@code setUp()} created, after every test. */
    @After
    public void closeStore() {
        cachingStore.close();
    }

    /**
     * Verifies that calling the {@code ProcessorContext}-based {@code init} overload on the
     * caching store forwards the same call to the wrapped store.
     */
    @Test
    public void shouldDelegateDeprecatedInit() {
        final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);

        // Phase 1: the constructor is allowed to ask the wrapped store for hasIndex().
        EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
        EasyMock.replay(inner);
        final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);

        // Phase 2: start over and record the delegated init call we expect to see.
        EasyMock.reset(inner);
        EasyMock.expect(inner.name()).andStubReturn("store");
        inner.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer);
        EasyMock.expectLastCall();
        EasyMock.replay(inner);

        outer.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer);

        EasyMock.verify(inner);
    }

    /**
     * Verifies that calling the {@code StateStoreContext}-based {@code init} overload on the
     * caching store forwards the same call to the wrapped store.
     */
    @Test
    public void shouldDelegateInit() {
        final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);

        // Phase 1: the constructor is allowed to ask the wrapped store for hasIndex().
        EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
        EasyMock.replay(inner);
        final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);

        // Phase 2: start over and record the delegated init call we expect to see.
        EasyMock.reset(inner);
        EasyMock.expect(inner.name()).andStubReturn("store");
        inner.init((StateStoreContext) context, outer);
        EasyMock.expectLastCall();
        EasyMock.replay(inner);

        outer.init((StateStoreContext) context, outer);

        EasyMock.verify(inner);
    }

    /**
     * The caching store must reject a wrapped store that is not a
     * {@code RocksDBTimeOrderedWindowStore} with an {@link IllegalArgumentException}, and must
     * accept one that is.
     */
    @Test
    public void shouldThrowIfWrongStore() {
        final RocksDBTimestampedWindowStore unsupported = EasyMock.mock(RocksDBTimestampedWindowStore.class);
        final IllegalArgumentException thrown = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> new TimeOrderedCachingWindowStore(unsupported, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.containsString("TimeOrderedCachingWindowStore only supports RocksDBTimeOrderedWindowStore backed store"));

        // Constructing with the supported store type must simply not throw.
        final RocksDBTimeOrderedWindowStore supported = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
        new TimeOrderedCachingWindowStore(supported, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);
    }

    /**
     * Pipes four batches of five random records through a topology whose processor writes each
     * record into a caching window store named {@code "store-name"}. Before every write the
     * processor asserts that {@code all()} yields exactly as many entries as records written so
     * far, i.e. that no entry is returned more than once.
     *
     * <p>Wall-clock time and the input topic's record time are advanced by ten seconds between
     * batches (not after the last one).
     */
    @Test
    public void shouldNotReturnDuplicatesInRanges() {
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder storeBuilder = Stores.timestampedWindowStoreBuilder((WindowBytesStoreSupplier)RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create((String)"store-name", (Duration)Duration.ofHours(1L), (Duration)Duration.ofMinutes(1L), (boolean)false, (boolean)this.hasIndex), (Serde)Serdes.String(), (Serde)Serdes.String()).withCachingEnabled();
        builder.addStateStore(storeBuilder);
        builder.stream(TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).process(() -> new Processor<String, String, String, String>(){
            private int numRecordsProcessed;
            private WindowStore store;

            @Override
            public void init(ProcessorContext<String, String> processorContext) {
                this.store = (WindowStore)processorContext.getStateStore("store-name");
                // A freshly created store must be empty.
                MatcherAssert.assertThat(this.countEntries(), CoreMatchers.equalTo(0));
            }

            @Override
            public void process(Record<String, String> record) {
                // One entry per record written so far; a larger count would mean duplicates.
                MatcherAssert.assertThat(this.countEntries(), CoreMatchers.equalTo(this.numRecordsProcessed));
                this.store.put(record.value(), ValueAndTimestamp.make(record.value(), record.timestamp()), record.timestamp());
                ++this.numRecordsProcessed;
            }

            /** Counts the entries returned by a full scan of the store. */
            private int countEntries() {
                int count = 0;
                try (KeyValueIterator all = this.store.all()) {
                    while (all.hasNext()) {
                        ++count;
                        all.next();
                    }
                }
                return count;
            }
        }, new String[]{"store-name"});

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass().getName());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass().getName());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("commit.interval.ms", 10000L);

        final int batches = 4;
        final int recordsPerBatch = 5;
        final Duration batchGap = Duration.ofSeconds(10L);
        final Instant initialWallClockTime = Instant.ofEpochMilli(0L);

        // try-with-resources: the driver is closed even when an assertion inside the processor
        // (or any pipeInput call) throws, so a failing run does not leak the driver.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime)) {
            TestInputTopic<String, String> inputTopic = driver.createInputTopic(TOPIC, Serdes.String().serializer(), Serdes.String().serializer(), initialWallClockTime, Duration.ZERO);
            for (int batch = 0; batch < batches; ++batch) {
                if (batch > 0) {
                    // Move both clocks forward between batches only.
                    driver.advanceWallClockTime(batchGap);
                    inputTopic.advanceTime(batchGap);
                }
                for (int i = 0; i < recordsPerBatch; ++i) {
                    inputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString());
                }
            }
        }
    }

    /**
     * Puts keys "a" and "b" at timestamp 10 and checks point lookups (hits and misses) and
     * single-key time-range fetches. Finally asserts the thread cache size: 4 entries when
     * {@code hasIndex} is set, 2 otherwise.
     */
    @Test
    public void shouldPutFetchFromCache() {
        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);

        // Point lookups: present keys, an absent key, and a present key at the wrong timestamp.
        MatcherAssert.assertThat(cachingStore.fetch(bytesKey("a"), 10L), CoreMatchers.equalTo(bytesValue("a")));
        MatcherAssert.assertThat(cachingStore.fetch(bytesKey("b"), 10L), CoreMatchers.equalTo(bytesValue("b")));
        MatcherAssert.assertThat(cachingStore.fetch(bytesKey("c"), 10L), CoreMatchers.equalTo(null));
        MatcherAssert.assertThat(cachingStore.fetch(bytesKey("a"), 0L), CoreMatchers.equalTo(null));

        final Instant at = Instant.ofEpochMilli(10L);
        try (WindowStoreIterator<byte[]> iteratorA = cachingStore.fetch(bytesKey("a"), at, at);
             WindowStoreIterator<byte[]> iteratorB = cachingStore.fetch(bytesKey("b"), at, at)) {
            verifyKeyValue(iteratorA.next(), DEFAULT_TIMESTAMP, "a");
            verifyKeyValue(iteratorB.next(), DEFAULT_TIMESTAMP, "b");
            Assert.assertFalse(iteratorA.hasNext());
            Assert.assertFalse(iteratorB.hasNext());

            final int expectedCacheEntries;
            if (hasIndex) {
                expectedCacheEntries = 4;
            } else {
                expectedCacheEntries = 2;
            }
            Assert.assertEquals(expectedCacheEntries, cache.size());
        }
    }

    /** Runs the shared position check with a no-op flush listener installed. */
    @Test
    public void shouldMatchPositionAfterPutWithFlushListener() {
        cachingStore.setFlushListener(flushed -> { }, false);
        shouldMatchPositionAfterPut();
    }

    /** Runs the shared position check with the flush listener set to {@code null}. */
    @Test
    public void shouldMatchPositionAfterPutWithoutFlushListener() {
        cachingStore.setFlushListener(null, false);
        shouldMatchPositionAfterPut();
    }

    /**
     * Shared body of the position tests. Writes two records under record contexts whose second
     * constructor argument is 1 and 2, switches to a third context, and asserts that both the
     * caching store and the underlying store report an empty position until {@code flush()} is
     * called; afterwards both must report the position built from {@code {"": {0: 2}}}.
     */
    private void shouldMatchPositionAfterPut() {
        context.setRecordContext(new ProcessorRecordContext(0L, 1L, 0, "", new RecordHeaders()));
        cachingStore.put(bytesKey("key1"), bytesValue("value1"), DEFAULT_TIMESTAMP);

        context.setRecordContext(new ProcessorRecordContext(0L, 2L, 0, "", new RecordHeaders()));
        cachingStore.put(bytesKey("key2"), bytesValue("value2"), DEFAULT_TIMESTAMP);

        context.setRecordContext(new ProcessorRecordContext(0L, 3L, 0, "", new RecordHeaders()));

        // Nothing has been flushed yet, so neither layer has a position.
        Assert.assertEquals(Position.emptyPosition(), cachingStore.getPosition());
        Assert.assertEquals(Position.emptyPosition(), underlyingStore.getPosition());

        cachingStore.flush();

        final Position expectedAfterFlush = Position.fromMap(Utils.mkMap(Utils.mkEntry("", Utils.mkMap(Utils.mkEntry(0, 2L)))));
        Assert.assertEquals(expectedAfterFlush, cachingStore.getPosition());
        Assert.assertEquals(expectedAfterFlush, underlyingStore.getPosition());
    }

    /**
     * Asserts that an iterator entry carries the expected key and the bytes of the expected value.
     *
     * @param next          entry to check
     * @param expectedKey   key the entry must have
     * @param expectedValue string whose {@code bytesValue} encoding the entry's value must equal
     */
    private void verifyKeyValue(KeyValue<Long, byte[]> next, long expectedKey, String expectedValue) {
        final byte[] expectedBytes = bytesValue(expectedValue);
        MatcherAssert.assertThat(next.key, CoreMatchers.equalTo(expectedKey));
        MatcherAssert.assertThat(next.value, CoreMatchers.equalTo(expectedBytes));
    }

    /**
     * Encodes a test value as UTF-8 bytes.
     *
     * @param value string to encode
     * @return the UTF-8 encoding of {@code value}
     */
    private static byte[] bytesValue(String value) {
        // Pin the charset: the no-arg getBytes() depends on the platform default, which would make
        // non-ASCII test data machine-dependent.
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Wraps the UTF-8 encoding of a test key in a {@link Bytes} instance.
     *
     * @param key string to encode
     * @return {@code Bytes} wrapping the UTF-8 encoding of {@code key}
     */
    private static Bytes bytesKey(String key) {
        // Pin the charset: the no-arg getBytes() depends on the platform default, which would make
        // non-ASCII test keys machine-dependent.
        return Bytes.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Decodes bytes back into a string using the String serde's deserializer, with an empty
     * topic name.
     *
     * @param from bytes to decode
     * @return the deserialized string
     */
    private String stringFrom(byte[] from) {
        final Object decoded = Serdes.String().deserializer().deserialize("", from);
        return (String) decoded;
    }

    /**
     * Puts "a" and "b" at timestamp 10 and fetches the key range ["a", "b"] at exactly that
     * timestamp; both entries must come back in key order. Also asserts the thread cache size:
     * 4 entries when {@code hasIndex} is set, 2 otherwise.
     */
    @Test
    public void shouldPutFetchRangeFromCache() {
        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);

        final Instant at = Instant.ofEpochMilli(10L);
        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetch(bytesKey("a"), bytesKey("b"), at, at)) {
            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
                new Windowed<>(bytesKey("a"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
                new Windowed<>(bytesKey("b"), new TimeWindow(10L, 10L + WINDOW_SIZE)));
            final List<String> expectedValues = Arrays.asList("a", "b");
            StreamsTestUtils.verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);

            final int expectedCacheEntries;
            if (hasIndex) {
                expectedCacheEntries = 4;
            } else {
                expectedCacheEntries = 2;
            }
            Assert.assertEquals(expectedCacheEntries, cache.size());
        }
    }

    /**
     * Forward range fetch with a {@code null} lower key bound and upper key bound "d" over
     * timestamps [10, 30]: expects "a", "b", "c", "d" in that order and nothing for "e".
     */
    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
        final String[] keys = {"a", "b", "c", "d", "e"};
        final long[] timestamps = {10L, 10L, 20L, 30L, 30L};
        for (int idx = 0; idx < keys.length; ++idx) {
            cachingStore.put(bytesKey(keys[idx]), bytesValue(keys[idx]), timestamps[idx]);
        }

        final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
            new Windowed<>(bytesKey("a"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("b"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("c"), new TimeWindow(20L, 20L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("d"), new TimeWindow(30L, 30L + WINDOW_SIZE)));
        final List<String> expectedValues = Arrays.asList("a", "b", "c", "d");

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(null, bytesKey("d"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(30L))) {
            StreamsTestUtils.verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }

    /**
     * Forward range fetch with lower key bound "b" and a {@code null} upper key bound over
     * timestamps [10, 30]: expects "b", "c", "d", "e" in that order and nothing for "a".
     */
    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyTo() {
        final String[] keys = {"a", "b", "c", "d", "e"};
        final long[] timestamps = {10L, 10L, 20L, 30L, 30L};
        for (int idx = 0; idx < keys.length; ++idx) {
            cachingStore.put(bytesKey(keys[idx]), bytesValue(keys[idx]), timestamps[idx]);
        }

        final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
            new Windowed<>(bytesKey("b"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("c"), new TimeWindow(20L, 20L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("d"), new TimeWindow(30L, 30L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("e"), new TimeWindow(30L, 30L + WINDOW_SIZE)));
        final List<String> expectedValues = Arrays.asList("b", "c", "d", "e");

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(bytesKey("b"), null, Instant.ofEpochMilli(10L), Instant.ofEpochMilli(30L))) {
            StreamsTestUtils.verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }

    /**
     * Forward range fetch with both key bounds {@code null} over timestamps [10, 30]: expects
     * all five entries "a" through "e" in that order.
     */
    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() {
        final String[] keys = {"a", "b", "c", "d", "e"};
        final long[] timestamps = {10L, 10L, 20L, 30L, 30L};
        for (int idx = 0; idx < keys.length; ++idx) {
            cachingStore.put(bytesKey(keys[idx]), bytesValue(keys[idx]), timestamps[idx]);
        }

        final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
            new Windowed<>(bytesKey("a"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("b"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("c"), new TimeWindow(20L, 20L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("d"), new TimeWindow(30L, 30L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("e"), new TimeWindow(30L, 30L + WINDOW_SIZE)));
        final List<String> expectedValues = Arrays.asList("a", "b", "c", "d", "e");

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(null, null, Instant.ofEpochMilli(10L), Instant.ofEpochMilli(30L))) {
            StreamsTestUtils.verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }

    /**
     * Backward range fetch with a {@code null} lower key bound and upper key bound "c" over
     * timestamps [10, 30]: expects "c", "b", "a" in that order.
     */
    @Test
    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() {
        final String[] keys = {"a", "b", "c", "d", "e"};
        final long[] timestamps = {10L, 10L, 20L, 30L, 30L};
        for (int idx = 0; idx < keys.length; ++idx) {
            cachingStore.put(bytesKey(keys[idx]), bytesValue(keys[idx]), timestamps[idx]);
        }

        final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
            new Windowed<>(bytesKey("c"), new TimeWindow(20L, 20L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("b"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("a"), new TimeWindow(10L, 10L + WINDOW_SIZE)));
        final List<String> expectedValues = Arrays.asList("c", "b", "a");

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.backwardFetch(null, bytesKey("c"), Instant.ofEpochMilli(10L), Instant.ofEpochMilli(30L))) {
            StreamsTestUtils.verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }

    /**
     * Backward range fetch with lower key bound "c" and a {@code null} upper key bound over
     * timestamps [10, 30]: expects "e", "d", "c" in that order.
     */
    @Test
    public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() {
        final String[] keys = {"a", "b", "c", "d", "e"};
        final long[] timestamps = {10L, 10L, 20L, 30L, 30L};
        for (int idx = 0; idx < keys.length; ++idx) {
            cachingStore.put(bytesKey(keys[idx]), bytesValue(keys[idx]), timestamps[idx]);
        }

        final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
            new Windowed<>(bytesKey("e"), new TimeWindow(30L, 30L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("d"), new TimeWindow(30L, 30L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("c"), new TimeWindow(20L, 20L + WINDOW_SIZE)));
        final List<String> expectedValues = Arrays.asList("e", "d", "c");

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.backwardFetch(bytesKey("c"), null, Instant.ofEpochMilli(10L), Instant.ofEpochMilli(30L))) {
            StreamsTestUtils.verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }

    /**
     * Backward range fetch with both key bounds {@code null} over timestamps [10, 30]: expects
     * all five entries in reverse order, "e" down to "a".
     */
    @Test
    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() {
        final String[] keys = {"a", "b", "c", "d", "e"};
        final long[] timestamps = {10L, 10L, 20L, 30L, 30L};
        for (int idx = 0; idx < keys.length; ++idx) {
            cachingStore.put(bytesKey(keys[idx]), bytesValue(keys[idx]), timestamps[idx]);
        }

        final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
            new Windowed<>(bytesKey("e"), new TimeWindow(30L, 30L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("d"), new TimeWindow(30L, 30L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("c"), new TimeWindow(20L, 20L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("b"), new TimeWindow(10L, 10L + WINDOW_SIZE)),
            new Windowed<>(bytesKey("a"), new TimeWindow(10L, 10L + WINDOW_SIZE)));
        final List<String> expectedValues = Arrays.asList("e", "d", "c", "b", "a");

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.backwardFetch(null, null, Instant.ofEpochMilli(10L), Instant.ofEpochMilli(30L))) {
            StreamsTestUtils.verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }

    /**
     * Puts keys "a" through "h" at timestamp 10 and checks that {@code all()} returns exactly
     * those eight entries in ascending key order.
     */
    @Test
    public void shouldGetAllFromCache() {
        final String[] keys = {"a", "b", "c", "d", "e", "f", "g", "h"};
        for (final String key : keys) {
            cachingStore.put(bytesKey(key), bytesValue(key), DEFAULT_TIMESTAMP);
        }

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all()) {
            for (int idx = 0; idx < keys.length; ++idx) {
                final Windowed<Bytes> expectedKey = new Windowed<>(bytesKey(keys[idx]), new TimeWindow(10L, 10L + WINDOW_SIZE));
                StreamsTestUtils.verifyWindowedKeyValue(iterator.next(), expectedKey, keys[idx]);
            }
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /**
     * Puts keys "a" through "h" at timestamp 10 and checks that {@code backwardAll()} returns
     * exactly those eight entries in descending key order.
     */
    @Test
    public void shouldGetAllBackwardFromCache() {
        final String[] keys = {"a", "b", "c", "d", "e", "f", "g", "h"};
        for (final String key : keys) {
            cachingStore.put(bytesKey(key), bytesValue(key), DEFAULT_TIMESTAMP);
        }

        try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.backwardAll()) {
            // Walk the insertion order from the end: "h" first, "a" last.
            for (int idx = keys.length - 1; idx >= 0; --idx) {
                final Windowed<Bytes> expectedKey = new Windowed<>(bytesKey(keys[idx]), new TimeWindow(10L, 10L + WINDOW_SIZE));
                StreamsTestUtils.verifyWindowedKeyValue(iterator.next(), expectedKey, keys[idx]);
            }
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /**
     * Stores keys "a" through "h" at timestamps 0 through 7 (one key per timestamp) and asserts
     * that {@code fetchAll} over the timestamp ranges 0..7, 2..4 and 5..7 returns exactly the
     * entries whose timestamps fall inside each range (both bounds included), in ascending
     * timestamp order.
     */
    @Test
    public void shouldFetchAllWithinTimestampRange() {
        String[] array = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"};
        for (int i = 0; i < array.length; ++i) {
            this.cachingStore.put(
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey(array[i]),
                TimeOrderedCachingPersistentWindowStoreTest.bytesValue(array[i]),
                (long) i);
        }
        // Each pair is {first timestamp, last timestamp}; a timestamp doubles as the index into array.
        int[][] ranges = new int[][]{{0, array.length - 1}, {2, 4}, {5, 7}};
        for (int[] range : ranges) {
            int from = range[0];
            int to = range[1];
            try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                     this.cachingStore.fetchAll(Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))) {
                for (int i = from; i <= to; ++i) {
                    String str = array[i];
                    StreamsTestUtils.verifyWindowedKeyValue(
                        iterator.next(),
                        new Windowed<Bytes>(
                            TimeOrderedCachingPersistentWindowStoreTest.bytesKey(str),
                            new TimeWindow((long) i, (long) i + WINDOW_SIZE)),
                        str);
                }
                Assert.assertFalse(iterator.hasNext());
            }
        }
    }

    /**
     * Stores keys "a" through "h" at timestamps 0 through 7 (one key per timestamp) and asserts
     * that {@code backwardFetchAll} over the timestamp ranges 0..7, 2..4 and 5..7 returns exactly
     * the entries whose timestamps fall inside each range (both bounds included), in descending
     * timestamp order.
     */
    @Test
    public void shouldFetchAllBackwardWithinTimestampRange() {
        String[] array = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"};
        for (int i = 0; i < array.length; ++i) {
            this.cachingStore.put(
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey(array[i]),
                TimeOrderedCachingPersistentWindowStoreTest.bytesValue(array[i]),
                (long) i);
        }
        // Each pair is {first timestamp, last timestamp}; a timestamp doubles as the index into array.
        int[][] ranges = new int[][]{{0, array.length - 1}, {2, 4}, {5, 7}};
        for (int[] range : ranges) {
            int from = range[0];
            int to = range[1];
            try (KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                     this.cachingStore.backwardFetchAll(Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))) {
                // The backward iterator must start at the newest timestamp of the range.
                for (int i = to; i >= from; --i) {
                    String str = array[i];
                    StreamsTestUtils.verifyWindowedKeyValue(
                        iterator.next(),
                        new Windowed<Bytes>(
                            TimeOrderedCachingPersistentWindowStoreTest.bytesKey(str),
                            new TimeWindow((long) i, (long) i + WINDOW_SIZE)),
                        str);
                }
                Assert.assertFalse(iterator.hasNext());
            }
        }
    }

    /**
     * Fills the cache through {@code addItemsToCache()} and asserts that the bytes store then
     * holds exactly one entry for key "0" at timestamp 10 with value "0", and that the cache
     * size is one less than the number of items added.
     */
    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
        int itemsAdded = this.addItemsToCache();
        Bytes firstKey = Bytes.wrap("0".getBytes(StandardCharsets.UTF_8));
        try (KeyValueIterator<Bytes, byte[]> stored = this.bytesStore.fetch(firstKey, 10L, 10L)) {
            KeyValue<Bytes, byte[]> entry = stored.next();
            long segmentTimestamp = this.baseKeySchema.segmentTimestamp(entry.key);
            Assert.assertEquals(10L, segmentTimestamp);
            Assert.assertArrayEquals("0".getBytes(), entry.value);
            Assert.assertFalse(stored.hasNext());
            Assert.assertEquals((long) (itemsAdded - 1), (long) this.cache.size());
        }
    }

    /**
     * Puts value "a" for key "1" at timestamp 10, flushes, and asserts that the cache listener
     * recorded new value "a" and a null old value under the windowed key
     * ("1", TimeWindow(10, 10 + WINDOW_SIZE)).
     */
    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() {
        long timestamp = 10L;
        Windowed<String> forwardedKey = new Windowed<>("1", new TimeWindow(timestamp, timestamp + WINDOW_SIZE));
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            timestamp);
        this.cachingStore.flush();
        Assert.assertEquals("a", this.cacheListener.forwarded.get(forwardedKey).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey).oldValue);
    }

    /**
     * Asserts that {@code setFlushListener} returns true for a null listener, both with and
     * without old-value forwarding requested.
     */
    @Test
    public void shouldSetFlushListener() {
        boolean acceptedWithOldValues = this.cachingStore.setFlushListener(null, true);
        Assert.assertTrue(acceptedWithOldValues);
        boolean acceptedWithoutOldValues = this.cachingStore.setFlushListener(null, false);
        Assert.assertTrue(acceptedWithoutOldValues);
    }

    /**
     * Registers the cache listener with the second argument of {@code setFlushListener} set to
     * true and asserts, flush by flush, which new/old value pair the listener recorded for
     * key "1" at timestamp 10.
     */
    @Test
    public void shouldForwardOldValuesWhenEnabled() {
        long timestamp = 10L;
        this.cachingStore.setFlushListener(this.cacheListener, true);
        Windowed<String> forwardedKey = new Windowed<>("1", new TimeWindow(timestamp, timestamp + WINDOW_SIZE));

        // Two writes before the first flush: expect the latest value and no old value.
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            timestamp);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"),
            timestamp);
        this.cachingStore.flush();
        Assert.assertEquals("b", this.cacheListener.forwarded.get(forwardedKey).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey).oldValue);
        this.cacheListener.forwarded.clear();

        // Overwrite after a flush: expect the previously flushed value as the old value.
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("c"),
            timestamp);
        this.cachingStore.flush();
        Assert.assertEquals("c", this.cacheListener.forwarded.get(forwardedKey).newValue);
        Assert.assertEquals("b", this.cacheListener.forwarded.get(forwardedKey).oldValue);

        // Null write after a flush: expect a null new value and the last flushed value as old.
        this.cachingStore.put(TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"), null, timestamp);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey).newValue);
        Assert.assertEquals("c", this.cacheListener.forwarded.get(forwardedKey).oldValue);
        this.cacheListener.forwarded.clear();

        // Writes followed by a null write within one flush cycle: expect nothing recorded for the key.
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            timestamp);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"),
            timestamp);
        this.cachingStore.put(TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"), null, timestamp);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey));
        this.cacheListener.forwarded.clear();
    }

    /**
     * Without calling {@code setFlushListener} in this test, asserts flush by flush that the
     * cache listener records the expected new value for key "1" at timestamp 10 and always a
     * null old value.
     */
    @Test
    public void shouldNotForwardOldValuesWhenDisabled() {
        long timestamp = 10L;
        Windowed<String> forwardedKey = new Windowed<>("1", new TimeWindow(timestamp, timestamp + WINDOW_SIZE));

        // Two writes before the first flush: expect the latest value and no old value.
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            timestamp);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"),
            timestamp);
        this.cachingStore.flush();
        Assert.assertEquals("b", this.cacheListener.forwarded.get(forwardedKey).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey).oldValue);

        // Overwrite after a flush: the old value is still expected to be null.
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("c"),
            timestamp);
        this.cachingStore.flush();
        Assert.assertEquals("c", this.cacheListener.forwarded.get(forwardedKey).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey).oldValue);

        // Null write after a flush: both new and old values are expected to be null.
        this.cachingStore.put(TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"), null, timestamp);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey).oldValue);
        this.cacheListener.forwarded.clear();

        // Writes followed by a null write within one flush cycle: expect nothing recorded for the key.
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            timestamp);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"),
            timestamp);
        this.cachingStore.put(TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"), null, timestamp);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(forwardedKey));
        this.cacheListener.forwarded.clear();
    }

    /**
     * Asserts that the number of entries recorded by the cache listener equals the count
     * returned by {@code addItemsToCache()}.
     */
    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() {
        int itemsAdded = this.addItemsToCache();
        int itemsForwarded = this.cacheListener.forwarded.size();
        Assert.assertEquals((long) itemsAdded, (long) itemsForwarded);
    }

    /**
     * Puts "a" for key "1" at timestamp 10, flushes, puts "b" for the same key and timestamp
     * without flushing, and asserts that a fetch at timestamp 10 returns only "b".
     */
    @Test
    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() {
        long timestamp = 10L;
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            timestamp);
        this.cachingStore.flush();
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"),
            timestamp);
        try (WindowStoreIterator<byte[]> results = this.cachingStore.fetch(
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
                 Instant.ofEpochMilli(timestamp),
                 Instant.ofEpochMilli(timestamp))) {
            this.verifyKeyValue(results.next(), timestamp, "b");
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Puts "a" at timestamp 10 and "b" at timestamp 10 + WINDOW_SIZE for key "1" and asserts
     * that a fetch spanning both timestamps returns "a" then "b" and nothing else.
     */
    @Test
    public void shouldIterateAcrossWindows() {
        long firstTimestamp = 10L;
        long secondTimestamp = 10L + WINDOW_SIZE;
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            firstTimestamp);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"),
            secondTimestamp);
        try (WindowStoreIterator<byte[]> results = this.cachingStore.fetch(
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
                 Instant.ofEpochMilli(firstTimestamp),
                 Instant.ofEpochMilli(secondTimestamp))) {
            this.verifyKeyValue(results.next(), firstTimestamp, "a");
            this.verifyKeyValue(results.next(), secondTimestamp, "b");
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Puts "a" at timestamp 10 and "b" at timestamp 10 + WINDOW_SIZE for key "1" and asserts
     * that a backward fetch spanning both timestamps returns "b" then "a" and nothing else.
     */
    @Test
    public void shouldIterateBackwardAcrossWindows() {
        long firstTimestamp = 10L;
        long secondTimestamp = 10L + WINDOW_SIZE;
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            firstTimestamp);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"),
            secondTimestamp);
        try (WindowStoreIterator<byte[]> results = this.cachingStore.backwardFetch(
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
                 Instant.ofEpochMilli(firstTimestamp),
                 Instant.ofEpochMilli(secondTimestamp))) {
            this.verifyKeyValue(results.next(), secondTimestamp, "b");
            this.verifyKeyValue(results.next(), firstTimestamp, "a");
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Writes "a" for key "1" at timestamp 10 directly into the bytes store and "b" at
     * timestamp 10 + WINDOW_SIZE through the caching store, then asserts that a fetch spanning
     * both timestamps returns "a" followed by "b".
     */
    @Test
    public void shouldIterateCacheAndStore() {
        long storedTimestamp = 10L;
        long cachedTimestamp = 10L + WINDOW_SIZE;
        Bytes key = Bytes.wrap("1".getBytes());
        this.bytesStore.put(
            PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(key, storedTimestamp, 0),
            "a".getBytes());
        this.cachingStore.put(key, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"), cachedTimestamp);
        try (WindowStoreIterator<byte[]> results = this.cachingStore.fetch(
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
                 Instant.ofEpochMilli(storedTimestamp),
                 Instant.ofEpochMilli(cachedTimestamp))) {
            this.verifyKeyValue(results.next(), storedTimestamp, "a");
            this.verifyKeyValue(results.next(), cachedTimestamp, "b");
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Writes "a" for key "1" at timestamp 10 directly into the bytes store and "b" at
     * timestamp 10 + WINDOW_SIZE through the caching store, then asserts that a backward fetch
     * spanning both timestamps returns "b" followed by "a".
     */
    @Test
    public void shouldIterateBackwardCacheAndStore() {
        long storedTimestamp = 10L;
        long cachedTimestamp = 10L + WINDOW_SIZE;
        Bytes key = Bytes.wrap("1".getBytes());
        this.bytesStore.put(
            PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(key, storedTimestamp, 0),
            "a".getBytes());
        this.cachingStore.put(key, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"), cachedTimestamp);
        try (WindowStoreIterator<byte[]> results = this.cachingStore.backwardFetch(
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("1"),
                 Instant.ofEpochMilli(storedTimestamp),
                 Instant.ofEpochMilli(cachedTimestamp))) {
            this.verifyKeyValue(results.next(), cachedTimestamp, "b");
            this.verifyKeyValue(results.next(), storedTimestamp, "a");
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Writes "a" for key "1" at timestamp 10 directly into the bytes store and "b" at
     * timestamp 10 + WINDOW_SIZE through the caching store, then asserts that a key-range fetch
     * from "1" to "2" over both timestamps returns the "a" entry followed by the "b" entry.
     */
    @Test
    public void shouldIterateCacheAndStoreKeyRange() {
        long storedTimestamp = 10L;
        long cachedTimestamp = 10L + WINDOW_SIZE;
        Bytes key = Bytes.wrap("1".getBytes());
        this.bytesStore.put(
            PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(key, storedTimestamp, 0),
            "a".getBytes());
        this.cachingStore.put(key, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"), cachedTimestamp);
        try (KeyValueIterator<Windowed<Bytes>, byte[]> results = this.cachingStore.fetch(
                 key,
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("2"),
                 Instant.ofEpochMilli(storedTimestamp),
                 Instant.ofEpochMilli(cachedTimestamp))) {
            StreamsTestUtils.verifyWindowedKeyValue(
                results.next(),
                new Windowed<Bytes>(key, new TimeWindow(storedTimestamp, storedTimestamp + WINDOW_SIZE)),
                "a");
            StreamsTestUtils.verifyWindowedKeyValue(
                results.next(),
                new Windowed<Bytes>(key, new TimeWindow(cachedTimestamp, cachedTimestamp + WINDOW_SIZE)),
                "b");
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Writes "a" for key "1" at timestamp 10 directly into the bytes store and "b" at
     * timestamp 10 + WINDOW_SIZE through the caching store, then asserts that a backward
     * key-range fetch from "1" to "2" over both timestamps returns the "b" entry followed by
     * the "a" entry.
     */
    @Test
    public void shouldIterateBackwardCacheAndStoreKeyRange() {
        long storedTimestamp = 10L;
        long cachedTimestamp = 10L + WINDOW_SIZE;
        Bytes key = Bytes.wrap("1".getBytes());
        this.bytesStore.put(
            PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(key, storedTimestamp, 0),
            "a".getBytes());
        this.cachingStore.put(key, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("b"), cachedTimestamp);
        try (KeyValueIterator<Windowed<Bytes>, byte[]> results = this.cachingStore.backwardFetch(
                 key,
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("2"),
                 Instant.ofEpochMilli(storedTimestamp),
                 Instant.ofEpochMilli(cachedTimestamp))) {
            StreamsTestUtils.verifyWindowedKeyValue(
                results.next(),
                new Windowed<Bytes>(key, new TimeWindow(cachedTimestamp, cachedTimestamp + WINDOW_SIZE)),
                "b");
            StreamsTestUtils.verifyWindowedKeyValue(
                results.next(),
                new Windowed<Bytes>(key, new TimeWindow(storedTimestamp, storedTimestamp + WINDOW_SIZE)),
                "a");
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Puts one entry and asserts the cache size is 2 when {@code hasIndex} is set and 1
     * otherwise, then closes the caching store and asserts the cache size is 0.
     */
    @Test
    public void shouldClearNamespaceCacheOnClose() {
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
            0L);
        int expectedEntries;
        if (this.hasIndex) {
            expectedEntries = 2;
        } else {
            expectedEntries = 1;
        }
        Assert.assertEquals((long) expectedEntries, (long) this.cache.size());
        this.cachingStore.close();
        Assert.assertEquals(0L, (long) this.cache.size());
    }

    /**
     * Asserts that a single-key fetch on a closed caching store throws
     * {@link InvalidStateStoreException}.
     */
    @Test
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> {
                this.cachingStore.fetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(10L));
            });
    }

    /**
     * Asserts that a key-range fetch on a closed caching store throws
     * {@link InvalidStateStoreException}.
     */
    @Test
    public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> {
                this.cachingStore.fetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("b"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(10L));
            });
    }

    /**
     * Asserts that a put on a closed caching store throws {@link InvalidStateStoreException}.
     */
    @Test
    public void shouldThrowIfTryingToWriteToClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> {
                this.cachingStore.put(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesValue("a"),
                    0L);
            });
    }

    /**
     * Puts ("aa", "0002") at timestamp 0 through the caching store, inserts an empty-valued
     * cache entry under a key-first index key for ("a", timestamp 1) directly into the cache,
     * and writes ("a", "0001") at timestamp 1 straight to the underlying store. A key-range
     * fetch from "a" to "aa" must then return both real entries; the expected order depends on
     * {@code hasIndex}.
     */
    @Test
    public void shouldSkipNonExistBaseKeyInCache() {
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0002"),
            0L);

        SegmentedCacheFunction indexCacheFunction =
            new SegmentedCacheFunction((SegmentedBytesStore.KeySchema) new PrefixedWindowKeySchemas.KeyFirstWindowKeySchema(), 100L);
        Bytes key = TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a");
        byte[] value = TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001");
        Bytes cacheIndexKey = indexCacheFunction.cacheKey(
            PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.toStoreKeyBinary(key, 1L, 0));
        String cacheName = this.context.taskId() + "-test";

        // Index entry placed in the cache by hand; the matching record goes only to the underlying store.
        LRUCacheEntry indexEntry = new LRUCacheEntry(
            new byte[0],
            (Headers) new RecordHeaders(),
            true,
            this.context.offset(),
            this.context.timestamp(),
            this.context.partition(),
            "");
        this.cache.put(cacheName, cacheIndexKey, indexEntry);
        this.underlyingStore.put(key, value, 1L);

        if (!this.hasIndex) {
            StreamsTestUtils.verifyKeyValueList(
                Arrays.asList(
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 1L)),
                StreamsTestUtils.toList(this.cachingStore.fetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(Long.MAX_VALUE))));
        } else {
            StreamsTestUtils.verifyKeyValueList(
                Arrays.asList(
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L)),
                StreamsTestUtils.toList(this.cachingStore.fetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(Long.MAX_VALUE))));
        }
    }

    /**
     * Puts entries for keys "a" and "aa" at timestamps 0, 1 and 100 and asserts that a
     * single-key fetch for "a" over [0, Long.MAX_VALUE] returns only the three "a" values, in
     * ascending timestamp order.
     */
    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0002"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0003"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0004"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0005"),
            100L);

        List<KeyValue<Long, byte[]>> expected = Arrays.asList(
            KeyValue.pair(0L, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001")),
            KeyValue.pair(1L, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0003")),
            KeyValue.pair(100L, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0005")));
        List<KeyValue<Long, byte[]>> actual = StreamsTestUtils.toList(this.cachingStore.fetch(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            Instant.ofEpochMilli(0L),
            Instant.ofEpochMilli(Long.MAX_VALUE)));
        StreamsTestUtils.verifyKeyValueList(expected, actual);
    }

    /**
     * Puts entries for keys "a" and "aa" at timestamps 0, 1 and 100 and asserts that a
     * single-key backward fetch for "a" over [0, Long.MAX_VALUE] returns only the three "a"
     * values, in descending timestamp order.
     */
    @Test
    public void shouldBackwardFetchAndIterateOverExactKeys() {
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0002"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0003"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0004"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0005"),
            100L);

        List<KeyValue<Long, byte[]>> expected = Arrays.asList(
            KeyValue.pair(100L, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0005")),
            KeyValue.pair(1L, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0003")),
            KeyValue.pair(0L, TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001")));
        List<KeyValue<Long, byte[]>> actual = StreamsTestUtils.toList(this.cachingStore.backwardFetch(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            Instant.ofEpochMilli(0L),
            Instant.ofEpochMilli(Long.MAX_VALUE)));
        StreamsTestUtils.verifyKeyValueList(expected, actual);
    }

    /**
     * Puts entries for keys "a" and "aa" at timestamps 0, 1 and 100 and verifies three
     * key-range fetches over [0, Long.MAX_VALUE]: "a".."a", "aa".."aa", and "a".."aa". For the
     * last range the expected ordering depends on {@code hasIndex}.
     */
    @Test
    public void shouldFetchAndIterateOverKeyRange() {
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0002"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0003"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0004"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0005"),
            100L);

        // Range restricted to key "a".
        StreamsTestUtils.verifyKeyValueList(
            Arrays.asList(
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 0L),
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0003", 1L),
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0005", 100L)),
            StreamsTestUtils.toList(this.cachingStore.fetch(
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                Instant.ofEpochMilli(0L),
                Instant.ofEpochMilli(Long.MAX_VALUE))));

        // Range restricted to key "aa".
        StreamsTestUtils.verifyKeyValueList(
            Arrays.asList(
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L),
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0004", 1L)),
            StreamsTestUtils.toList(this.cachingStore.fetch(
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                Instant.ofEpochMilli(0L),
                Instant.ofEpochMilli(Long.MAX_VALUE))));

        // Range covering both keys.
        if (!this.hasIndex) {
            StreamsTestUtils.verifyKeyValueList(
                Arrays.asList(
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 0L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0003", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0004", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0005", 100L)),
                StreamsTestUtils.toList(this.cachingStore.fetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(Long.MAX_VALUE))));
        } else {
            StreamsTestUtils.verifyKeyValueList(
                Arrays.asList(
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 0L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0003", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0004", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0005", 100L)),
                StreamsTestUtils.toList(this.cachingStore.fetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(Long.MAX_VALUE))));
        }
    }

    /**
     * Puts entries for keys "a" and "aa" at timestamps 0, 1 and 100 and verifies three backward
     * key-range fetches over [0, Long.MAX_VALUE]: "a".."a", "aa".."aa", and "a".."aa". For the
     * last range the expected ordering depends on {@code hasIndex}.
     */
    @Test
    public void shouldFetchAndIterateOverKeyBackwardRange() {
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0002"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0003"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0004"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0005"),
            100L);

        // Range restricted to key "a".
        StreamsTestUtils.verifyKeyValueList(
            Arrays.asList(
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0005", 100L),
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0003", 1L),
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 0L)),
            StreamsTestUtils.toList(this.cachingStore.backwardFetch(
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                Instant.ofEpochMilli(0L),
                Instant.ofEpochMilli(Long.MAX_VALUE))));

        // Range restricted to key "aa".
        StreamsTestUtils.verifyKeyValueList(
            Arrays.asList(
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0004", 1L),
                TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L)),
            StreamsTestUtils.toList(this.cachingStore.backwardFetch(
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                Instant.ofEpochMilli(0L),
                Instant.ofEpochMilli(Long.MAX_VALUE))));

        // Range covering both keys.
        if (this.hasIndex) {
            StreamsTestUtils.verifyKeyValueList(
                Arrays.asList(
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0005", 100L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0004", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0003", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 0L)),
                StreamsTestUtils.toList(this.cachingStore.backwardFetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(Long.MAX_VALUE))));
        } else {
            StreamsTestUtils.verifyKeyValueList(
                Arrays.asList(
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0005", 100L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0004", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0003", 1L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("aa", "0002", 0L),
                    TimeOrderedCachingPersistentWindowStoreTest.windowedPair("a", "0001", 0L)),
                StreamsTestUtils.toList(this.cachingStore.backwardFetch(
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
                    TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                    Instant.ofEpochMilli(0L),
                    Instant.ofEpochMilli(Long.MAX_VALUE))));
        }
    }

    /**
     * Puts entries for "a", "aa" (twice) and "aaa" at timestamps 0 through 3, then fetches key
     * "aa" over timestamps 0..5 once as a single-key fetch and once as a key-range fetch with
     * equal bounds. Asserts that both iterators produce the same two values and then end.
     */
    @Test
    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("a"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0001"),
            0L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0002"),
            1L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0003"),
            2L);
        this.cachingStore.put(
            TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aaa"),
            TimeOrderedCachingPersistentWindowStoreTest.bytesValue("0004"),
            3L);
        try (WindowStoreIterator singleKeyIterator = this.cachingStore.fetch(
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"), 0L, 5L);
             KeyValueIterator keyRangeIterator = this.cachingStore.fetch(
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                 TimeOrderedCachingPersistentWindowStoreTest.bytesKey("aa"),
                 0L,
                 5L)) {
            // Exactly two entries were stored for "aa"; compare them pairwise.
            for (int pair = 0; pair < 2; ++pair) {
                Object fromSingleKey = this.stringFrom((byte[]) ((KeyValue) singleKeyIterator.next()).value);
                Object fromKeyRange = this.stringFrom((byte[]) ((KeyValue) keyRangeIterator.next()).value);
                Assert.assertEquals(fromSingleKey, fromKeyRange);
            }
            Assert.assertFalse(singleKeyIterator.hasNext());
            Assert.assertFalse(keyRangeIterator.hasNext());
        }
    }

    /**
     * Checks that a backward fetch of a single key and a backward fetch of a key range whose
     * bounds are both that key produce the same values in the same order.
     */
    @Test
    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeBackwardFetch() {
        // Two entries for "aa", plus one entry each for the other keys "a" and "aaa".
        cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 1L);
        cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2L);
        cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3L);

        // Both fetches use the same time bounds.
        final Instant timeFrom = Instant.ofEpochMilli(0L);
        final Instant timeTo = Instant.ofEpochMilli(5L);

        try (WindowStoreIterator perKey = cachingStore.backwardFetch(bytesKey("aa"), timeFrom, timeTo);
             KeyValueIterator perRange = cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"), timeFrom, timeTo)) {
            // Both iterators are expected to hold exactly two entries; compare them pairwise.
            for (int pair = 0; pair < 2; pair++) {
                final Object fromSingleKey = stringFrom((byte[]) ((KeyValue) perKey.next()).value);
                final Object fromKeyRange = stringFrom((byte[]) ((KeyValue) perRange.next()).value);
                Assert.assertEquals(fromSingleKey, fromKeyRange);
            }
            Assert.assertFalse(perKey.hasNext());
            Assert.assertFalse(perRange.hasNext());
        }
    }

    /**
     * Expects a {@link NullPointerException} when an entry is put with a {@code null} key.
     */
    @Test
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            cachingStore.put(null, bytesValue("anyValue"), 0L);
        });
    }

    /**
     * Puts a {@code null} value for key "a"; the test passes as long as no exception escapes.
     */
    @Test
    public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
        // No assertion on purpose: completing the call without an exception is the success condition.
        cachingStore.put(bytesKey("a"), null, 0L);
    }

    /**
     * Expects a {@link NullPointerException} when fetching with a {@code null} key.
     */
    @Test
    public void shouldThrowNullPointerExceptionOnFetchNullKey() {
        final Instant timeFrom = Instant.ofEpochMilli(1L);
        final Instant timeTo = Instant.ofEpochMilli(2L);
        Assert.assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, timeFrom, timeTo));
    }

    /**
     * Fetches a key range whose bounds are the Integer-serialized values -1 and 1 and expects an
     * empty iterator, together with the invalid-key-range warning in the captured log output of
     * {@code TimeOrderedCachingWindowStore}.
     */
    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        final String expectedWarning = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";

        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));

        // The appender is registered before the fetch so that the warning is captured.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TimeOrderedCachingWindowStore.class);
             KeyValueIterator iterator = cachingStore.fetch(keyFrom, keyTo, 0L, 10L)) {
            Assert.assertFalse(iterator.hasNext());

            final List messages = appender.getMessages();
            MatcherAssert.assertThat((Object) messages, (Matcher) CoreMatchers.hasItem((Object) expectedWarning));
        }
    }

    /**
     * Backward-fetches a key range whose bounds are the Integer-serialized values -1 and 1 and
     * expects an empty iterator, together with the invalid-key-range warning in the captured log
     * output of {@code TimeOrderedCachingWindowStore}.
     */
    @Test
    public void shouldNotThrowInvalidBackwardRangeExceptionWithNegativeFromKey() {
        final String expectedWarning = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";

        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));

        // The appender is registered before the fetch so that the warning is captured.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TimeOrderedCachingWindowStore.class);
             KeyValueIterator iterator = cachingStore.backwardFetch(keyFrom, keyTo, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L))) {
            Assert.assertFalse(iterator.hasNext());

            final List messages = appender.getMessages();
            MatcherAssert.assertThat((Object) messages, (Matcher) CoreMatchers.hasItem((Object) expectedWarning));
        }
    }

    /**
     * Makes the cache flush throw while the caching store is being closed. The close call must
     * surface a {@link RuntimeException}, and EasyMock verification then requires that the
     * recorded {@code cache.close} and {@code underlyingStore.close} calls were still made.
     */
    @Test
    public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() {
        setUpCloseTests();

        // Cache expectations: flush fails, close is still expected afterwards.
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
        cache.close(CACHE_NAMESPACE);
        EasyMock.replay(cache);

        // Wrapped store expectations: a plain close.
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, () -> {
            ((TimeOrderedCachingWindowStore) cachingStore).close();
        });
        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Makes the cache close throw while the caching store is being closed. The close call must
     * surface a {@link RuntimeException}, and EasyMock verification then requires that the
     * recorded {@code underlyingStore.close} call was still made.
     */
    @Test
    public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
        setUpCloseTests();

        // Cache expectations: flush succeeds, close fails.
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        cache.close(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(cache);

        // Wrapped store expectations: a plain close.
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, () -> {
            ((TimeOrderedCachingWindowStore) cachingStore).close();
        });
        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Makes the wrapped store's close throw while the caching store is being closed. The close
     * call must surface a {@link RuntimeException}, and EasyMock verification then requires that
     * the recorded {@code cache.flush} and {@code cache.close} calls were made.
     */
    @Test
    public void shouldCloseCacheAfterErrorDuringStateStoreClose() {
        setUpCloseTests();

        // Cache expectations: flush and close both succeed.
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        cache.close(CACHE_NAMESPACE);
        EasyMock.replay(cache);

        // Wrapped store expectations: close fails.
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, () -> {
            ((TimeOrderedCachingWindowStore) cachingStore).close();
        });
        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Replaces the test fixture with EasyMock nice mocks for the wrapped store and the thread
     * cache, builds a fresh caching store around the mocked wrapped store, and initialises it
     * against a mock processor context that uses the mocked cache.
     */
    private void setUpCloseTests() {
        // Wrapped store mock: stub name() and isOpen(), then switch it to replay mode.
        underlyingStore = EasyMock.createNiceMock(RocksDBTimeOrderedWindowStore.class);
        EasyMock.expect(underlyingStore.name()).andStubReturn("store-name");
        EasyMock.expect(underlyingStore.isOpen()).andStubReturn(true);
        EasyMock.replay(underlyingStore);

        cachingStore = new TimeOrderedCachingWindowStore(underlyingStore, WINDOW_SIZE.longValue(), 100L);

        // Mocked cache, handed to the processor context that the store is initialised with.
        cache = EasyMock.createNiceMock(ThreadCache.class);
        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);

        final Headers emptyHeaders = new RecordHeaders();
        context.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, TOPIC, emptyHeaders));

        // The explicit casts are kept so the same init overload is selected as before.
        cachingStore.init((StateStoreContext) context, (StateStore) cachingStore);
    }

    /**
     * Builds a key-value pair whose key is windowed by a {@link TimeWindow} that starts at
     * {@code timestamp} and ends at {@code timestamp + WINDOW_SIZE}.
     *
     * @param key       key string, converted with {@code bytesKey}
     * @param value     value string, converted with {@code bytesValue}
     * @param timestamp start of the time window
     * @return the pair of windowed key and converted value
     */
    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(String key, String value, long timestamp) {
        final Bytes rawKey = bytesKey(key);
        final Window window = new TimeWindow(timestamp, timestamp + WINDOW_SIZE);
        final Windowed<Bytes> windowedKey = new Windowed<>(rawKey, window);
        final byte[] rawValue = bytesValue(value);
        return KeyValue.pair(windowedKey, rawValue);
    }

    /**
     * Puts entries into the caching store, all at timestamp 10, until the running size estimate
     * reaches 300. Key and value of each entry are both the decimal string of a counter that
     * starts at 0.
     *
     * @return the number of entries that were put
     */
    private int addItemsToCache() {
        int estimatedSize = 0;
        int itemCount = 0;
        for (; estimatedSize < 300; itemCount++) {
            final String kv = String.valueOf(itemCount);
            cachingStore.put(bytesKey(kv), bytesValue(kv), 10L);
            // Estimate for one entry: the cache entry size plus 8 and 4 extra units
            // (presumably bytes for a timestamp and a sequence number - confirm against the store).
            final long entrySize = ThreadCacheTest.memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), TOPIC) + 8L + 4L;
            // Compound assignment narrows the long sum back to int.
            estimatedSize += entrySize;
        }
        return itemCount;
    }
}

