package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/StoreQueryIntegrationTest.class */
public class StoreQueryIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(StoreQueryIntegrationTest.class);

    /** Number of brokers the embedded cluster is constructed with. */
    private static final int NUM_BROKERS = 1;

    /** Topic that is created before each test and that the test records are produced to. */
    private static final String INPUT_TOPIC_NAME = "input-topic";

    /** Name of the materialized store that the tests query. */
    private static final String TABLE_NAME = "source-table";

    /**
     * Counter used to build the {@code application.server} address of each Streams instance.
     * It starts at 0 and is incremented before use, so the first configuration built gets port 1.
     * Several tests decide which instance is active by checking whether the active host's port is odd,
     * which only identifies the first instance while every test builds an even number of configurations.
     */
    private static int port = 0;

    @Rule
    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);

    @Rule
    public TestName testName = new TestName();

    /** Every Streams instance created through {@code createKafkaStreams}; closed in {@code after()}. */
    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();

    // Must stay declared after 'cluster': instance initializers run in declaration order.
    private final MockTime mockTime = cluster.time;

    /**
     * Creates the input topic on the embedded cluster before each test.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        // The tests below address partitions 0 and 1, hence two partitions.
        final int partitionCount = 2;
        final int replicationFactor = 1;
        cluster.createTopic(INPUT_TOPIC_NAME, partitionCount, replicationFactor);
    }

    /**
     * Closes every Streams instance that was registered for cleanup during the test.
     */
    @After
    public void after() {
        for (final KafkaStreams streams : streamsToCleanup) {
            streams.close();
        }
    }

    /**
     * Starts two Streams instances, produces 100 records for key 1 and checks that, with default
     * query parameters, the instance reported as active returns a value for the key while the other
     * instance returns {@code null}.
     */
    @Test
    public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
        final int key = 1;
        final int recordCount = 100;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as(TABLE_NAME).withCachingDisabled())
            .toStream()
            // One permit per record that reaches the end of the topology.
            .peek((k, v) -> processed.release());

        final KafkaStreams firstInstance = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams secondInstance = createKafkaStreams(builder, streamsConfiguration());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Arrays.asList(firstInstance, secondInstance),
            Duration.ofSeconds(60L));

        produceValueRange(key, 0, recordCount);

        // Wait until all produced records have been processed.
        final boolean allProcessed = processed.tryAcquire(recordCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(allProcessed, Matchers.is(Matchers.equalTo(true)));

        until(() -> {
            final KeyQueryMetadata keyMetadata =
                firstInstance.queryMetadataForKey(TABLE_NAME, key, (topic, recordKey, recordValue, numPartitions) -> 0);

            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();
            final ReadOnlyKeyValueStore<Integer, Integer> firstStore =
                IntegrationTestUtils.getStore(TABLE_NAME, firstInstance, storeType);
            final ReadOnlyKeyValueStore<Integer, Integer> secondStore =
                IntegrationTestUtils.getStore(TABLE_NAME, secondInstance, storeType);

            // An odd active-host port is taken to mean the first instance is the active one.
            final boolean firstIsActive = keyMetadata.activeHost().port() % 2 == 1;
            final ReadOnlyKeyValueStore<Integer, Integer> activeStore = firstIsActive ? firstStore : secondStore;
            final ReadOnlyKeyValueStore<Integer, Integer> otherStore = firstIsActive ? secondStore : firstStore;

            MatcherAssert.assertThat(activeStore.get(key), Matchers.is(Matchers.notNullValue()));
            try {
                MatcherAssert.assertThat(otherStore.get(key), Matchers.is(Matchers.nullValue()));
                return true;
            } catch (final InvalidStateStoreException notReady) {
                // Only a "not running yet" failure is tolerated; report "not met" so the check is retried.
                MatcherAssert.assertThat(
                    notReady.getMessage(),
                    Matchers.containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"));
                LOG.info("Streams wasn't running. Will try again.");
                return false;
            }
        });
    }

    /**
     * Starts two Streams instances, produces 100 records for key 1 and checks partition-specific
     * queries without stale stores enabled:
     * <ul>
     *   <li>the active instance returns a value for the key from the key's partition;</li>
     *   <li>the other instance returns {@code null} for the key from the other partition;</li>
     *   <li>querying the active instance for the other partition fails with an
     *       {@link InvalidStateStoreException} saying that the partition does not exist.</li>
     * </ul>
     */
    @Test
    public void shouldQuerySpecificActivePartitionStores() throws Exception {
        final int key = 1;
        final int recordCount = 100;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as(TABLE_NAME).withCachingDisabled())
            .toStream()
            // One permit per record that reaches the end of the topology.
            .peek((k, v) -> processed.release());

        final KafkaStreams firstInstance = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams secondInstance = createKafkaStreams(builder, streamsConfiguration());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Arrays.asList(firstInstance, secondInstance),
            Duration.ofSeconds(60L));

        produceValueRange(key, 0, recordCount);

        // Wait until all produced records have been processed.
        final boolean allProcessed = processed.tryAcquire(recordCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(allProcessed, Matchers.is(Matchers.equalTo(true)));

        until(() -> {
            // The partitioner always answers 0, so the key's partition is 0 and the other one is 1.
            final KeyQueryMetadata keyMetadata =
                firstInstance.queryMetadataForKey(TABLE_NAME, key, (topic, recordKey, recordValue, numPartitions) -> 0);
            final int keyPartition = keyMetadata.partition();
            final int otherPartition = keyPartition == 0 ? 1 : 0;

            // An odd active-host port is taken to mean the first instance is the active one.
            final boolean firstIsActive = keyMetadata.activeHost().port() % 2 == 1;
            final KafkaStreams activeInstance = firstIsActive ? firstInstance : secondInstance;
            final KafkaStreams otherInstance = firstIsActive ? secondInstance : firstInstance;

            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();

            // Only the active instance is asked for the key's own partition.
            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> keyPartitionQuery =
                StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).withPartition(keyPartition);
            final ReadOnlyKeyValueStore<Integer, Integer> activeStore =
                IntegrationTestUtils.getStore(activeInstance, keyPartitionQuery);
            MatcherAssert.assertThat(activeStore, Matchers.is(Matchers.notNullValue()));
            MatcherAssert.assertThat(activeStore.get(key), Matchers.is(Matchers.notNullValue()));

            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherPartitionQuery =
                StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).withPartition(otherPartition);
            try {
                // The other instance can serve the other partition, but the key is not stored there.
                MatcherAssert.assertThat(
                    IntegrationTestUtils.getStore(otherInstance, otherPartitionQuery).get(key),
                    Matchers.is(Matchers.nullValue()));

                // The active instance is expected to reject a query for the other partition.
                // The runnable must actually perform that query: an empty body never throws,
                // which makes assertThrows fail unconditionally.
                final InvalidStateStoreException missingPartition = Assert.assertThrows(
                    InvalidStateStoreException.class,
                    () -> IntegrationTestUtils.getStore(activeInstance, otherPartitionQuery).get(key));
                MatcherAssert.assertThat(
                    missingPartition.getMessage(),
                    Matchers.containsString("The specified partition 1 for store source-table does not exist."));
                return true;
            } catch (final InvalidStateStoreException notReady) {
                // Only a "not running yet" failure is tolerated; report "not met" so the check is retried.
                MatcherAssert.assertThat(
                    notReady.getMessage(),
                    Matchers.containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"));
                LOG.info("Streams wasn't running. Will try again.");
                return false;
            }
        });
    }

    /**
     * Starts two Streams instances, produces 100 records for key 1 and checks that, when the store
     * is fetched with the stale-query flag set to {@code true}, both instances eventually return a
     * value for the key.
     */
    @Test
    public void shouldQueryAllStalePartitionStores() throws Exception {
        final int key = 1;
        final int recordCount = 100;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as(TABLE_NAME).withCachingDisabled())
            .toStream()
            // One permit per record that reaches the end of the topology.
            .peek((k, v) -> processed.release());

        final KafkaStreams firstInstance = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams secondInstance = createKafkaStreams(builder, streamsConfiguration());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Arrays.asList(firstInstance, secondInstance),
            Duration.ofSeconds(60L));

        produceValueRange(key, 0, recordCount);

        // Wait until all produced records have been processed.
        final boolean allProcessed = processed.tryAcquire(recordCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(allProcessed, Matchers.is(Matchers.equalTo(true)));

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();

        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(TABLE_NAME, firstInstance, true, storeType).get(key) != null,
            "store1 cannot find results for key");
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(TABLE_NAME, secondInstance, true, storeType).get(key) != null,
            "store2 cannot find results for key");
    }

    /**
     * Starts two Streams instances, produces 100 records for key 1 and checks partition-specific
     * queries with stale stores enabled: both instances eventually return a value for the key from
     * the key's partition, and both return {@code null} for the key from the other partition.
     */
    @Test
    public void shouldQuerySpecificStalePartitionStores() throws Exception {
        final int key = 1;
        final int recordCount = 100;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as(TABLE_NAME).withCachingDisabled())
            .toStream()
            // One permit per record that reaches the end of the topology.
            .peek((k, v) -> processed.release());

        final KafkaStreams firstInstance = createKafkaStreams(builder, streamsConfiguration());
        final KafkaStreams secondInstance = createKafkaStreams(builder, streamsConfiguration());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Arrays.asList(firstInstance, secondInstance),
            Duration.ofSeconds(60L));

        produceValueRange(key, 0, recordCount);

        // Wait until all produced records have been processed.
        final boolean allProcessed = processed.tryAcquire(recordCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(allProcessed, Matchers.is(Matchers.equalTo(true)));

        final KeyQueryMetadata keyMetadata =
            firstInstance.queryMetadataForKey(TABLE_NAME, key, (topic, recordKey, recordValue, numPartitions) -> 0);
        final int keyPartition = keyMetadata.partition();
        final int otherPartition = keyPartition == 0 ? 1 : 0;

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();

        // Key's own partition: both instances must eventually serve the key.
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> keyPartitionQuery =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(keyPartition);
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(firstInstance, keyPartitionQuery).get(key) != null,
            "store1 cannot find results for key");
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(secondInstance, keyPartitionQuery).get(key) != null,
            "store2 cannot find results for key");

        // Other partition: neither instance may return the key.
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherPartitionQuery =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(otherPartition);
        final ReadOnlyKeyValueStore<Integer, Integer> firstOtherStore =
            IntegrationTestUtils.getStore(firstInstance, otherPartitionQuery);
        final ReadOnlyKeyValueStore<Integer, Integer> secondOtherStore =
            IntegrationTestUtils.getStore(secondInstance, otherPartitionQuery);
        MatcherAssert.assertThat(firstOtherStore.get(key), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(secondOtherStore.get(key), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Same checks as the single-threaded stale partition-specific test, but with each of the two
     * Streams instances configured with two stream threads: both instances eventually return a
     * value for the key from the key's partition, and both return {@code null} for the key from
     * the other partition.
     */
    @Test
    public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws Exception {
        final int key = 1;
        final int recordCount = 100;
        final int threadsPerInstance = 2;
        final Semaphore processed = new Semaphore(0);

        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as(TABLE_NAME).withCachingDisabled())
            .toStream()
            // One permit per record that reaches the end of the topology.
            .peek((k, v) -> processed.release());

        final Properties firstConfig = streamsConfiguration();
        firstConfig.put("num.stream.threads", threadsPerInstance);
        final Properties secondConfig = streamsConfiguration();
        secondConfig.put("num.stream.threads", threadsPerInstance);

        final KafkaStreams firstInstance = createKafkaStreams(builder, firstConfig);
        final KafkaStreams secondInstance = createKafkaStreams(builder, secondConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Arrays.asList(firstInstance, secondInstance),
            Duration.ofSeconds(60L));

        // Both instances must really be running more than one stream thread.
        Assert.assertTrue(firstInstance.localThreadsMetadata().size() > 1);
        Assert.assertTrue(secondInstance.localThreadsMetadata().size() > 1);

        produceValueRange(key, 0, recordCount);

        // Wait until all produced records have been processed.
        final boolean allProcessed = processed.tryAcquire(recordCount, 60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(allProcessed, Matchers.is(Matchers.equalTo(true)));

        final KeyQueryMetadata keyMetadata = firstInstance.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
        final int keyPartition = keyMetadata.partition();
        final int otherPartition = keyPartition == 0 ? 1 : 0;

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> storeType = QueryableStoreTypes.keyValueStore();

        // Key's own partition: both instances must eventually serve the key.
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> keyPartitionQuery =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(keyPartition);
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(firstInstance, keyPartitionQuery).get(key) != null,
            "store1 cannot find results for key");
        TestUtils.waitForCondition(
            () -> IntegrationTestUtils.getStore(secondInstance, keyPartitionQuery).get(key) != null,
            "store2 cannot find results for key");

        // Other partition: neither instance may return the key.
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherPartitionQuery =
            StoreQueryParameters.fromNameAndType(TABLE_NAME, storeType).enableStaleStores().withPartition(otherPartition);
        final ReadOnlyKeyValueStore<Integer, Integer> firstOtherStore =
            IntegrationTestUtils.getStore(firstInstance, otherPartitionQuery);
        final ReadOnlyKeyValueStore<Integer, Integer> secondOtherStore =
            IntegrationTestUtils.getStore(secondInstance, otherPartitionQuery);
        MatcherAssert.assertThat(firstOtherStore.get(key), Matchers.is(Matchers.nullValue()));
        MatcherAssert.assertThat(secondOtherStore.get(key), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Polls {@code testCondition} every 500 ms until it reports {@code true} or
     * {@link IntegrationTestUtils#DEFAULT_TIMEOUT} milliseconds have elapsed.
     *
     * <p>The condition is always evaluated at least once. Running out of time is a failure:
     * returning normally in that case would let a test pass without its assertions ever succeeding.
     *
     * @param testCondition the check to repeat; returning {@code false} requests another attempt
     * @throws AssertionError if the condition is still not met when the deadline passes
     * @throws RuntimeException if the condition throws; unchecked exceptions are rethrown as-is,
     *                          checked ones are wrapped
     */
    private static void until(TestCondition testCondition) {
        final long pollIntervalMs = 500L;
        final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;
        while (true) {
            try {
                if (testCondition.conditionMet()) {
                    return;
                }
                if (System.currentTimeMillis() >= deadline) {
                    throw new AssertionError(
                        "Condition not met within " + IntegrationTestUtils.DEFAULT_TIMEOUT + " ms");
                }
                Thread.sleep(pollIntervalMs);
            } catch (final InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently clearing it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (final RuntimeException e) {
                throw e;
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds a Streams instance from the given builder and configuration and registers it so that
     * {@code after()} closes it. The instance is not started here.
     *
     * @param streamsBuilder builder whose topology is built with {@code properties}
     * @param properties     configuration used both to build the topology and to create the instance
     * @return the newly created, registered instance
     */
    private KafkaStreams createKafkaStreams(final StreamsBuilder streamsBuilder, final Properties properties) {
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(properties), properties);
        streamsToCleanup.add(streams);
        return streams;
    }

    /**
     * Synchronously produces one record per integer in {@code [i2, i3)} to the input topic, all
     * with the same key.
     *
     * @param i  key used for every record
     * @param i2 first value to produce (inclusive)
     * @param i3 end of the value range (exclusive)
     */
    private void produceValueRange(int i, int i2, int i3) {
        final Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", cluster.bootstrapServers());
        producerConfig.put("key.serializer", IntegerSerializer.class);
        producerConfig.put("value.serializer", IntegerSerializer.class);

        final List<KeyValue<Integer, Integer>> records = IntStream.range(i2, i3)
            .mapToObj(value -> KeyValue.pair(i, value))
            .collect(Collectors.toList());

        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC_NAME, records, producerConfig, mockTime);
    }

    /**
     * Builds a fresh Streams configuration for one instance of the test application.
     *
     * <p>Each call advances the shared port counter and uses the new value in
     * {@code application.server}, and each call points {@code state.dir} at a new temporary
     * directory. The application id is derived from the test class and the current test name.
     *
     * @return a new, independent {@link Properties} object
     */
    private Properties streamsConfiguration() {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        final Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "app-" + uniqueTestName);
        // Pre-increment: the counter is advanced first and the new value is the one used.
        config.put("application.server", "localhost:" + (++port));
        config.put("bootstrap.servers", cluster.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        config.put("num.standby.replicas", 1);
        config.put("max.poll.records", 100);
        config.put("heartbeat.interval.ms", 200);
        config.put("session.timeout.ms", 1000);
        config.put("commit.interval.ms", 100);
        return config;
    }
}
