/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.MockMetricsReporter;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class KafkaStreamsTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @Test
    public void testStartAndClose() throws Exception {
        Properties props = new Properties();
        props.setProperty("application.id", "testStartAndClose");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);
        streams.start();
        int newInitCount = MockMetricsReporter.INIT_COUNT.get();
        int initCountDifference = newInitCount - oldInitCount;
        Assert.assertTrue((String)"some reporters should be initialized by calling start()", (initCountDifference > 0 ? 1 : 0) != 0);
        streams.close();
        Assert.assertEquals((String)"each reporter initialized should also be closed", (long)(oldCloseCount + initCountDifference), (long)MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test
    public void testCloseIsIdempotent() throws Exception {
        Properties props = new Properties();
        props.setProperty("application.id", "testCloseIsIdempotent");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);
        streams.close();
        int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
        streams.close();
        Assert.assertEquals((String)"subsequent close() calls should do nothing", (long)closeCount, (long)MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test(expected=IllegalStateException.class)
    public void testCannotStartOnceClosed() throws Exception {
        Properties props = new Properties();
        props.setProperty("application.id", "testCannotStartOnceClosed");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KStreamBuilder builder = new KStreamBuilder();
        streams.close();
        try (KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);){
            streams.start();
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testCannotStartTwice() throws Exception {
        Properties props = new Properties();
        props.setProperty("application.id", "testCannotStartTwice");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KStreamBuilder builder = new KStreamBuilder();
        streams.start();
        try (KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);){
            streams.start();
        }
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
        KafkaStreams streams = this.createKafkaStreams();
        streams.allMetadata();
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
        KafkaStreams streams = this.createKafkaStreams();
        streams.allMetadataForStore("store");
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
        KafkaStreams streams = this.createKafkaStreams();
        streams.metadataForKey("store", (Object)"key", Serdes.String().serializer());
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
        KafkaStreams streams = this.createKafkaStreams();
        streams.metadataForKey("store", (Object)"key", (StreamPartitioner)new StreamPartitioner<String, Object>(){

            public Integer partition(String key, Object value, int numPartitions) {
                return 0;
            }
        });
    }

    private KafkaStreams createKafkaStreams() {
        Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KStreamBuilder builder = new KStreamBuilder();
        return new KafkaStreams((TopologyBuilder)builder, props);
    }

    @Test
    public void testCleanup() throws Exception {
        Properties props = new Properties();
        props.setProperty("application.id", "testLocalCleanup");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);
        streams.cleanUp();
        streams.start();
        streams.close();
        streams.cleanUp();
    }

    @Test(expected=IllegalStateException.class)
    public void testCannotCleanupWhileRunning() throws Exception {
        Properties props = new Properties();
        props.setProperty("application.id", "testCannotCleanupWhileRunning");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        KStreamBuilder builder = new KStreamBuilder();
        streams.start();
        try (KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, props);){
            streams.cleanUp();
        }
    }
}

