package org.apache.kafka.streams;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.MockMetricsReporter;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest.class */
public class KafkaStreamsTest {
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    @Test
    public void testStartAndClose() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("application.id", "testStartAndClose");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        int i = MockMetricsReporter.INIT_COUNT.get();
        int i2 = MockMetricsReporter.CLOSE_COUNT.get();
        KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), properties);
        kafkaStreams.start();
        Assert.assertTrue("some reporters should be initialized by calling start()", MockMetricsReporter.INIT_COUNT.get() - i > 0);
        kafkaStreams.close();
        Assert.assertEquals("each reporter initialized should also be closed", i2 + r0, MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test
    public void testCloseIsIdempotent() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("application.id", "testCloseIsIdempotent");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), properties);
        kafkaStreams.close();
        int i = MockMetricsReporter.CLOSE_COUNT.get();
        kafkaStreams.close();
        Assert.assertEquals("subsequent close() calls should do nothing", i, MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test(expected = IllegalStateException.class)
    public void testCannotStartOnceClosed() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("application.id", "testCannotStartOnceClosed");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), properties);
        kafkaStreams.close();
        try {
            try {
                kafkaStreams.start();
                kafkaStreams.close();
            } catch (IllegalStateException e) {
                Assert.assertEquals("Cannot restart after closing.", e.getMessage());
                throw e;
            }
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testCannotStartTwice() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("application.id", "testCannotStartTwice");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), properties);
        kafkaStreams.start();
        try {
            try {
                kafkaStreams.start();
                kafkaStreams.close();
            } catch (IllegalStateException e) {
                Assert.assertEquals("This process was already started.", e.getMessage());
                throw e;
            }
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
        createKafkaStreams().allMetadata();
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
        createKafkaStreams().allMetadataForStore("store");
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
        createKafkaStreams().metadataForKey("store", "key", Serdes.String().serializer());
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
        createKafkaStreams().metadataForKey("store", "key", new StreamPartitioner<String, Object>() { // from class: org.apache.kafka.streams.KafkaStreamsTest.1
            public Integer partition(String str, Object obj, int i) {
                return 0;
            }
        });
    }

    private KafkaStreams createKafkaStreams() {
        Properties properties = new Properties();
        properties.setProperty("application.id", "appId");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        return new KafkaStreams(new KStreamBuilder(), properties);
    }

    @Test
    public void testCleanup() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("application.id", "testLocalCleanup");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), properties);
        kafkaStreams.cleanUp();
        kafkaStreams.start();
        kafkaStreams.close();
        kafkaStreams.cleanUp();
    }

    @Test(expected = IllegalStateException.class)
    public void testCannotCleanupWhileRunning() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("application.id", "testCannotCleanupWhileRunning");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), properties);
        kafkaStreams.start();
        try {
            try {
                kafkaStreams.cleanUp();
                kafkaStreams.close();
            } catch (IllegalStateException e) {
                Assert.assertEquals("Cannot clean up while running.", e.getMessage());
                throw e;
            }
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }
}
