/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import kafka.utils.MockTime;
import kafka.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class QueryableStateIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private String streamOne;
    private String streamTwo;
    private String streamThree;
    private String streamConcurrent;
    private String outputTopic;
    private String outputTopicConcurrent;
    private String outputTopicThree;
    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2L, TimeUnit.DAYS);
    private static final int NUM_PARTITIONS = 2;
    private static final int NUM_REPLICAS = 1;
    private Properties streamsConfiguration;
    private List<String> inputValues;
    private Set<String> inputValuesKeys;
    private KafkaStreams kafkaStreams;
    private Comparator<KeyValue<String, String>> stringComparator;
    private Comparator<KeyValue<String, Long>> stringLongComparator;
    private static int testNo = 0;
    @Parameterized.Parameter
    public long cacheSizeBytes;

    public QueryableStateIntegrationTest() {
        this.mockTime = QueryableStateIntegrationTest.CLUSTER.time;
        this.streamOne = "stream-one";
        this.streamTwo = "stream-two";
        this.streamThree = "stream-three";
        this.streamConcurrent = "stream-concurrent";
        this.outputTopic = "output";
        this.outputTopicConcurrent = "output-concurrent";
        this.outputTopicThree = "output-three";
    }

    public void createTopics() {
        this.streamOne = this.streamOne + "-" + testNo;
        this.streamConcurrent = this.streamConcurrent + "-" + testNo;
        this.streamThree = this.streamThree + "-" + testNo;
        this.outputTopic = this.outputTopic + "-" + testNo;
        this.outputTopicConcurrent = this.outputTopicConcurrent + "-" + testNo;
        this.outputTopicThree = this.outputTopicThree + "-" + testNo;
        this.streamTwo = this.streamTwo + "-" + testNo;
        CLUSTER.createTopic(this.streamOne);
        CLUSTER.createTopic(this.streamConcurrent);
        CLUSTER.createTopic(this.streamTwo, 2, 1);
        CLUSTER.createTopic(this.streamThree, 4, 1);
        CLUSTER.createTopic(this.outputTopic);
        CLUSTER.createTopic(this.outputTopicConcurrent);
        CLUSTER.createTopic(this.outputTopicThree);
    }

    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{0, 0xA00000L};
    }

    @Before
    public void before() throws IOException {
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String applicationId = "queryable-state-" + ++testNo;
        this.streamsConfiguration.put("application.id", applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("zookeeper.connect", CLUSTER.zKConnectString());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory((String)"qs-test").getPath());
        this.streamsConfiguration.put("key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("commit.interval.ms", (Object)1);
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)this.cacheSizeBytes);
        this.stringComparator = new Comparator<KeyValue<String, String>>(){

            @Override
            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
                return ((String)o1.key).compareTo((String)o2.key);
            }
        };
        this.stringLongComparator = new Comparator<KeyValue<String, Long>>(){

            @Override
            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
                return ((String)o1.key).compareTo((String)o2.key);
            }
        };
        this.inputValues = Arrays.asList("hello world", "all streams lead to kafka", "streams", "kafka streams", "the cat in the hat", "green eggs and ham", "that sam i am", "up the creek without a paddle", "run forest run", "a tank full of gas", "eat sleep rave repeat", "one jolly sailor", "king of the world");
        this.inputValuesKeys = new HashSet<String>();
        for (String sentence : this.inputValues) {
            String[] words;
            for (String word : words = sentence.split("\\W+")) {
                this.inputValuesKeys.add(word);
            }
        }
    }

    @After
    public void shutdown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    private KafkaStreams createCountStream(String inputTopic, String outputTopic, Properties streamsConfiguration) {
        KStreamBuilder builder = new KStreamBuilder();
        Serde stringSerde = Serdes.String();
        KStream textLines = builder.stream(stringSerde, stringSerde, new String[]{inputTopic});
        KGroupedStream groupedByWord = textLines.flatMapValues((ValueMapper)new ValueMapper<String, Iterable<String>>(){

            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).groupBy(MockKeyValueMapper.SelectValueMapper());
        groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);
        groupedByWord.count((Windows)TimeWindows.of((long)WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
        return new KafkaStreams((TopologyBuilder)builder, streamsConfiguration);
    }

    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams, Set<String> keys, final String storeName) throws Exception {
        for (final String key : keys) {
            TestUtils.waitForCondition((TestCondition)new TestCondition(){

                public boolean conditionMet() {
                    try {
                        StreamsMetadata metadata = streams.metadataForKey(storeName, (Object)key, (Serializer)new StringSerializer());
                        if (metadata == null || metadata.equals((Object)StreamsMetadata.NOT_AVAILABLE)) {
                            return false;
                        }
                        int index = metadata.hostInfo().port();
                        KafkaStreams streamsWithKey = streamRunnables[index].getStream();
                        ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());
                        return store != null && store.get((Object)key) != null;
                    }
                    catch (IllegalStateException e) {
                        return false;
                    }
                    catch (InvalidStateStoreException e) {
                        return false;
                    }
                }
            }, (long)30000L, (String)"waiting for metadata, store and value to be non null");
        }
    }

    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams, Set<String> keys, final String storeName, final Long from, final Long to) throws Exception {
        for (final String key : keys) {
            TestUtils.waitForCondition((TestCondition)new TestCondition(){

                public boolean conditionMet() {
                    try {
                        StreamsMetadata metadata = streams.metadataForKey(storeName, (Object)key, (Serializer)new StringSerializer());
                        if (metadata == null || metadata.equals((Object)StreamsMetadata.NOT_AVAILABLE)) {
                            return false;
                        }
                        int index = metadata.hostInfo().port();
                        KafkaStreams streamsWithKey = streamRunnables[index].getStream();
                        ReadOnlyWindowStore store = (ReadOnlyWindowStore)streamsWithKey.store(storeName, QueryableStoreTypes.windowStore());
                        return store != null && store.fetch((Object)key, from.longValue(), to.longValue()) != null;
                    }
                    catch (IllegalStateException e) {
                        return false;
                    }
                    catch (InvalidStateStoreException e) {
                        return false;
                    }
                }
            }, (long)30000L, (String)"waiting for metadata, store and value to be non null");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void queryOnRebalance() throws Exception {
        int i;
        int numThreads = 2;
        StreamRunnable[] streamRunnables = new StreamRunnable[2];
        Thread[] streamThreads = new Thread[2];
        int numIterations = 500000;
        ProducerRunnable producerRunnable = new ProducerRunnable(this.streamThree, this.inputValues, 500000);
        Thread producerThread = new Thread(producerRunnable);
        for (i = 0; i < 2; ++i) {
            streamRunnables[i] = new StreamRunnable(this.streamThree, this.outputTopicThree, i);
            streamThreads[i] = new Thread(streamRunnables[i]);
            streamThreads[i].start();
        }
        producerThread.start();
        try {
            this.waitUntilAtLeastNumRecordProcessed(this.outputTopicThree, 1);
            for (i = 0; i < 2; ++i) {
                this.verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), this.inputValuesKeys, "word-count-store-" + this.streamThree);
                this.verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), this.inputValuesKeys, "windowed-word-count-store-" + this.streamThree, 0L, WINDOW_SIZE);
            }
            for (i = 1; i < 2; ++i) {
                streamRunnables[i].close();
                streamThreads[i].interrupt();
                streamThreads[i].join();
            }
            this.verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), this.inputValuesKeys, "word-count-store-" + this.streamThree);
            this.verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), this.inputValuesKeys, "windowed-word-count-store-" + this.streamThree, 0L, WINDOW_SIZE);
        }
        finally {
            for (i = 0; i < 2; ++i) {
                if (streamRunnables[i].isClosed()) continue;
                streamRunnables[i].close();
                streamThreads[i].interrupt();
                streamThreads[i].join();
            }
            producerRunnable.shutdown();
            producerThread.interrupt();
            producerThread.join();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void concurrentAccesses() throws Exception {
        int numIterations = 500000;
        ProducerRunnable producerRunnable = new ProducerRunnable(this.streamConcurrent, this.inputValues, 500000);
        Thread producerThread = new Thread(producerRunnable);
        this.kafkaStreams = this.createCountStream(this.streamConcurrent, this.outputTopicConcurrent, this.streamsConfiguration);
        this.kafkaStreams.start();
        producerThread.start();
        try {
            this.waitUntilAtLeastNumRecordProcessed(this.outputTopicConcurrent, 1);
            ReadOnlyKeyValueStore keyValueStore = (ReadOnlyKeyValueStore)this.kafkaStreams.store("word-count-store-" + this.streamConcurrent, QueryableStoreTypes.keyValueStore());
            ReadOnlyWindowStore windowStore = (ReadOnlyWindowStore)this.kafkaStreams.store("windowed-word-count-store-" + this.streamConcurrent, QueryableStoreTypes.windowStore());
            HashMap<String, Long> expectedWindowState = new HashMap<String, Long>();
            HashMap<String, Long> expectedCount = new HashMap<String, Long>();
            while (producerRunnable.getCurrIteration() < 500000) {
                this.verifyGreaterOrEqual(this.inputValuesKeys.toArray(new String[this.inputValuesKeys.size()]), expectedWindowState, expectedCount, (ReadOnlyWindowStore<String, Long>)windowStore, (ReadOnlyKeyValueStore<String, Long>)keyValueStore, false);
            }
            this.verifyGreaterOrEqual(this.inputValuesKeys.toArray(new String[this.inputValuesKeys.size()]), expectedWindowState, expectedCount, (ReadOnlyWindowStore<String, Long>)windowStore, (ReadOnlyKeyValueStore<String, Long>)keyValueStore, true);
        }
        finally {
            producerRunnable.shutdown();
            producerThread.interrupt();
            producerThread.join();
        }
    }

    @Test
    public void shouldBeAbleToQueryState() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        String[] keys = new String[]{"hello", "goodbye", "welcome", "go", "kafka"};
        TreeSet batch1 = new TreeSet(this.stringComparator);
        batch1.addAll(Arrays.asList(new KeyValue((Object)keys[0], (Object)"hello"), new KeyValue((Object)keys[1], (Object)"goodbye"), new KeyValue((Object)keys[2], (Object)"welcome"), new KeyValue((Object)keys[3], (Object)"go"), new KeyValue((Object)keys[4], (Object)"kafka")));
        TreeSet<KeyValue<String, Long>> expectedCount = new TreeSet<KeyValue<String, Long>>(this.stringLongComparator);
        for (String key : keys) {
            expectedCount.add((KeyValue<String, Long>)new KeyValue((Object)key, (Object)1L));
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, batch1, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), (Time)this.mockTime);
        KStream s1 = builder.stream(new String[]{this.streamOne});
        s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), this.outputTopic);
        s1.groupByKey().count((Windows)TimeWindows.of((long)WINDOW_SIZE), "windowed-count");
        this.kafkaStreams = new KafkaStreams((TopologyBuilder)builder, this.streamsConfiguration);
        this.kafkaStreams.start();
        this.waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);
        ReadOnlyKeyValueStore myCount = (ReadOnlyKeyValueStore)this.kafkaStreams.store("my-count", QueryableStoreTypes.keyValueStore());
        ReadOnlyWindowStore windowStore = (ReadOnlyWindowStore)this.kafkaStreams.store("windowed-count", QueryableStoreTypes.windowStore());
        this.verifyCanGetByKey(keys, expectedCount, expectedCount, (ReadOnlyWindowStore<String, Long>)windowStore, (ReadOnlyKeyValueStore<String, Long>)myCount);
        this.verifyRangeAndAll(expectedCount, (ReadOnlyKeyValueStore<String, Long>)myCount);
    }

    @Test
    public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        KStream stream = builder.stream(new String[]{this.streamThree});
        String storeName = "count-by-key";
        stream.groupByKey().count("count-by-key");
        this.kafkaStreams = new KafkaStreams((TopologyBuilder)builder, this.streamsConfiguration);
        this.kafkaStreams.start();
        KeyValue hello = KeyValue.pair((Object)"hello", (Object)"hello");
        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamThree, Arrays.asList(hello, hello, hello, hello, hello, hello, hello, hello), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), (Time)this.mockTime);
        int maxWaitMs = 30000;
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                try {
                    QueryableStateIntegrationTest.this.kafkaStreams.store("count-by-key", QueryableStoreTypes.keyValueStore());
                    return true;
                }
                catch (InvalidStateStoreException ise) {
                    return false;
                }
            }
        }, (long)30000L, (String)"waiting for store count-by-key");
        final ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)this.kafkaStreams.store("count-by-key", QueryableStoreTypes.keyValueStore());
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return new Long(8L).equals(store.get((Object)"hello"));
            }
        }, (long)30000L, (String)"wait for count to be 8");
        this.kafkaStreams.close();
        this.kafkaStreams = new KafkaStreams((TopologyBuilder)builder, this.streamsConfiguration);
        this.kafkaStreams.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                try {
                    Assert.assertEquals((Object)8L, (Object)((ReadOnlyKeyValueStore)QueryableStateIntegrationTest.this.kafkaStreams.store("count-by-key", QueryableStoreTypes.keyValueStore())).get((Object)"hello"));
                    return true;
                }
                catch (InvalidStateStoreException ise) {
                    return false;
                }
            }
        }, (long)30000L, (String)"waiting for store count-by-key");
    }

    private void verifyRangeAndAll(Set<KeyValue<String, Long>> expectedCount, ReadOnlyKeyValueStore<String, Long> myCount) {
        TreeSet<KeyValue<String, Long>> countRangeResults = new TreeSet<KeyValue<String, Long>>(this.stringLongComparator);
        TreeSet<KeyValue<String, Long>> countAllResults = new TreeSet<KeyValue<String, Long>>(this.stringLongComparator);
        TreeSet<KeyValue<String, Long>> expectedRangeResults = new TreeSet<KeyValue<String, Long>>(this.stringLongComparator);
        expectedRangeResults.addAll(Arrays.asList(new KeyValue((Object)"hello", (Object)1L), new KeyValue((Object)"go", (Object)1L), new KeyValue((Object)"goodbye", (Object)1L), new KeyValue((Object)"kafka", (Object)1L)));
        try (KeyValueIterator range = myCount.range((Object)"go", (Object)"kafka");){
            while (range.hasNext()) {
                countRangeResults.add((KeyValue<String, Long>)range.next());
            }
        }
        var7_7 = null;
        try (KeyValueIterator all = myCount.all();){
            while (all.hasNext()) {
                countAllResults.add((KeyValue<String, Long>)all.next());
            }
        }
        catch (Throwable throwable) {
            var7_7 = throwable;
            throw throwable;
        }
        MatcherAssert.assertThat(countRangeResults, (Matcher)IsEqual.equalTo(expectedRangeResults));
        MatcherAssert.assertThat(countAllResults, (Matcher)IsEqual.equalTo(expectedCount));
    }

    private void verifyCanGetByKey(String[] keys, Set<KeyValue<String, Long>> expectedWindowState, Set<KeyValue<String, Long>> expectedCount, ReadOnlyWindowStore<String, Long> windowStore, ReadOnlyKeyValueStore<String, Long> myCount) throws InterruptedException {
        TreeSet<KeyValue<String, Long>> windowState = new TreeSet<KeyValue<String, Long>>(this.stringLongComparator);
        TreeSet<KeyValue<String, Long>> countState = new TreeSet<KeyValue<String, Long>>(this.stringLongComparator);
        long timeout = System.currentTimeMillis() + 30000L;
        while ((windowState.size() < keys.length || countState.size() < keys.length) && System.currentTimeMillis() < timeout) {
            Thread.sleep(10L);
            for (String key : keys) {
                windowState.addAll(this.fetch(windowStore, key));
                Long value = (Long)myCount.get((Object)key);
                if (value == null) continue;
                countState.add((KeyValue<String, Long>)new KeyValue((Object)key, (Object)value));
            }
        }
        MatcherAssert.assertThat(windowState, (Matcher)IsEqual.equalTo(expectedWindowState));
        MatcherAssert.assertThat(countState, (Matcher)IsEqual.equalTo(expectedCount));
    }

    private void verifyGreaterOrEqual(String[] keys, Map<String, Long> expectedWindowedCount, Map<String, Long> expectedCount, ReadOnlyWindowStore<String, Long> windowStore, ReadOnlyKeyValueStore<String, Long> keyValueStore, boolean failIfKeyNotFound) throws InterruptedException {
        HashMap<String, Long> windowState = new HashMap<String, Long>();
        HashMap<String, Long> countState = new HashMap<String, Long>();
        for (String key : keys) {
            Map<String, Long> map = this.fetchMap(windowStore, key);
            if (map.equals(Collections.emptyMap()) && failIfKeyNotFound) {
                Assert.fail((String)("Key not found " + key));
            }
            windowState.putAll(map);
            Long value = (Long)keyValueStore.get((Object)key);
            if (value != null) {
                countState.put(key, value);
                continue;
            }
            if (!failIfKeyNotFound) continue;
            Assert.fail((String)("Key not found " + key));
        }
        for (Map.Entry actualWindowStateEntry : windowState.entrySet()) {
            if (expectedWindowedCount.containsKey(actualWindowStateEntry.getKey())) {
                Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey());
                Assert.assertTrue(((Long)actualWindowStateEntry.getValue() >= expectedValue ? 1 : 0) != 0);
            }
            expectedWindowedCount.put((String)actualWindowStateEntry.getKey(), (Long)actualWindowStateEntry.getValue());
        }
        for (Map.Entry actualCountStateEntry : countState.entrySet()) {
            if (expectedCount.containsKey(actualCountStateEntry.getKey())) {
                Long expectedValue = expectedCount.get(actualCountStateEntry.getKey());
                Assert.assertTrue(((Long)actualCountStateEntry.getValue() >= expectedValue ? 1 : 0) != 0);
            }
            expectedCount.put((String)actualCountStateEntry.getKey(), (Long)actualCountStateEntry.getValue());
        }
    }

    private void waitUntilAtLeastNumRecordProcessed(String topic, int numRecs) throws InterruptedException {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        config.setProperty("group.id", "queryable-state-consumer");
        config.setProperty("auto.offset.reset", "earliest");
        config.setProperty("key.deserializer", StringDeserializer.class.getName());
        config.setProperty("value.deserializer", LongDeserializer.class.getName());
        IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, topic, numRecs, 60000L);
    }

    private Set<KeyValue<String, Long>> fetch(ReadOnlyWindowStore<String, Long> store, String key) {
        WindowStoreIterator fetch = store.fetch((Object)key, 0L, System.currentTimeMillis());
        if (fetch.hasNext()) {
            KeyValue next = (KeyValue)fetch.next();
            return Collections.singleton(KeyValue.pair((Object)key, (Object)next.value));
        }
        return Collections.emptySet();
    }

    private Map<String, Long> fetchMap(ReadOnlyWindowStore<String, Long> store, String key) {
        WindowStoreIterator fetch = store.fetch((Object)key, 0L, System.currentTimeMillis());
        if (fetch.hasNext()) {
            KeyValue next = (KeyValue)fetch.next();
            return Collections.singletonMap(key, next.value);
        }
        return Collections.emptyMap();
    }

    private class ProducerRunnable
    implements Runnable {
        private final String topic;
        private final List<String> inputValues;
        private final int numIterations;
        private int currIteration = 0;
        boolean shutdown = false;

        ProducerRunnable(String topic, List<String> inputValues, int numIterations) {
            this.topic = topic;
            this.inputValues = inputValues;
            this.numIterations = numIterations;
        }

        private synchronized void incrementInteration() {
            ++this.currIteration;
        }

        public synchronized int getCurrIteration() {
            return this.currIteration;
        }

        public synchronized void shutdown() {
            this.shutdown = true;
        }

        @Override
        public void run() {
            Properties producerConfig = new Properties();
            producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
            producerConfig.put("acks", "all");
            producerConfig.put("retries", (Object)0);
            producerConfig.put("key.serializer", StringSerializer.class);
            producerConfig.put("value.serializer", StringSerializer.class);
            KafkaProducer producer = new KafkaProducer(producerConfig, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            while (this.getCurrIteration() < this.numIterations && !this.shutdown) {
                for (int i = 0; i < this.inputValues.size(); ++i) {
                    producer.send(new ProducerRecord(this.topic, (Object)this.inputValues.get(i)));
                }
                this.incrementInteration();
            }
        }
    }

    private class StreamRunnable
    implements Runnable {
        private final KafkaStreams myStream;
        private boolean closed = false;

        StreamRunnable(String inputTopic, String outputTopic, int queryPort) {
            Properties props = (Properties)QueryableStateIntegrationTest.this.streamsConfiguration.clone();
            props.put("application.server", "localhost:" + queryPort);
            this.myStream = QueryableStateIntegrationTest.this.createCountStream(inputTopic, outputTopic, props);
        }

        @Override
        public void run() {
            this.myStream.start();
        }

        public void close() {
            if (!this.closed) {
                this.myStream.close();
                this.closed = true;
            }
        }

        public boolean isClosed() {
            return this.closed;
        }

        public final KafkaStreams getStream() {
            return this.myStream;
        }
    }
}

