package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import kafka.utils.MockTime;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/integration/QueryableStateIntegrationTest.class */
public class QueryableStateIntegrationTest {
    // Broker count for the embedded cluster; note that CLUSTER below is constructed with the literal 1.
    private static final int NUM_BROKERS = 1;
    // Same value as the literal partition count createTopics() passes for the stream-three topic.
    public static final int STREAM_THREE_PARTITIONS = 4;
    // Clock taken from the embedded cluster; handed to IntegrationTestUtils when producing records synchronously.
    private final MockTime mockTime = CLUSTER.time;
    // Topic base names. createTopics() appends "-<testNo>" to each of them before creating the topics.
    private String streamOne = "stream-one";
    private String streamTwo = "stream-two";
    private String streamThree = "stream-three";
    private String streamConcurrent = "stream-concurrent";
    private String outputTopic = "output";
    private String outputTopicConcurrent = "output-concurrent";
    private String outputTopicThree = "output-three";
    // Partition count used for the stream-two topic; also used as the number of stream instances in queryOnRebalance().
    private static final int STREAM_TWO_PARTITIONS = 2;
    private static final int NUM_REPLICAS = 1;
    // Per-test Kafka Streams configuration, rebuilt in before().
    private Properties streamsConfiguration;
    // Sentences produced to the input topics, and the set of words obtained by splitting them on "\\W+".
    private List<String> inputValues;
    private Set<String> inputValuesKeys;
    // Streams instance owned by the current test; closed in shutdown() when non-null.
    private KafkaStreams kafkaStreams;
    // Both comparators order KeyValue pairs by key only (see before()).
    private Comparator<KeyValue<String, String>> stringComparator;
    private Comparator<KeyValue<String, Long>> stringLongComparator;

    // Injected by the Parameterized runner from data(); written to "cache.max.bytes.buffering" in before().
    @Parameterized.Parameter
    public long cacheSizeBytes;

    // Embedded Kafka cluster shared by all tests in this class.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Window size for the windowed counts: two days expressed in milliseconds.
    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
    // Incremented at the start of before(); used as a suffix for topic names and the application id.
    private static int testNo = 0;

    /* loaded from: input_file:org/apache/kafka/streams/integration/QueryableStateIntegrationTest$ProducerRunnable.class */
    /**
     * Background producer that writes every element of {@code inputValues} to {@code topic},
     * repeating the whole list until {@code numIterations} passes have completed or
     * {@link #shutdown()} has been called.
     *
     * <p>The iteration counter is guarded by the instance monitor; the shutdown flag is
     * {@code volatile} because {@link #run()} reads it without holding that monitor.
     */
    private class ProducerRunnable implements Runnable {
        private final String topic;
        private final List<String> inputValues;
        private final int numIterations;
        private int currIteration = 0;
        // Written by the controlling test thread, polled by the producer thread in run().
        volatile boolean shutdown = false;

        /**
         * @param str  topic to produce to
         * @param list values to send on every pass
         * @param i    maximum number of passes over {@code list}
         */
        ProducerRunnable(String str, List<String> list, int i) {
            this.topic = str;
            this.inputValues = list;
            this.numIterations = i;
        }

        private synchronized void incrementIteration() {
            this.currIteration++;
        }

        /** @return the number of complete passes over the input values made so far */
        public synchronized int getCurrIteration() {
            return this.currIteration;
        }

        /** Asks the producer loop to stop after the pass that is currently in progress. */
        public synchronized void shutdown() {
            this.shutdown = true;
        }

        /**
         * Sends the input values to the topic until the iteration limit is reached or a
         * shutdown is requested. The producer is always closed on exit so that its
         * network and buffer resources are released.
         */
        @Override // java.lang.Runnable
        public void run() {
            final Properties producerConfig = new Properties();
            producerConfig.put("bootstrap.servers", QueryableStateIntegrationTest.CLUSTER.bootstrapServers());
            producerConfig.put("acks", "all");
            producerConfig.put("retries", 0);
            producerConfig.put("key.serializer", StringSerializer.class);
            producerConfig.put("value.serializer", StringSerializer.class);
            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
                while (getCurrIteration() < this.numIterations && !this.shutdown) {
                    for (final String value : this.inputValues) {
                        producer.send(new ProducerRecord<String, String>(this.topic, value));
                    }
                    incrementIteration();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/QueryableStateIntegrationTest$StreamRunnable.class */
    /**
     * Wraps one word-count {@link KafkaStreams} instance so that it can be started on its own
     * thread. Each instance gets a copy of the test's streams configuration with
     * {@code application.server} set to {@code localhost:<i>}, and a state listener stub
     * that callers can inspect.
     */
    public class StreamRunnable implements Runnable {
        private final KafkaStreams myStream;
        private boolean closed = false;
        private KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();

        /**
         * @param str  input topic of the word-count topology
         * @param str2 output topic of the word-count topology
         * @param i    value used as the port part of {@code application.server}
         */
        StreamRunnable(String str, String str2, int i) {
            final Properties instanceConfig = (Properties) QueryableStateIntegrationTest.this.streamsConfiguration.clone();
            final String applicationServer = "localhost:" + i;
            instanceConfig.put("application.server", applicationServer);
            this.myStream = QueryableStateIntegrationTest.this.createCountStream(str, str2, instanceConfig);
            this.myStream.setStateListener(this.stateListener);
        }

        /** Starts the wrapped streams instance. */
        @Override // java.lang.Runnable
        public void run() {
            this.myStream.start();
        }

        /** Closes the wrapped streams instance; calls after the first one do nothing. */
        public void close() {
            if (!this.closed) {
                this.myStream.close();
                this.closed = true;
            }
        }

        /** @return {@code true} once {@link #close()} has completed */
        public boolean isClosed() {
            return this.closed;
        }

        /** @return the wrapped streams instance */
        public final KafkaStreams getStream() {
            return this.myStream;
        }

        /** @return the listener stub registered on the wrapped streams instance */
        public final KafkaStreamsTest.StateListenerStub getStateListener() {
            return this.stateListener;
        }
    }

    /**
     * Appends {@code "-<testNo>"} to every topic name field and creates the resulting
     * topics on the embedded cluster. The stream-two and stream-three topics are created
     * with an explicit partition count and replication factor; the others use the
     * single-argument {@code createTopic} overload.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    public void createTopics() throws InterruptedException {
        final String suffix = "-" + testNo;

        this.streamOne += suffix;
        this.streamConcurrent += suffix;
        this.streamThree += suffix;
        this.outputTopic += suffix;
        this.outputTopicConcurrent += suffix;
        this.outputTopicThree += suffix;
        this.streamTwo += suffix;

        CLUSTER.createTopic(this.streamOne);
        CLUSTER.createTopic(this.streamConcurrent);
        CLUSTER.createTopic(this.streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
        CLUSTER.createTopic(this.streamThree, STREAM_THREE_PARTITIONS, NUM_REPLICAS);
        CLUSTER.createTopic(this.outputTopic);
        CLUSTER.createTopic(this.outputTopicConcurrent);
        CLUSTER.createTopic(this.outputTopicThree);
    }

    /**
     * Supplies the values injected into {@link #cacheSizeBytes}: zero, and ten mebibytes.
     *
     * @return a two-element array holding an {@code Integer} 0 and a {@code Long} 10485760
     */
    @Parameterized.Parameters
    public static Object[] data() {
        final int zeroBytes = 0;
        final long tenMebibytes = 10L * 1024L * 1024L;
        return new Object[] {zeroBytes, tenMebibytes};
    }

    /**
     * Per-test setup: bumps the test counter, creates this test's topics, builds the streams
     * configuration, installs the key-only comparators and prepares the input sentences
     * together with the set of words they contain.
     *
     * @throws IOException          declared for the setup helpers
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws IOException, InterruptedException {
        testNo++;
        createTopics();

        final Properties config = new Properties();
        this.streamsConfiguration = config;
        config.put("application.id", "queryable-state-" + testNo);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("auto.offset.reset", "earliest");
        config.put("state.dir", TestUtils.tempDirectory("qs-test").getPath());
        config.put("key.serde", Serdes.String().getClass());
        config.put("value.serde", Serdes.String().getClass());
        config.put("cache.max.bytes.buffering", Long.valueOf(this.cacheSizeBytes));
        config.put("commit.interval.ms", 1000);

        // Both comparators look at the key only, so sets built with them hold one entry per key.
        this.stringComparator = new Comparator<KeyValue<String, String>>() {
            @Override // java.util.Comparator
            public int compare(KeyValue<String, String> left, KeyValue<String, String> right) {
                return left.key.compareTo(right.key);
            }
        };
        this.stringLongComparator = new Comparator<KeyValue<String, Long>>() {
            @Override // java.util.Comparator
            public int compare(KeyValue<String, Long> left, KeyValue<String, Long> right) {
                return left.key.compareTo(right.key);
            }
        };

        this.inputValues = Arrays.asList(
            "hello world",
            "all streams lead to kafka",
            "streams",
            "kafka streams",
            "the cat in the hat",
            "green eggs and ham",
            "that sam i am",
            "up the creek without a paddle",
            "run forest run",
            "a tank full of gas",
            "eat sleep rave repeat",
            "one jolly sailor",
            "king of the world");

        // Collect every distinct word of the input sentences.
        this.inputValuesKeys = new HashSet<>();
        for (final String sentence : this.inputValues) {
            Collections.addAll(this.inputValuesKeys, sentence.split("\\W+"));
        }
    }

    /**
     * Per-test teardown: closes the test's streams instance if one was created, then purges
     * the local streams state for the current configuration.
     *
     * @throws IOException if purging the local state fails
     */
    @After
    public void shutdown() throws IOException {
        final KafkaStreams streams = this.kafkaStreams;
        if (streams != null) {
            streams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a word-count topology over the given input topic and wraps it in a new,
     * not yet started, {@link KafkaStreams} instance.
     *
     * <p>Each input value is lower-cased and split on {@code "\\W+"}; the words are grouped
     * and counted twice: once into the store {@code "word-count-store-" + str}, whose
     * results are also written to {@code str2}, and once per time window into the store
     * {@code "windowed-word-count-store-" + str}.
     *
     * @param str        input topic
     * @param str2       output topic for the non-windowed counts
     * @param properties streams configuration for the new instance
     * @return the newly created streams instance
     */
    public KafkaStreams createCountStream(String str, String str2, Properties properties) {
        final KStreamBuilder builder = new KStreamBuilder();
        final Serde<String> stringSerde = Serdes.String();
        final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, str);

        final KGroupedStream<String, String> groupedByWord = textLines
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String line) {
                    final String lowered = line.toLowerCase(Locale.getDefault());
                    return Arrays.asList(lowered.split("\\W+"));
                }
            })
            .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());

        // All-time count per word, also forwarded to the output topic.
        final String countStoreName = "word-count-store-" + str;
        groupedByWord.count(countStoreName).to(Serdes.String(), Serdes.Long(), str2);

        // Count per word and time window.
        final String windowedStoreName = "windowed-word-count-store-" + str;
        groupedByWord.count(TimeWindows.of(WINDOW_SIZE), windowedStoreName);

        return new KafkaStreams(builder, properties);
    }

    /**
     * For every key in {@code set}, waits until the key-value store {@code str} on the
     * instance that hosts the key returns a non-null value for it.
     *
     * <p>The hosting instance is located through {@code kafkaStreams.metadataForKey}; the
     * port of the returned host is used as the index into {@code streamRunnableArr}.
     *
     * @param streamRunnableArr all stream instances, indexed by their application-server port
     * @param kafkaStreams      instance whose metadata is queried
     * @param stateListenerStub listener of {@code kafkaStreams}; consulted when the store is not queryable
     * @param set               keys that must become available
     * @param str               name of the key-value store
     * @throws Exception if a key does not become available within the default timeout
     */
    private void verifyAllKVKeys(final StreamRunnable[] streamRunnableArr, final KafkaStreams kafkaStreams, final KafkaStreamsTest.StateListenerStub stateListenerStub, Set<String> set, final String str) throws Exception {
        for (final String key : set) {
            final TestCondition keyIsQueryable = new TestCondition() {
                @Override
                public boolean conditionMet() {
                    try {
                        final StreamsMetadata metadata = kafkaStreams.metadataForKey(str, key, new StringSerializer());
                        final boolean metadataMissing = metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE);
                        if (metadataMissing) {
                            return false;
                        }
                        final int hostIndex = metadata.hostInfo().port();
                        final KafkaStreams hostingInstance = streamRunnableArr[hostIndex].getStream();
                        final ReadOnlyKeyValueStore<String, Long> store =
                            hostingInstance.store(str, QueryableStoreTypes.<String, Long>keyValueStore());
                        return store != null && store.get(key) != null;
                    } catch (IllegalStateException notReady) {
                        // The instance cannot answer yet; poll again.
                        return false;
                    } catch (InvalidStateStoreException storeUnavailable) {
                        // An unavailable store is only accepted if a rebalance has been observed.
                        final long rebalances = stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING);
                        Assert.assertTrue(rebalances >= 1);
                        return false;
                    }
                }
            };
            TestUtils.waitForCondition(keyIsQueryable, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for metadata, store and value to be non null");
        }
    }

    /**
     * For every key in {@code set}, waits until the window store {@code str} on the
     * instance that hosts the key returns a non-null iterator for the range
     * {@code [l, l2]}.
     *
     * <p>The hosting instance is located through {@code kafkaStreams.metadataForKey}; the
     * port of the returned host is used as the index into {@code streamRunnableArr}.
     * The iterator obtained from the store is closed before the condition returns, so
     * polling does not accumulate open iterators.
     *
     * @param streamRunnableArr all stream instances, indexed by their application-server port
     * @param kafkaStreams      instance whose metadata is queried
     * @param stateListenerStub listener of {@code kafkaStreams}; consulted when the store is not queryable
     * @param set               keys that must become available
     * @param str               name of the window store
     * @param l                 lower bound of the fetch range
     * @param l2                upper bound of the fetch range
     * @throws Exception if a key does not become available within the default timeout
     */
    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnableArr, final KafkaStreams kafkaStreams, final KafkaStreamsTest.StateListenerStub stateListenerStub, Set<String> set, final String str, final Long l, final Long l2) throws Exception {
        for (final String key : set) {
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    try {
                        final StreamsMetadata metadata = kafkaStreams.metadataForKey(str, key, new StringSerializer());
                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
                            return false;
                        }
                        final ReadOnlyWindowStore<String, Long> store = streamRunnableArr[metadata.hostInfo().port()]
                            .getStream()
                            .store(str, QueryableStoreTypes.<String, Long>windowStore());
                        if (store == null) {
                            return false;
                        }
                        final WindowStoreIterator<Long> windows = store.fetch(key, l.longValue(), l2.longValue());
                        if (windows == null) {
                            return false;
                        }
                        // Only the availability of the iterator matters; release it right away.
                        windows.close();
                        return true;
                    } catch (IllegalStateException notReady) {
                        // The instance cannot answer yet; poll again.
                        return false;
                    } catch (InvalidStateStoreException storeUnavailable) {
                        // An unavailable store is only accepted if a rebalance has been observed.
                        Assert.assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING).longValue() >= 1);
                        return false;
                    }
                }
            }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for metadata, store and value to be non null");
        }
    }

    /**
     * Starts several word-count instances over the stream-three topic while a background
     * producer keeps writing, checks that every input word is queryable through each
     * instance, then closes all instances except the first and checks that every word is
     * still queryable through the remaining one.
     *
     * <p>All stream instances and the producer thread are stopped in a single
     * {@code finally} block, so the cleanup runs exactly once whether the test passes or
     * fails.
     *
     * @throws Exception if any wait times out or a thread operation is interrupted
     */
    @Test
    public void queryOnRebalance() throws Exception {
        final int numThreads = STREAM_TWO_PARTITIONS;
        final int numIterations = 500000;
        final String countStoreName = "word-count-store-" + this.streamThree;
        final String windowedStoreName = "windowed-word-count-store-" + this.streamThree;

        final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
        final Thread[] streamThreads = new Thread[numThreads];
        final ProducerRunnable producerRunnable = new ProducerRunnable(this.streamThree, this.inputValues, numIterations);
        final Thread producerThread = new Thread(producerRunnable);

        // The loop index doubles as the application-server port and as the array index
        // that the verify helpers use to find the instance hosting a key.
        for (int i = 0; i < numThreads; i++) {
            streamRunnables[i] = new StreamRunnable(this.streamThree, this.outputTopicThree, i);
            streamThreads[i] = new Thread(streamRunnables[i]);
            streamThreads[i].start();
        }
        producerThread.start();

        try {
            waitUntilAtLeastNumRecordProcessed(this.outputTopicThree, 1);

            for (int i = 0; i < numThreads; i++) {
                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), this.inputValuesKeys, countStoreName);
                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), this.inputValuesKeys, windowedStoreName, 0L, Long.valueOf(WINDOW_SIZE));
                Assert.assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
            }

            // Close every instance but the first one.
            for (int i = 1; i < numThreads; i++) {
                streamRunnables[i].close();
                streamThreads[i].interrupt();
                streamThreads[i].join();
            }

            // Everything must still be queryable through the surviving instance.
            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), this.inputValuesKeys, countStoreName);
            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), this.inputValuesKeys, windowedStoreName, 0L, Long.valueOf(WINDOW_SIZE));
            Assert.assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
        } finally {
            for (int i = 0; i < numThreads; i++) {
                if (!streamRunnables[i].isClosed()) {
                    streamRunnables[i].close();
                    streamThreads[i].interrupt();
                    streamThreads[i].join();
                }
            }
            producerRunnable.shutdown();
            producerThread.interrupt();
            producerThread.join();
        }
    }

    /**
     * Queries the word-count stores repeatedly while a background producer keeps writing,
     * asserting on every pass that no observed count is lower than the one seen before.
     * Once the producer has completed all of its iterations, a final pass additionally
     * requires every input word to be present in both stores.
     *
     * <p>The producer thread is stopped in a single {@code finally} block, so the cleanup
     * runs exactly once whether the test passes or fails.
     *
     * @throws Exception if a wait times out, an assertion helper fails or the thread join is interrupted
     */
    @Test
    public void concurrentAccesses() throws Exception {
        final int numIterations = 500000;
        final ProducerRunnable producerRunnable = new ProducerRunnable(this.streamConcurrent, this.inputValues, numIterations);
        final Thread producerThread = new Thread(producerRunnable);

        this.kafkaStreams = createCountStream(this.streamConcurrent, this.outputTopicConcurrent, this.streamsConfiguration);
        this.kafkaStreams.start();
        producerThread.start();

        try {
            waitUntilAtLeastNumRecordProcessed(this.outputTopicConcurrent, 1);

            final ReadOnlyKeyValueStore<String, Long> keyValueStore =
                this.kafkaStreams.store("word-count-store-" + this.streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
            final ReadOnlyWindowStore<String, Long> windowStore =
                this.kafkaStreams.store("windowed-word-count-store-" + this.streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());

            final Map<String, Long> lastSeenWindowCounts = new HashMap<>();
            final Map<String, Long> lastSeenCounts = new HashMap<>();
            // The key set does not change during the test, so the array is built once.
            final String[] keys = this.inputValuesKeys.toArray(new String[this.inputValuesKeys.size()]);

            while (producerRunnable.getCurrIteration() < numIterations) {
                verifyGreaterOrEqual(keys, lastSeenWindowCounts, lastSeenCounts, windowStore, keyValueStore, false);
            }
            // Final pass: every key must now be present.
            verifyGreaterOrEqual(keys, lastSeenWindowCounts, lastSeenCounts, windowStore, keyValueStore, true);
        } finally {
            producerRunnable.shutdown();
            producerThread.interrupt();
            producerThread.join();
        }
    }

    /**
     * Produces one record per key to the stream-one topic, counts records per key both
     * overall ({@code "my-count"}) and per time window ({@code "windowed-count"}), and
     * verifies that both stores can be queried by key and that the key-value store
     * supports range and full scans.
     *
     * @throws Exception if producing, waiting for output or a verification step fails
     */
    @Test
    public void shouldBeAbleToQueryState() throws Exception {
        final KStreamBuilder builder = new KStreamBuilder();
        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};

        // Input records use the key itself as the value; expected count per key is 1.
        final Set<KeyValue<String, String>> inputRecords = new TreeSet<>(this.stringComparator);
        final Set<KeyValue<String, Long>> expectedCounts = new TreeSet<>(this.stringLongComparator);
        for (final String key : keys) {
            inputRecords.add(new KeyValue<>(key, key));
            expectedCounts.add(new KeyValue<>(key, 1L));
        }

        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, inputRecords, producerConfig, this.mockTime);

        final KStream<String, String> source = builder.stream(this.streamOne);
        // Non-windowed count, also written to the output topic.
        source.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), this.outputTopic);
        // Windowed count.
        source.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");

        this.kafkaStreams = new KafkaStreams(builder, this.streamsConfiguration);
        this.kafkaStreams.start();
        waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);

        final ReadOnlyKeyValueStore<String, Long> countStore =
            this.kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
        final ReadOnlyWindowStore<String, Long> windowStore =
            this.kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());

        verifyCanGetByKey(keys, expectedCounts, expectedCounts, windowStore, countStore);
        verifyRangeAndAll(expectedCounts, countStore);
    }

    /**
     * Counts records by key from the stream-three topic into the store
     * {@code "count-by-key"}, produces eight identical records and waits until the store
     * reports a count of 8. The streams instance is then closed and a new one started
     * from the same builder; as soon as the store of the new instance can be obtained, it
     * must already report the count of 8.
     *
     * @throws Exception if producing fails or one of the waits times out
     */
    @Test
    public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception {
        final String storeName = "count-by-key";
        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> input = builder.stream(this.streamThree);
        input.groupByKey().count(storeName);

        this.kafkaStreams = new KafkaStreams(builder, this.streamsConfiguration);
        this.kafkaStreams.start();

        final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamThree, Collections.nCopies(8, hello), producerConfig, this.mockTime);

        // Wait until the store can be obtained at all.
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                try {
                    QueryableStateIntegrationTest.this.kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
                } catch (InvalidStateStoreException notYetAvailable) {
                    return false;
                }
                return true;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for store count-by-key");

        final ReadOnlyKeyValueStore<String, Long> store =
            this.kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                return Long.valueOf(8L).equals(store.get("hello"));
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "wait for count to be 8");

        // Restart with the same topology and configuration.
        this.kafkaStreams.close();
        this.kafkaStreams = new KafkaStreams(builder, this.streamsConfiguration);
        this.kafkaStreams.start();

        // Once the store is obtainable again, it must already hold the full count.
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                try {
                    final ReadOnlyKeyValueStore<String, Long> restarted =
                        QueryableStateIntegrationTest.this.kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
                    Assert.assertEquals(Long.valueOf(8L), restarted.get("hello"));
                    return true;
                } catch (InvalidStateStoreException notYetAvailable) {
                    return false;
                }
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "waiting for store count-by-key");
    }

    /**
     * Verifies the scan operations of the key-value store: a range query from
     * {@code "go"} to {@code "kafka"} must yield the keys hello, go, goodbye and kafka,
     * and a full scan must yield {@code set}.
     *
     * <p>Both iterators are opened in try-with-resources blocks, so they are closed even
     * if iterating or collecting the results fails. Results are collected into sets
     * ordered by {@code stringLongComparator}.
     *
     * @param set                   entries expected from a full scan
     * @param readOnlyKeyValueStore store under test
     */
    private void verifyRangeAndAll(Set<KeyValue<String, Long>> set, ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore) {
        final Set<KeyValue<String, Long>> rangeResults = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> allResults = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> expectedRange = new TreeSet<>(this.stringLongComparator);
        expectedRange.addAll(Arrays.asList(
            new KeyValue<>("hello", 1L),
            new KeyValue<>("go", 1L),
            new KeyValue<>("goodbye", 1L),
            new KeyValue<>("kafka", 1L)));

        try (KeyValueIterator<String, Long> range = readOnlyKeyValueStore.range("go", "kafka")) {
            while (range.hasNext()) {
                rangeResults.add(range.next());
            }
        }

        try (KeyValueIterator<String, Long> all = readOnlyKeyValueStore.all()) {
            while (all.hasNext()) {
                allResults.add(all.next());
            }
        }

        MatcherAssert.assertThat(rangeResults, IsEqual.equalTo(expectedRange));
        MatcherAssert.assertThat(allResults, IsEqual.equalTo(set));
    }

    /**
     * Polls both stores until every key in {@code strArr} has been found in each of them
     * or the default timeout has elapsed, then asserts that the collected entries equal
     * the expected sets.
     *
     * <p>The polling loop ends as soon as both result sets contain one entry per key or
     * the deadline has passed; the assertions afterwards report whatever is still missing.
     *
     * @param strArr                keys to look up
     * @param set                   entries expected from the window store
     * @param set2                  entries expected from the key-value store
     * @param readOnlyWindowStore   window store to query
     * @param readOnlyKeyValueStore key-value store to query
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void verifyCanGetByKey(String[] strArr, Set<KeyValue<String, Long>> set, Set<KeyValue<String, Long>> set2, ReadOnlyWindowStore<String, Long> readOnlyWindowStore, ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore) throws InterruptedException {
        final Set<KeyValue<String, Long>> windowState = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> countState = new TreeSet<>(this.stringLongComparator);
        final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;

        while ((windowState.size() < strArr.length || countState.size() < strArr.length)
                && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
            for (final String key : strArr) {
                windowState.addAll(fetch(readOnlyWindowStore, key));
                final Long count = readOnlyKeyValueStore.get(key);
                if (count != null) {
                    countState.add(new KeyValue<>(key, count));
                }
            }
        }

        MatcherAssert.assertThat(windowState, IsEqual.equalTo(set));
        MatcherAssert.assertThat(countState, IsEqual.equalTo(set2));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Reads the current value of every key from both stores and asserts that no value is
     * lower than the one recorded for that key on a previous call. The maps passed in are
     * updated with the values read, so they carry the state from one call to the next.
     *
     * @param strArr                keys to look up
     * @param map                   last values seen in the window store; updated by this call
     * @param map2                  last values seen in the key-value store; updated by this call
     * @param readOnlyWindowStore   window store to query
     * @param readOnlyKeyValueStore key-value store to query
     * @param z                     when {@code true}, a key missing from either store fails the test
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    private void verifyGreaterOrEqual(String[] strArr, Map<String, Long> map, Map<String, Long> map2, ReadOnlyWindowStore<String, Long> readOnlyWindowStore, ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore, boolean z) throws InterruptedException {
        final Map<String, Long> windowSnapshot = new HashMap<>();
        final Map<String, Long> countSnapshot = new HashMap<>();

        for (final String key : strArr) {
            final Map<String, Long> windowed = fetchMap(readOnlyWindowStore, key);
            if (z && windowed.isEmpty()) {
                Assert.fail("Key not found " + key);
            }
            windowSnapshot.putAll(windowed);

            final Long count = readOnlyKeyValueStore.get(key);
            if (count == null) {
                if (z) {
                    Assert.fail("Key not found " + key);
                }
            } else {
                countSnapshot.put(key, count);
            }
        }

        assertNotDecreasedAndRecord(windowSnapshot, map);
        assertNotDecreasedAndRecord(countSnapshot, map2);
    }

    /** Asserts each snapshot value is at least the previously recorded one, then records it. */
    private void assertNotDecreasedAndRecord(Map<String, Long> snapshot, Map<String, Long> recorded) {
        for (final Map.Entry<String, Long> current : snapshot.entrySet()) {
            final String key = current.getKey();
            if (recorded.containsKey(key)) {
                Assert.assertTrue(current.getValue().longValue() >= recorded.get(key).longValue());
            }
            recorded.put(key, current.getValue());
        }
    }

    /**
     * Blocks until at least {@code i} record values have been received from topic
     * {@code str}, using a consumer that reads from the earliest offset with String keys
     * and Long values and a 60 second limit passed to the wait helper.
     *
     * @param str topic to read from
     * @param i   minimum number of record values to wait for
     * @throws InterruptedException if the wait is interrupted
     */
    private void waitUntilAtLeastNumRecordProcessed(String str, int i) throws InterruptedException {
        final String keyDeserializer = StringDeserializer.class.getName();
        final String valueDeserializer = LongDeserializer.class.getName();
        final long maxWaitMs = 60000L;

        final Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("group.id", "queryable-state-consumer");
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("key.deserializer", keyDeserializer);
        consumerConfig.setProperty("value.deserializer", valueDeserializer);

        IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, str, i, maxWaitMs);
    }

    /**
     * Fetches the windows of {@code str} between time 0 and the current system time and
     * returns the value of the first window, paired with the key, as a singleton set.
     * The window iterator is always closed before returning.
     *
     * @param readOnlyWindowStore window store to query
     * @param str                 key to look up
     * @return a singleton set with the key and the first window's value, or an empty set
     *         if the store returned no window
     */
    private Set<KeyValue<String, Long>> fetch(ReadOnlyWindowStore<String, Long> readOnlyWindowStore, String str) {
        final WindowStoreIterator<Long> windows = readOnlyWindowStore.fetch(str, 0L, System.currentTimeMillis());
        try {
            if (windows.hasNext()) {
                final Long firstWindowValue = windows.next().value;
                return Collections.singleton(KeyValue.pair(str, firstWindowValue));
            }
            return Collections.emptySet();
        } finally {
            // Release the iterator on every path, including failures of hasNext()/next().
            windows.close();
        }
    }

    /**
     * Fetches the windows of {@code str} between time 0 and the current system time and
     * returns the value of the first window as a single-entry map keyed by {@code str}.
     * The window iterator is always closed before returning.
     *
     * @param readOnlyWindowStore window store to query
     * @param str                 key to look up
     * @return a single-entry map from the key to the first window's value, or an empty
     *         map if the store returned no window
     */
    private Map<String, Long> fetchMap(ReadOnlyWindowStore<String, Long> readOnlyWindowStore, String str) {
        final WindowStoreIterator<Long> windows = readOnlyWindowStore.fetch(str, 0L, System.currentTimeMillis());
        try {
            if (windows.hasNext()) {
                final Long firstWindowValue = windows.next().value;
                return Collections.singletonMap(str, firstWindowValue);
            }
            return Collections.emptyMap();
        } finally {
            // Release the iterator on every path, including failures of hasNext()/next().
            windows.close();
        }
    }
}
