package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.StreamsUpgradeTest;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test that replaces, one at a time, three {@link KafkaStreams} instances built by
 * {@link StreamsUpgradeTest#buildStreams} with instances that additionally carry a
 * {@code "test.future.metadata"} counter, and checks the value of each counter along the way.
 *
 * <p>NOTE(review): the counter presumably records the metadata/protocol version the "future"
 * instance ends up using during version probing; confirm against {@code StreamsUpgradeTest}.
 */
@Category({IntegrationTest.class})
public class StreamsUpgradeTestIntegrationTest {

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    /**
     * Counter value expected right after a "future" instance starts while instances without the
     * counter property are still running.
     * NOTE(review): presumably the latest supported protocol version; confirm.
     */
    private static final int COUNTER_WHILE_ROLLING = 7;

    /** Counter value expected on every "future" instance once all three have been replaced. */
    private static final int COUNTER_AFTER_ROLL = 8;

    @BeforeClass
    public static void setup() {
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 1, "data");
    }

    /**
     * Starts three plain instances, swaps them one by one for "future" instances, and verifies
     * the counters: {@value #COUNTER_WHILE_ROLLING} after each of the first two swaps, and
     * eventually {@value #COUNTER_AFTER_ROLL} on all three once the last swap is done.
     *
     * @throws InterruptedException if interrupted while waiting for instances to start or while
     *     retrying the final assertions
     */
    @Test
    public void testVersionProbingUpgrade() throws InterruptedException {
        KafkaStreams current1 = null;
        KafkaStreams current2 = null;
        KafkaStreams current3 = null;
        KafkaStreams future1 = null;
        KafkaStreams future2 = null;
        KafkaStreams future3 = null;
        try {
            current1 = buildCurrentStreams();
            current2 = buildCurrentStreams();
            current3 = buildCurrentStreams();
            startSync(current1, current2, current3);

            // First swap.
            current1.close();
            final AtomicInteger counter1 = new AtomicInteger();
            future1 = buildFutureStreams(counter1);
            startSync(future1);
            MatcherAssert.assertThat(counter1.get(), CoreMatchers.is(COUNTER_WHILE_ROLLING));

            // Second swap.
            current2.close();
            final AtomicInteger counter2 = new AtomicInteger();
            future2 = buildFutureStreams(counter2);
            startSync(future2);
            MatcherAssert.assertThat(counter2.get(), CoreMatchers.is(COUNTER_WHILE_ROLLING));

            // Third swap: no plain instance remains afterwards.
            current3.close();
            final AtomicInteger counter3 = new AtomicInteger();
            future3 = buildFutureStreams(counter3);
            startSync(future3);

            // The counters are updated asynchronously, so poll until each reaches the final value.
            TestUtils.retryOnExceptionWithTimeout(
                () -> MatcherAssert.assertThat(counter3.get(), CoreMatchers.is(COUNTER_AFTER_ROLL)));
            TestUtils.retryOnExceptionWithTimeout(
                () -> MatcherAssert.assertThat(counter2.get(), CoreMatchers.is(COUNTER_AFTER_ROLL)));
            TestUtils.retryOnExceptionWithTimeout(
                () -> MatcherAssert.assertThat(counter1.get(), CoreMatchers.is(COUNTER_AFTER_ROLL)));
        } finally {
            // Always shut everything down, also when an assertion above fails, so no instance
            // keeps running into later tests. Closing an already-closed instance is harmless.
            closeAll(future1, future2, future3, current1, current2, current3);
        }
    }

    /** Builds an instance configured only with the embedded cluster's bootstrap servers. */
    private static KafkaStreams buildCurrentStreams() {
        return StreamsUpgradeTest.buildStreams(Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()))));
    }

    /**
     * Builds an instance that, besides the bootstrap servers, is handed {@code atomicInteger}
     * under the {@code "test.future.metadata"} property.
     *
     * @param atomicInteger counter made available to the built instance; the test reads it later
     * @return the not-yet-started instance
     */
    private static KafkaStreams buildFutureStreams(AtomicInteger atomicInteger) {
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("test.future.metadata", atomicInteger);
        return StreamsUpgradeTest.buildStreams(properties);
    }

    /**
     * Starts all given instances and blocks until each one has reached
     * {@link KafkaStreams.State#RUNNING} at least once.
     *
     * @param kafkaStreamsArr instances to start; must not have been started yet
     * @throws InterruptedException if interrupted while waiting
     */
    private static void startSync(KafkaStreams... kafkaStreamsArr) throws InterruptedException {
        final CountDownLatch allRunning = new CountDownLatch(kafkaStreamsArr.length);
        for (final KafkaStreams streams : kafkaStreamsArr) {
            // An instance can enter RUNNING repeatedly (e.g. RUNNING -> REBALANCING -> RUNNING).
            // Count each instance only once, otherwise the latch could open before the others
            // have started.
            final AtomicBoolean counted = new AtomicBoolean(false);
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING && counted.compareAndSet(false, true)) {
                    allRunning.countDown();
                }
            });
        }
        for (final KafkaStreams streams : kafkaStreamsArr) {
            streams.start();
        }
        allRunning.await();
    }

    /**
     * Closes every non-null instance: first a zero-timeout close on each so that all of them
     * begin shutting down, then a blocking close on each to wait for completion.
     *
     * @param instances instances to close; {@code null} elements are skipped
     */
    private static void closeAll(KafkaStreams... instances) {
        for (final KafkaStreams streams : instances) {
            if (streams != null) {
                streams.close(Duration.ZERO);
            }
        }
        for (final KafkaStreams streams : instances) {
            if (streams != null) {
                streams.close();
            }
        }
    }
}
