package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@Tags({@Tag("integration"), @Tag("bazel:shard_count:2")})
/* loaded from: input_file:org/apache/kafka/streams/integration/PauseResumeIntegrationTest.class */
public class PauseResumeIntegrationTest {
    private static Properties producerConfig;
    private static Properties consumerConfig;
    private static final String INPUT_STREAM_1 = "input-stream-1";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private static final String OUTPUT_STREAM_1 = "output-stream-1";
    private static final String OUTPUT_STREAM_2 = "output-stream-2";
    private static final String TOPOLOGY1 = "topology1";
    private static final String TOPOLOGY2 = "topology2";
    private String appId;
    private KafkaStreams kafkaStreams;
    private KafkaStreams kafkaStreams2;
    private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = Materialized.as(Stores.inMemoryKeyValueStore("store"));
    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = Arrays.asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L), KeyValue.pair("C", -50L));
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = Arrays.asList(KeyValue.pair("A", 1L), KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L), KeyValue.pair("C", 2L));
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 = Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 2L), KeyValue.pair("A", 4L), KeyValue.pair("C", 3L), KeyValue.pair("C", 4L));
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA_ALL = new ArrayList<KeyValue<String, Long>>() { // from class: org.apache.kafka.streams.integration.PauseResumeIntegrationTest.1
        {
            addAll(PauseResumeIntegrationTest.COUNT_OUTPUT_DATA);
            addAll(PauseResumeIntegrationTest.COUNT_OUTPUT_DATA2);
        }
    };

    private static Stream<Boolean> parameters() {
        return Stream.of((Object[]) new Boolean[]{Boolean.TRUE, Boolean.FALSE});
    }

    @BeforeAll
    public static void startCluster() throws Exception {
        CLUSTER.start();
        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics(TestInfo testInfo) throws InterruptedException {
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
        this.appId = IntegrationTestUtils.safeUniqueTestName(testInfo);
    }

    private Properties props(boolean z) {
        Properties properties = new Properties();
        properties.put("application.id", this.appId);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("state.dir", TestUtils.tempDirectory(this.appId).getPath());
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.Long().getClass());
        properties.put("commit.interval.ms", 1000L);
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("auto.offset.reset", "earliest");
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", 1000);
        properties.put("__state.updater.enabled__", Boolean.valueOf(z));
        return properties;
    }

    @AfterEach
    public void shutdown() throws InterruptedException {
        for (KafkaStreams kafkaStreams : Arrays.asList(this.kafkaStreams, this.kafkaStreams2, this.streamsNamedTopologyWrapper)) {
            if (kafkaStreams != null) {
                kafkaStreams.close(Duration.ofSeconds(30L));
            }
        }
    }

    private static void produceToInputTopics(String str, Collection<KeyValue<String, Long>> collection) {
        IntegrationTestUtils.produceKeyValuesSynchronously(str, collection, producerConfig, CLUSTER.time);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldPauseAndResumeKafkaStreams(boolean z) throws Exception {
        this.kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, z);
        this.kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.kafkaStreams.pause();
        Assert.assertTrue(this.kafkaStreams.isPaused());
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        IntegrationTestUtils.waitUntilStreamsHasPolled(this.kafkaStreams, 2);
        assertTopicSize(OUTPUT_STREAM_1, 5);
        this.kafkaStreams.resume();
        Assert.assertFalse(this.kafkaStreams.isPaused());
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
        assertTopicSize(OUTPUT_STREAM_1, 10);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldAllowForTopologiesToStartPaused(boolean z) throws Exception {
        this.kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, z);
        this.kafkaStreams.pause();
        this.kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertTrue(this.kafkaStreams.isPaused());
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        IntegrationTestUtils.waitUntilStreamsHasPolled(this.kafkaStreams, 2);
        assertTopicSize(OUTPUT_STREAM_1, 0);
        this.kafkaStreams.resume();
        Assert.assertFalse(this.kafkaStreams.isPaused());
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        assertTopicSize(OUTPUT_STREAM_1, 5);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(boolean z) throws Exception {
        this.streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(z));
        this.streamsNamedTopologyWrapper.start(Arrays.asList(getNamedTopologyBuilder1().build(), getNamedTopologyBuilder2().build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streamsNamedTopologyWrapper), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
        assertTopicSize(OUTPUT_STREAM_1, 5);
        assertTopicSize(OUTPUT_STREAM_2, 5);
        this.streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
        Assert.assertTrue(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isPaused());
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA2);
        assertTopicSize(OUTPUT_STREAM_1, 5);
        assertTopicSize(OUTPUT_STREAM_2, 10);
        this.streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(boolean z) throws Exception {
        this.streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(z));
        this.streamsNamedTopologyWrapper.start(Arrays.asList(getNamedTopologyBuilder1().build(), getNamedTopologyBuilder2().build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streamsNamedTopologyWrapper), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
        this.streamsNamedTopologyWrapper.pause();
        Assert.assertTrue(this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertTrue(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertTrue(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        IntegrationTestUtils.waitUntilStreamsHasPolled(this.streamsNamedTopologyWrapper, 2);
        assertTopicSize(OUTPUT_STREAM_1, 5);
        assertTopicSize(OUTPUT_STREAM_2, 5);
        this.streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertTrue(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
        assertTopicSize(OUTPUT_STREAM_1, 10);
        assertTopicSize(OUTPUT_STREAM_2, 5);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void shouldAllowForNamedTopologiesToStartPaused(boolean z) throws Exception {
        this.streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(z));
        NamedTopologyBuilder namedTopologyBuilder1 = getNamedTopologyBuilder1();
        NamedTopologyBuilder namedTopologyBuilder2 = getNamedTopologyBuilder2();
        this.streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
        this.streamsNamedTopologyWrapper.start(Arrays.asList(namedTopologyBuilder1.build(), namedTopologyBuilder2.build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streamsNamedTopologyWrapper), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertTrue(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        assertTopicSize(OUTPUT_STREAM_1, 0);
        assertTopicSize(OUTPUT_STREAM_2, 0);
        this.streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isPaused());
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
        Assert.assertFalse(this.streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void pauseResumeShouldWorkAcrossInstances(boolean z) throws Exception {
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        this.kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, z);
        this.kafkaStreams.pause();
        this.kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertTrue(this.kafkaStreams.isPaused());
        this.kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, z);
        this.kafkaStreams2.pause();
        this.kafkaStreams2.start();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams2), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        Assert.assertTrue(this.kafkaStreams2.isPaused());
        assertTopicSize(OUTPUT_STREAM_1, 0);
        this.kafkaStreams2.close();
        this.kafkaStreams2.cleanUp();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams2), KafkaStreams.State.NOT_RUNNING, STARTUP_TIMEOUT);
        this.kafkaStreams.resume();
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.kafkaStreams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
    }

    @MethodSource({"parameters"})
    @ParameterizedTest
    public void pausedTopologyShouldNotRestoreStateStores(boolean z) throws Exception {
        Properties props = props(z);
        props.put("num.standby.replicas", 1);
        Properties props2 = props(z);
        props2.put("num.standby.replicas", 1);
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        this.kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, props);
        this.kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1, props2);
        this.kafkaStreams.start();
        this.kafkaStreams2.start();
        IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.kafkaStreams, this.kafkaStreams2), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
        this.kafkaStreams.close();
        this.kafkaStreams2.close();
        this.kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, props);
        this.kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1, props2);
        this.kafkaStreams.cleanUp();
        this.kafkaStreams2.cleanUp();
        this.kafkaStreams.pause();
        this.kafkaStreams2.pause();
        this.kafkaStreams.start();
        this.kafkaStreams2.start();
        IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.kafkaStreams, this.kafkaStreams2), KafkaStreams.State.REBALANCING, STARTUP_TIMEOUT);
        assertStreamsLocalStoreLagStaysConstant(this.kafkaStreams);
        assertStreamsLocalStoreLagStaysConstant(this.kafkaStreams2);
    }

    private void assertStreamsLocalStoreLagStaysConstant(KafkaStreams kafkaStreams) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return kafkaStreams.allLocalStorePartitionLags().containsKey("test-store");
        }, "Lags for test-store partitions were not found within the timeout!");
        IntegrationTestUtils.waitUntilStreamsHasPolled(kafkaStreams, 2);
        long offsetLag = ((LagInfo) ((Map) kafkaStreams.allLocalStorePartitionLags().get("test-store")).get(0)).offsetLag();
        IntegrationTestUtils.waitUntilStreamsHasPolled(kafkaStreams, 2);
        long offsetLag2 = ((LagInfo) ((Map) kafkaStreams.allLocalStorePartitionLags().get("test-store")).get(0)).offsetLag();
        Assert.assertTrue(offsetLag > 0);
        Assert.assertEquals(offsetLag, offsetLag2);
    }

    private KafkaStreams buildKafkaStreams(String str, boolean z) {
        return buildKafkaStreams(str, props(z));
    }

    private KafkaStreams buildKafkaStreams(String str, Properties properties) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(INPUT_STREAM_1).groupByKey().count(Materialized.as("test-store")).toStream().to(str);
        return new KafkaStreams(streamsBuilder.build(properties), properties);
    }

    private void assertTopicSize(String str, int i) {
        Assert.assertEquals(IntegrationTestUtils.getTopicSize(consumerConfig, str), i);
    }

    private void awaitOutput(String str, int i, List<KeyValue<String, Long>> list) throws Exception {
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, str, i), CoreMatchers.equalTo(list));
    }

    private NamedTopologyBuilder getNamedTopologyBuilder1() {
        NamedTopologyBuilder newNamedTopologyBuilder = this.streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY1);
        newNamedTopologyBuilder.stream(INPUT_STREAM_1).groupByKey().count().toStream().to(OUTPUT_STREAM_1);
        return newNamedTopologyBuilder;
    }

    private NamedTopologyBuilder getNamedTopologyBuilder2() {
        NamedTopologyBuilder newNamedTopologyBuilder = this.streamsNamedTopologyWrapper.newNamedTopologyBuilder(TOPOLOGY2);
        newNamedTopologyBuilder.stream(INPUT_STREAM_2).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        return newNamedTopologyBuilder;
    }
}
