/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.CachingWindowStore;
import org.apache.kafka.streams.state.internals.MeteredWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

public class RocksDBWindowStoreTest {
    // Byte-array serde: its serializer is given to the MockProducer and the serde itself
    // is passed as both key and value serde to the MockProcessorContext in the tests.
    private final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
    // Store configuration constants.
    // NOTE(review): the long values are presumably durations in milliseconds — confirm
    // against RocksDBWindowStoreSupplier.
    private final String windowName = "window";
    private final int numSegments = 3;
    private final long segmentSize = 60000L;
    private final long retentionPeriod = 120000L;
    private final long windowSize = 3L;
    // Key and value serdes handed to the store supplier in createWindowStore.
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Serde<String> stringSerde = Serdes.String();
    // StateSerdes built from the two serdes above with an empty string as first argument.
    // NOTE(review): presumably used by helper methods outside the visible part of the file.
    private final StateSerdes<Integer, String> serdes = new StateSerdes("", this.intSerde, this.stringSerde);
    // 0x100000 = 1,048,576, i.e. 1 MiB when read as a byte count.
    private static final long DEFAULT_CACHE_SIZE_BYTES = 0x100000L;

    /**
     * Obtains a window store from a {@code RocksDBWindowStoreSupplier} configured with this
     * class's constants and initialises it against the given context.
     *
     * @param context          processor context the store is initialised with
     * @param enableCaching    forwarded to the supplier as its last constructor argument
     * @param retainDuplicates forwarded to the supplier as its fourth constructor argument
     * @param <K>              key type the caller views the store with
     * @param <V>              value type the caller views the store with
     * @return the initialised store
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, boolean enableCaching, boolean retainDuplicates) {
        // NOTE(review): the literal `true` is presumably the change-logging switch — confirm
        // against the RocksDBWindowStoreSupplier constructor.
        final WindowStore<K, V> windowStore = (WindowStore<K, V>) new RocksDBWindowStoreSupplier(
                windowName,
                retentionPeriod,
                numSegments,
                retainDuplicates,
                intSerde,
                stringSerde,
                windowSize,
                true,
                Collections.emptyMap(),
                enableCaching).get();
        // The store is passed as its own second argument to init.
        windowStore.init(context, windowStore);
        return windowStore;
    }

    /**
     * Opens an iterator over key 1 after three puts spaced {@code segmentSize} apart, performs a
     * fourth put one further {@code segmentSize} later, and then expects the iterator to yield only
     * the second and third entries.
     *
     * <p>NOTE(review): going by the test name, the fourth put presumably rolls the oldest segment
     * out so that its entry is skipped — confirm against RocksDBWindowStore.
     *
     * <p>The iterator and the store are closed in {@code finally} blocks so that they are released
     * even when an assertion fails.
     *
     * @throws Exception declared by the test signature
     */
    @Test
    public void shouldOnlyIterateOpenSegments() throws Exception {
        File baseDir = TestUtils.tempDirectory();
        Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
        RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {

            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                // Deliberately empty: records sent to the collector are not inspected by this test.
            }
        };
        MockProcessorContext context = new MockProcessorContext(null, baseDir, byteArraySerde, byteArraySerde, recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
        WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
        try {
            long currentTime = 0L;
            context.setRecordContext(createRecordContext(currentTime));
            windowStore.put(1, "one");

            currentTime += segmentSize;
            context.setRecordContext(createRecordContext(currentTime));
            windowStore.put(1, "two");

            currentTime += segmentSize;
            context.setRecordContext(createRecordContext(currentTime));
            windowStore.put(1, "three");

            // The iterator is created before the fourth put on purpose.
            WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, currentTime);
            try {
                currentTime += segmentSize;
                context.setRecordContext(createRecordContext(currentTime));
                windowStore.put(1, "four");

                // The entry written at time 0 is not expected any more.
                Assert.assertEquals(new KeyValue<>(60000L, "two"), iterator.next());
                Assert.assertEquals(new KeyValue<>(120000L, "three"), iterator.next());
                Assert.assertFalse(iterator.hasNext());
            } finally {
                iterator.close();
            }
        } finally {
            windowStore.close();
        }
    }

    /**
     * Creates the record context the tests install on the mock processor context before a put.
     *
     * @param time value passed as the first constructor argument; the remaining arguments are
     *             fixed at {@code 0L}, {@code 0} and {@code "topic"}
     *             (NOTE(review): presumably timestamp, offset, partition and topic — confirm
     *             against ProcessorRecordContext)
     * @return a new record context
     */
    private ProcessorRecordContext createRecordContext(long time) {
        final ProcessorRecordContext recordContext = new ProcessorRecordContext(time, 0L, 0, "topic");
        return recordContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes both record batches and checks what {@code fetch} returns for a range reaching
     * {@code windowSize} on either side of each probed timestamp, then flushes the store and
     * checks the records captured by the stubbed record collector.
     *
     * @throws IOException if creating the temporary state directory fails
     */
    @Test
    public void testPutAndFetch() throws IOException {
        final File stateDir = Files.createTempDirectory("test").toFile();
        try {
            final ArrayList<KeyValue<byte[], byte[]>> loggedRecords = new ArrayList<>();
            final Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
            final RecordCollector collector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    // Capture every record in serialized form instead of sending it anywhere.
                    final byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
                    final byte[] valueBytes = valueSerializer.serialize(record.topic(), record.value());
                    loggedRecords.add(new KeyValue<>(keyBytes, valueBytes));
                }
            };
            final MockProcessorContext context = new MockProcessorContext(null, stateDir, byteArraySerde, byteArraySerde, collector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
            try (WindowStore<Integer, String> store = createWindowStore(context, false, true)) {
                final long startTime = 59996L;

                // First batch: keys 0, 1, 2, 4 and 5 (key 3 is never written).
                putFirstBatch(store, startTime, context);
                Assert.assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
                Assert.assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
                Assert.assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
                Assert.assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
                Assert.assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));

                // Second batch: six more values for key 2 at startTime + 3 .. startTime + 8.
                putSecondBatch(store, startTime, context);
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
                Assert.assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));

                // Flush, then check the captured records grouped by key
                // (the "value@n" formatting comes from entriesByKey, which is not shown here).
                store.flush();
                final Map<Integer, Set<String>> loggedByKey = entriesByKey(loggedRecords, startTime);
                Assert.assertEquals(Utils.mkSet("zero@0"), loggedByKey.get(0));
                Assert.assertEquals(Utils.mkSet("one@1"), loggedByKey.get(1));
                Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), loggedByKey.get(2));
                Assert.assertNull(loggedByKey.get(3));
                Assert.assertEquals(Utils.mkSet("four@4"), loggedByKey.get(4));
                Assert.assertEquals(Utils.mkSet("five@5"), loggedByKey.get(5));
                Assert.assertNull(loggedByKey.get(6));
            }
        } finally {
            Utils.delete(stateDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes both record batches and checks what {@code fetch} returns for a range that starts
     * {@code windowSize} before each probed timestamp and ends at it, then flushes the store and
     * checks the records captured by the stubbed record collector.
     *
     * @throws IOException if creating the temporary state directory fails
     */
    @Test
    public void testPutAndFetchBefore() throws IOException {
        final File stateDir = Files.createTempDirectory("test").toFile();
        try {
            final ArrayList<KeyValue<byte[], byte[]>> loggedRecords = new ArrayList<>();
            final Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
            final RecordCollector collector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    // Capture every record in serialized form instead of sending it anywhere.
                    final byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
                    final byte[] valueBytes = valueSerializer.serialize(record.topic(), record.value());
                    loggedRecords.add(new KeyValue<>(keyBytes, valueBytes));
                }
            };
            final MockProcessorContext context = new MockProcessorContext(null, stateDir, byteArraySerde, byteArraySerde, collector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
            try (WindowStore<Integer, String> store = createWindowStore(context, false, true)) {
                final long startTime = 59996L;

                // First batch: keys 0, 1, 2, 4 and 5 (key 3 is never written).
                putFirstBatch(store, startTime, context);
                Assert.assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
                Assert.assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
                Assert.assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
                Assert.assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
                Assert.assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));

                // Second batch: six more values for key 2 at startTime + 3 .. startTime + 8.
                putSecondBatch(store, startTime, context);
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
                Assert.assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
                Assert.assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
                Assert.assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
                Assert.assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
                Assert.assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
                Assert.assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
                Assert.assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
                Assert.assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));

                // Flush, then check the captured records grouped by key
                // (the "value@n" formatting comes from entriesByKey, which is not shown here).
                store.flush();
                final Map<Integer, Set<String>> loggedByKey = entriesByKey(loggedRecords, startTime);
                Assert.assertEquals(Utils.mkSet("zero@0"), loggedByKey.get(0));
                Assert.assertEquals(Utils.mkSet("one@1"), loggedByKey.get(1));
                Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), loggedByKey.get(2));
                Assert.assertNull(loggedByKey.get(3));
                Assert.assertEquals(Utils.mkSet("four@4"), loggedByKey.get(4));
                Assert.assertEquals(Utils.mkSet("five@5"), loggedByKey.get(5));
                Assert.assertNull(loggedByKey.get(6));
            }
        } finally {
            Utils.delete(stateDir);
        }
    }

    /**
     * Writes six further values for key 2: {@code "two+1"} at {@code startTime + 3} through
     * {@code "two+6"} at {@code startTime + 8}, installing a fresh record context before each put.
     *
     * @param store     store to write into
     * @param startTime base timestamp the record-context times are offset from
     * @param context   mock context whose record context is replaced before every put
     */
    private void putSecondBatch(WindowStore<Integer, String> store, long startTime, MockProcessorContext context) {
        for (int i = 1; i <= 6; i++) {
            // Value "two+i" is written with record time startTime + 2 + i.
            context.setRecordContext(createRecordContext(startTime + 2L + i));
            // Key and value are passed with their real types; the store is typed <Integer, String>.
            store.put(2, "two+" + i);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes both record batches and checks what {@code fetch} returns for a range that starts at
     * each probed timestamp and ends {@code windowSize} after it, then flushes the store and checks
     * the records captured by the stubbed record collector.
     *
     * @throws IOException if creating the temporary state directory fails
     */
    @Test
    public void testPutAndFetchAfter() throws IOException {
        final File stateDir = Files.createTempDirectory("test").toFile();
        try {
            final ArrayList<KeyValue<byte[], byte[]>> loggedRecords = new ArrayList<>();
            final Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
            final RecordCollector collector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    // Capture every record in serialized form instead of sending it anywhere.
                    final byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
                    final byte[] valueBytes = valueSerializer.serialize(record.topic(), record.value());
                    loggedRecords.add(new KeyValue<>(keyBytes, valueBytes));
                }
            };
            final MockProcessorContext context = new MockProcessorContext(null, stateDir, byteArraySerde, byteArraySerde, collector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
            try (WindowStore<Integer, String> store = createWindowStore(context, false, true)) {
                final long startTime = 59996L;

                // First batch: keys 0, 1, 2, 4 and 5 (key 3 is never written).
                putFirstBatch(store, startTime, context);
                Assert.assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
                Assert.assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
                Assert.assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
                Assert.assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
                Assert.assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));

                // Second batch: six more values for key 2 at startTime + 3 .. startTime + 8.
                putSecondBatch(store, startTime, context);
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
                Assert.assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
                Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
                Assert.assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));

                // Flush, then check the captured records grouped by key
                // (the "value@n" formatting comes from entriesByKey, which is not shown here).
                store.flush();
                final Map<Integer, Set<String>> loggedByKey = entriesByKey(loggedRecords, startTime);
                Assert.assertEquals(Utils.mkSet("zero@0"), loggedByKey.get(0));
                Assert.assertEquals(Utils.mkSet("one@1"), loggedByKey.get(1));
                Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), loggedByKey.get(2));
                Assert.assertNull(loggedByKey.get(3));
                Assert.assertEquals(Utils.mkSet("four@4"), loggedByKey.get(4));
                Assert.assertEquals(Utils.mkSet("five@5"), loggedByKey.get(5));
                Assert.assertNull(loggedByKey.get(6));
            }
        } finally {
            Utils.delete(stateDir);
        }
    }

    /**
     * Writes the first batch of records: keys 0, 1, 2, 4 and 5 with record times
     * {@code startTime + key}. Key 3 is never written, and the tests assert that it stays absent.
     * A fresh record context is installed before each put.
     *
     * @param store     store to write into
     * @param startTime base timestamp the record-context times are offset from
     * @param context   mock context whose record context is replaced before every put
     */
    private void putFirstBatch(WindowStore<Integer, String> store, long startTime, MockProcessorContext context) {
        // Keys and values are passed with their real types; the store is typed <Integer, String>.
        context.setRecordContext(createRecordContext(startTime));
        store.put(0, "zero");

        context.setRecordContext(createRecordContext(startTime + 1L));
        store.put(1, "one");

        context.setRecordContext(createRecordContext(startTime + 2L));
        store.put(2, "two");

        context.setRecordContext(createRecordContext(startTime + 4L));
        store.put(4, "four");

        context.setRecordContext(createRecordContext(startTime + 5L));
        store.put(5, "five");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Puts several values for the same key under one unchanged record context and checks that
     * {@code fetch} returns all of them, duplicates included, for every probed range that still
     * covers the record time. The store is created with {@code retainDuplicates = true}.
     *
     * @throws IOException if creating the temporary state directory fails
     */
    @Test
    public void testPutSameKeyTimestamp() throws IOException {
        final File stateDir = Files.createTempDirectory("test").toFile();
        try {
            final ArrayList<KeyValue<byte[], byte[]>> loggedRecords = new ArrayList<>();
            final Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
            final RecordCollector collector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    // Capture every record in serialized form instead of sending it anywhere.
                    final byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
                    final byte[] valueBytes = valueSerializer.serialize(record.topic(), record.value());
                    loggedRecords.add(new KeyValue<>(keyBytes, valueBytes));
                }
            };
            final MockProcessorContext context = new MockProcessorContext(null, stateDir, byteArraySerde, byteArraySerde, collector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
            try (WindowStore<Integer, String> store = createWindowStore(context, false, true)) {
                final long startTime = 59996L;

                // The record context is set once; every put below uses it.
                context.setRecordContext(createRecordContext(startTime));
                store.put(0, "zero");
                Assert.assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));

                store.put(0, "zero");
                store.put(0, "zero+");
                store.put(0, "zero++");

                Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
                Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
                Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
                Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
                Assert.assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));

                // Flush, then check the captured records for key 0. The expected value is a set,
                // so the repeated "zero@0" collapses into one element.
                store.flush();
                final Map<Integer, Set<String>> loggedByKey = entriesByKey(loggedRecords, startTime);
                Assert.assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), loggedByKey.get(0));
            }
        } finally {
            Utils.delete(stateDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a store created with caching enabled is a {@code CachingWindowStore}.
     *
     * <p>The store is initialised by {@code createWindowStore}, so it is closed here before the
     * state directory is deleted.
     *
     * @throws IOException if creating the temporary state directory fails
     */
    @Test
    public void testCachingEnabled() throws IOException {
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final ArrayList<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
            RecordCollector recordCollector = new RecordCollector(producer, "anyTaskID") {

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    // Capture every record in serialized form instead of sending it anywhere.
                    changeLog.add(new KeyValue<>(
                            keySerializer.serialize(record.topic(), record.key()),
                            valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext(null, baseDir, byteArraySerde, byteArraySerde, recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
            // enableCaching = true, retainDuplicates = false
            WindowStore<Integer, String> store = createWindowStore(context, true, false);
            try {
                Assert.assertTrue(store instanceof CachingWindowStore);
            } finally {
                // Release the store before its directory is removed in the outer finally.
                store.close();
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRolling() throws IOException {
        File baseDir = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            final ArrayList changeLog = new ArrayList();
            MockProducer producer = new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer());
            RecordCollector recordCollector = new RecordCollector((Producer)producer, "RocksDBWindowStoreTest-TestRolling"){

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue((Object)keySerializer.serialize(record.topic(), record.key()), (Object)valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext(null, baseDir, this.byteArraySerde, this.byteArraySerde, recordCollector, new ThreadCache(0x100000L));
            WindowStore store = this.createWindowStore((ProcessorContext)context, false, true);
            RocksDBWindowStore inner = (RocksDBWindowStore)((MeteredWindowStore)store).inner();
            try {
                long startTime = 120000L;
                long incr = 30000L;
                context.setRecordContext((RecordContext)this.createRecordContext(startTime));
                store.put((Object)0, (Object)"zero");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{2L}), (Object)inner.segmentIds());
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr));
                store.put((Object)1, (Object)"one");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{2L}), (Object)inner.segmentIds());
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 2L));
                store.put((Object)2, (Object)"two");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{2L, 3L}), (Object)inner.segmentIds());
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{2L, 3L}), (Object)inner.segmentIds());
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 4L));
                store.put((Object)4, (Object)"four");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{2L, 3L, 4L}), (Object)inner.segmentIds());
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 5L));
                store.put((Object)5, (Object)"five");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{2L, 3L, 4L}), (Object)inner.segmentIds());
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"zero"}), this.toList(store.fetch((Object)0, startTime - 3L, startTime + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"one"}), this.toList(store.fetch((Object)1, startTime + incr - 3L, startTime + incr + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"two"}), this.toList(store.fetch((Object)2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"four"}), this.toList(store.fetch((Object)4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"five"}), this.toList(store.fetch((Object)5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 6L));
                store.put((Object)6, (Object)"six");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{3L, 4L, 5L}), (Object)inner.segmentIds());
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)0, startTime - 3L, startTime + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)1, startTime + incr - 3L, startTime + incr + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"two"}), this.toList(store.fetch((Object)2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"four"}), this.toList(store.fetch((Object)4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"five"}), this.toList(store.fetch((Object)5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"six"}), this.toList(store.fetch((Object)6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 7L));
                store.put((Object)7, (Object)"seven");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{3L, 4L, 5L}), (Object)inner.segmentIds());
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)0, startTime - 3L, startTime + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)1, startTime + incr - 3L, startTime + incr + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"two"}), this.toList(store.fetch((Object)2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"four"}), this.toList(store.fetch((Object)4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"five"}), this.toList(store.fetch((Object)5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"six"}), this.toList(store.fetch((Object)6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"seven"}), this.toList(store.fetch((Object)7, startTime + incr * 7L - 3L, startTime + incr * 7L + 3L)));
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 8L));
                store.put((Object)8, (Object)"eight");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{4L, 5L, 6L}), (Object)inner.segmentIds());
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)0, startTime - 3L, startTime + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)1, startTime + incr - 3L, startTime + incr + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"four"}), this.toList(store.fetch((Object)4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"five"}), this.toList(store.fetch((Object)5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"six"}), this.toList(store.fetch((Object)6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"seven"}), this.toList(store.fetch((Object)7, startTime + incr * 7L - 3L, startTime + incr * 7L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"eight"}), this.toList(store.fetch((Object)8, startTime + incr * 8L - 3L, startTime + incr * 8L + 3L)));
                store.flush();
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)}), this.segmentDirs(baseDir));
            }
            finally {
                store.close();
            }
        }
        finally {
            Utils.delete((File)baseDir);
        }
    }

    /**
     * Verifies that a window store can be rebuilt from its changelog alone.
     *
     * <p>Phase one writes nine records (keys 0 through 8, one every {@code incr} milliseconds
     * starting at {@code startTime}) into a store located under a first temporary directory,
     * capturing every record handed to the record collector in {@code changeLog}.
     *
     * <p>Phase two creates a brand-new store under a second temporary directory, replays the
     * captured changelog through {@code context.restore}, and then asserts the segment ids, the
     * fetch result for every key, and the segment directories found on disk.
     *
     * @throws IOException if handling of the temporary directories fails
     */
    @Test
    public void testRestore() throws IOException {
        final ArrayList<KeyValue<byte[], byte[]>> changeLog = new ArrayList<KeyValue<byte[], byte[]>>();
        long startTime = 120000L;
        long incr = 30000L;

        // Phase 1: populate a store and record everything that goes to the changelog.
        File baseDir = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            MockProducer producer = new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer());
            RecordCollector recordCollector = new RecordCollector((Producer)producer, "RocksDBWindowStoreTest-TestRestore"){

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue((Object)keySerializer.serialize(record.topic(), record.key()), (Object)valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext(null, baseDir, this.byteArraySerde, this.byteArraySerde, recordCollector, new ThreadCache(0x100000L));
            try (WindowStore store = this.createWindowStore((ProcessorContext)context, false, true);){
                context.setRecordContext((RecordContext)this.createRecordContext(startTime));
                store.put((Object)0, (Object)"zero");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr));
                store.put((Object)1, (Object)"one");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 2L));
                store.put((Object)2, (Object)"two");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 3L));
                store.put((Object)3, (Object)"three");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 4L));
                store.put((Object)4, (Object)"four");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 5L));
                store.put((Object)5, (Object)"five");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 6L));
                store.put((Object)6, (Object)"six");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 7L));
                store.put((Object)7, (Object)"seven");
                context.setRecordContext((RecordContext)this.createRecordContext(startTime + incr * 8L));
                store.put((Object)8, (Object)"eight");
                store.flush();
            }
        }
        finally {
            Utils.delete((File)baseDir);
        }

        // Phase 2: restore into a fresh, empty directory. The second directory must be used for
        // both the context and the on-disk check: the first one has already been deleted above,
        // so reusing it would recreate it behind the test's back and never clean it up.
        File baseDir2 = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            MockProducer producer = new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer());
            RecordCollector recordCollector = new RecordCollector((Producer)producer, "RocksDBWindowStoreTest-TestRestoreII"){

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue((Object)keySerializer.serialize(record.topic(), record.key()), (Object)valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext(null, baseDir2, this.byteArraySerde, this.byteArraySerde, recordCollector, new ThreadCache(0x100000L));
            WindowStore store = this.createWindowStore((ProcessorContext)context, false, true);
            RocksDBWindowStore inner = (RocksDBWindowStore)((MeteredWindowStore)store).inner();
            try {
                context.restore("window", changeLog);
                Assert.assertEquals((Object)Utils.mkSet((Object[])new Long[]{4L, 5L, 6L}), (Object)inner.segmentIds());
                // Keys 0..3 are expected to be gone after the restore; keys 4..8 must be readable.
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)0, startTime - 3L, startTime + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)1, startTime + incr - 3L, startTime + incr + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new Object[0]), this.toList(store.fetch((Object)3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"four"}), this.toList(store.fetch((Object)4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"five"}), this.toList(store.fetch((Object)5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"six"}), this.toList(store.fetch((Object)6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"seven"}), this.toList(store.fetch((Object)7, startTime + incr * 7L - 3L, startTime + incr * 7L + 3L)));
                Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"eight"}), this.toList(store.fetch((Object)8, startTime + incr * 8L - 3L, startTime + incr * 8L + 3L)));
                store.flush();
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)}), this.segmentDirs(baseDir2));
            }
            finally {
                store.close();
            }
        }
        finally {
            Utils.delete((File)baseDir2);
        }
    }

    /**
     * Verifies which segment directories exist on disk as records with increasing timestamps are
     * written, and how many records a range fetch for key 0 returns at each step.
     *
     * <p>Records are put at timestamps 0, 59999 (twice), 60000, 180000 and 300000. After each step
     * the test compares the contents of the store's {@code window} directory with the expected
     * segment names, and counts the entries returned by {@code store.fetch}.
     *
     * <p>Every iterator obtained from {@code store.fetch} is closed via try-with-resources before
     * the next write, so no iterator stays open across the subsequent puts.
     *
     * @throws IOException if handling of the temporary directory fails
     */
    @Test
    public void testSegmentMaintenance() throws IOException {
        File baseDir = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            MockProducer producer = new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer());
            // Changelog output is irrelevant for this test, so the collector drops every record.
            RecordCollector recordCollector = new RecordCollector((Producer)producer, "RocksDBWindowStoreTest-TestSegmentMaintenance"){

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                }
            };
            MockProcessorContext context = new MockProcessorContext(null, baseDir, this.byteArraySerde, this.byteArraySerde, recordCollector, new ThreadCache(0x100000L));
            WindowStore store = this.createWindowStore((ProcessorContext)context, false, true);
            RocksDBWindowStore inner = (RocksDBWindowStore)((MeteredWindowStore)store).inner();
            try {
                context.setTime(0L);
                context.setRecordContext((RecordContext)this.createRecordContext(0L));
                store.put((Object)0, (Object)"v");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(0L)}), this.segmentDirs(baseDir));

                context.setRecordContext((RecordContext)this.createRecordContext(59999L));
                store.put((Object)0, (Object)"v");
                store.put((Object)0, (Object)"v");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(0L)}), this.segmentDirs(baseDir));

                context.setRecordContext((RecordContext)this.createRecordContext(60000L));
                store.put((Object)0, (Object)"v");
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(0L), inner.segmentName(1L)}), this.segmentDirs(baseDir));

                // All four records written so far must be visible.
                int fetchedCount = 0;
                try (WindowStoreIterator iter = store.fetch((Object)0, 0L, 240000L);){
                    while (iter.hasNext()) {
                        iter.next();
                        ++fetchedCount;
                    }
                }
                Assert.assertEquals((long)4L, (long)fetchedCount);
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(0L), inner.segmentName(1L)}), this.segmentDirs(baseDir));

                context.setRecordContext((RecordContext)this.createRecordContext(180000L));
                store.put((Object)0, (Object)"v");
                fetchedCount = 0;
                try (WindowStoreIterator iter = store.fetch((Object)0, 0L, 240000L);){
                    while (iter.hasNext()) {
                        iter.next();
                        ++fetchedCount;
                    }
                }
                Assert.assertEquals((long)2L, (long)fetchedCount);
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(1L), inner.segmentName(3L)}), this.segmentDirs(baseDir));

                context.setRecordContext((RecordContext)this.createRecordContext(300000L));
                store.put((Object)0, (Object)"v");
                fetchedCount = 0;
                try (WindowStoreIterator iter = store.fetch((Object)0, 240000L, 1000000L);){
                    while (iter.hasNext()) {
                        iter.next();
                        ++fetchedCount;
                    }
                }
                Assert.assertEquals((long)1L, (long)fetchedCount);
                Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{inner.segmentName(3L), inner.segmentName(5L)}), this.segmentDirs(baseDir));
            }
            finally {
                store.close();
            }
        }
        finally {
            Utils.delete((File)baseDir);
        }
    }

    /**
     * Checks which pre-existing segment directories survive when a store is opened on top of them.
     *
     * <p>A first store is opened only so that its inner store can supply segment names; while it
     * is open, directories for segments 0 through 6 are created by hand inside the {@code window}
     * directory. The store is then closed and a second one is opened on the same base directory.
     * The test asserts that only the directories for segments 4, 5 and 6 are present, both before
     * and after a full-range fetch has been iterated to the end.
     *
     * @throws IOException if handling of the temporary directory fails
     */
    @Test
    public void testInitialLoading() throws IOException {
        File stateDir = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            MockProducer mockProducer = new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer());
            // The collector discards everything: changelog traffic is not under test here.
            RecordCollector discardingCollector = new RecordCollector((Producer)mockProducer, "RocksDBWindowStoreTest-TestInitialLoading"){

                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                }
            };
            MockProcessorContext processorContext = new MockProcessorContext(null, stateDir, this.byteArraySerde, this.byteArraySerde, discardingCollector, new ThreadCache(0x100000L));
            File windowDir = new File(stateDir, "window");

            // Step 1: seed the window directory with seven segment directories.
            WindowStore seedStore = this.createWindowStore((ProcessorContext)processorContext, false, true);
            RocksDBWindowStore seedInner = (RocksDBWindowStore)((MeteredWindowStore)seedStore).inner();
            try {
                for (long segmentId = 0L; segmentId <= 6L; ++segmentId) {
                    new File(windowDir, seedInner.segmentName(segmentId)).mkdir();
                }
            }
            finally {
                seedStore.close();
            }

            // Step 2: reopen and verify which directories are left.
            WindowStore reopenedStore = this.createWindowStore((ProcessorContext)processorContext, false, true);
            RocksDBWindowStore reopenedInner = (RocksDBWindowStore)((MeteredWindowStore)reopenedStore).inner();
            try {
                Assert.assertEquals(
                        Utils.mkSet((Object[])new String[]{reopenedInner.segmentName(4L), reopenedInner.segmentName(5L), reopenedInner.segmentName(6L)}),
                        this.segmentDirs(stateDir));

                // Drain a fetch over the whole time range; the directory listing must not change.
                try (WindowStoreIterator everything = reopenedStore.fetch((Object)0, 0L, 1000000L);){
                    while (everything.hasNext()) {
                        everything.next();
                    }
                }

                Assert.assertEquals(
                        Utils.mkSet((Object[])new String[]{reopenedInner.segmentName(4L), reopenedInner.segmentName(5L), reopenedInner.segmentName(6L)}),
                        this.segmentDirs(stateDir));
            }
            finally {
                reopenedStore.close();
            }
        }
        finally {
            Utils.delete(stateDir);
        }
    }

    /**
     * Checks that an iterator obtained before the store was closed can no longer be used
     * afterwards: both {@code hasNext()} and {@code next()} are expected to throw
     * {@link InvalidStateStoreException}.
     *
     * @throws Exception if setting up the store fails
     */
    @Test
    public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception {
        File stateDir = TestUtils.tempDirectory();
        MockProducer mockProducer = new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer());
        // Changelog records are dropped; only iterator behaviour matters in this test.
        RecordCollector discardingCollector = new RecordCollector((Producer)mockProducer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments"){

            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
            }
        };
        MockProcessorContext processorContext = new MockProcessorContext(null, stateDir, this.byteArraySerde, this.byteArraySerde, discardingCollector, new ThreadCache(0x100000L));
        WindowStore store = this.createWindowStore((ProcessorContext)processorContext, false, true);
        processorContext.setRecordContext((RecordContext)this.createRecordContext(0L));

        // Three values for the same key at consecutive timestamps.
        store.put((Object)1, (Object)"one", 1L);
        store.put((Object)1, (Object)"two", 2L);
        store.put((Object)1, (Object)"three", 3L);

        WindowStoreIterator openIterator = store.fetch((Object)1, 1L, 3L);
        Assert.assertTrue(openIterator.hasNext());

        store.close();

        try {
            openIterator.hasNext();
            Assert.fail("should have thrown InvalidStateStoreException on closed store");
        }
        catch (InvalidStateStoreException expected) {
            // expected: the store backing this iterator has been closed
        }
        try {
            openIterator.next();
            Assert.fail("should have thrown InvalidStateStoreException on closed store");
        }
        catch (InvalidStateStoreException expected) {
            // expected: the store backing this iterator has been closed
        }
    }

    /**
     * Drains the given iterator and collects the value of every entry, in iteration order.
     *
     * <p>The iterator is consumed up to its end but is not closed by this method.
     *
     * @param iterator the iterator to drain
     * @param <E> the value type of the iterated entries
     * @return a new list holding the {@code value} of each entry returned by the iterator
     */
    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
        // Typed as E so the returned list really is a List<E> rather than a list of Object.
        List<E> values = new ArrayList<E>();
        while (iterator.hasNext()) {
            values.add(iterator.next().value);
        }
        return values;
    }

    /**
     * Lists the names of the entries found in the {@code window} sub-directory of the given base
     * directory.
     *
     * @param baseDir the directory that contains the {@code window} directory
     * @return a new set with the entry names; empty when the {@code window} directory does not
     *         exist or cannot be listed
     */
    private Set<String> segmentDirs(File baseDir) {
        File windowDir = new File(baseDir, "window");
        String[] names = windowDir.list();
        // File.list() returns null for a missing or unreadable directory. Reporting that as an
        // empty set lets the caller's assertEquals fail with a readable diff rather than an NPE.
        if (names == null) {
            return new HashSet<String>();
        }
        return new HashSet<String>(Arrays.asList(names));
    }

    /**
     * Groups changelog records by their deserialized key.
     *
     * <p>Each record contributes one string of the form {@code value@offset}, where
     * {@code value} is the deserialized value (the text {@code null} when the record has no
     * value) and {@code offset} is the record timestamp minus {@code startTime}.
     *
     * @param changeLog the raw changelog records (binary key, binary value)
     * @param startTime the timestamp subtracted from every record timestamp
     * @return a map from key to the set of {@code value@offset} strings recorded for that key
     */
    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
        Map<Integer, Set<String>> grouped = new HashMap<Integer, Set<String>>();
        for (KeyValue<byte[], byte[]> record : changeLog) {
            byte[] binaryKey = record.key;
            long recordTime = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
            Integer recordKey = (Integer)WindowStoreUtils.keyFromBinaryKey(binaryKey, this.serdes);

            String recordValue;
            if (record.value != null) {
                recordValue = (String)this.serdes.valueFrom(record.value);
            } else {
                recordValue = null;
            }

            // Create the bucket for this key on first sight.
            Set<String> bucket = grouped.get(recordKey);
            if (bucket == null) {
                bucket = new HashSet<String>();
                grouped.put(recordKey, bucket);
            }
            bucket.add(recordValue + "@" + (recordTime - startTime));
        }
        return grouped;
    }
}

