/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KafkaStreamsTest {
    /** Number of brokers the embedded test cluster is started with. */
    private static final int NUM_BROKERS = 1;
    /** Value configured for "num.stream.threads" in {@code before()}. */
    private static final int NUM_THREADS = 2;

    /** Embedded Kafka cluster shared by all tests of this class. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    /** Builder whose (empty) topology backs the default {@link #streams} instance. */
    private final StreamsBuilder builder = new StreamsBuilder();
    /** Default client, re-created before every test. */
    private KafkaStreams streams;
    /** Configuration, re-created before every test; individual tests may adjust it. */
    private Properties props;

    /**
     * Creates a fresh configuration and a fresh default {@link KafkaStreams} client
     * (built from the empty topology of {@code builder}) ahead of each test.
     */
    @Before
    public void before() {
        props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        // stored as an Integer, not as a String
        props.put("num.stream.threads", NUM_THREADS);

        streams = new KafkaStreams(builder.build(), props);
    }

    /**
     * Checks the client state across its lifecycle: CREATED right after construction
     * (with no listener callbacks yet), RUNNING after {@code start()}, and NOT_RUNNING
     * after {@code close()}.
     *
     * @throws InterruptedException if interrupted while waiting for the RUNNING state
     */
    @Test
    public void testStateChanges() throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props);
        final StateListenerStub stateListener = new StateListenerStub();
        streams.setStateListener(stateListener);
        try {
            // JUnit convention: expected value first, so failure messages read correctly
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());
            Assert.assertEquals(0, stateListener.numChanges);

            streams.start();
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return streams.state() == KafkaStreams.State.RUNNING;
                }
            }, 10000L, "Streams never started.");
        } finally {
            // close even when an assertion or the wait fails, so no client leaks into other tests
            streams.close();
        }
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
    }

    /**
     * Closing a client that was created but never started must leave it in NOT_RUNNING.
     */
    @Test
    public void testStateCloseAfterCreate() {
        final StreamsBuilder emptyTopology = new StreamsBuilder();
        final KafkaStreams client = new KafkaStreams(emptyTopology.build(), props);
        final StateListenerStub listener = new StateListenerStub();
        client.setStateListener(listener);

        client.close();

        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, client.state());
    }

    /**
     * Closes a never-started client built on a {@link MockClientSupplier} and verifies that
     * the supplier's consumer, restore consumer and every producer report being closed.
     *
     * @throws Exception if waiting for the NOT_RUNNING state fails or is interrupted
     */
    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        final StreamsBuilder topologyBuilder = new StreamsBuilder();
        topologyBuilder.globalTable("anyTopic");

        // single-node mock cluster handed to the admin client of the mock supplier
        final List<Node> brokers = Arrays.asList(new Node(0, "localhost", 8121));
        final Cluster mockCluster = new Cluster(
            "mockClusterId",
            brokers,
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.emptySet(),
            brokers.get(0));
        final MockClientSupplier mockClients = new MockClientSupplier();
        mockClients.setClusterForAdminClient(mockCluster);

        final KafkaStreams client = new KafkaStreams(topologyBuilder.build(), new StreamsConfig(props), mockClients);
        client.close();

        final TestCondition notRunning = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return client.state() == KafkaStreams.State.NOT_RUNNING;
            }
        };
        TestUtils.waitForCondition(notRunning, 10000L, "Streams never stopped.");

        Assert.assertTrue(mockClients.consumer.closed());
        Assert.assertTrue(mockClients.restoreConsumer.closed());
        for (final MockProducer producer : mockClients.producers) {
            Assert.assertTrue(producer.closed());
        }
    }

    /**
     * Shuts down every stream thread of a running client and verifies that the client
     * moves to ERROR, then to NOT_RUNNING after {@code close()}, and that its
     * {@code globalStreamThread} field is null afterwards.
     *
     * <p>The private {@code threads} and {@code globalStreamThread} fields of
     * {@link KafkaStreams} are read via reflection.
     *
     * @throws Exception on reflection failures or if a wait is interrupted or times out
     */
    @Test
    public void testStateThreadClose() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        try {
            final Field threadsField = streams.getClass().getDeclaredField("threads");
            threadsField.setAccessible(true);
            final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);

            Assert.assertEquals(NUM_THREADS, threads.length);
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return streams.state() == KafkaStreams.State.RUNNING;
                }
            }, 10000L, "Streams never started.");

            // iterate over the actual array rather than a hard-coded thread count
            for (final StreamThread thread : threads) {
                thread.shutdown();
                TestUtils.waitForCondition(new TestCondition() {
                    @Override
                    public boolean conditionMet() {
                        return thread.state() == StreamThread.State.DEAD;
                    }
                }, 10000L, "Thread never stopped.");
                thread.join();
            }

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return streams.state() == KafkaStreams.State.ERROR;
                }
            }, 10000L, "Streams never stopped.");
        } finally {
            // close even when an assertion or a wait fails, so no client leaks into other tests
            streams.close();
        }

        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                return streams.state() == KafkaStreams.State.NOT_RUNNING;
            }
        }, 10000L, "Streams never stopped.");

        final Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
        globalThreadField.setAccessible(true);
        final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
        Assert.assertNull(globalStreamThread);
    }

    /**
     * Shuts down the global stream thread of a running client and verifies that the
     * client is in ERROR once that thread is dead, and in NOT_RUNNING after {@code close()}.
     *
     * <p>The private {@code globalStreamThread} field of {@link KafkaStreams} is read
     * via reflection.
     *
     * @throws Exception on reflection failures or if a wait is interrupted or times out
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        try {
            streams.start();
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return streams.state() == KafkaStreams.State.RUNNING;
                }
            }, 10000L, "Streams never started.");

            final Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
            globalThreadField.setAccessible(true);
            final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);

            globalStreamThread.shutdown();
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return globalStreamThread.state() == GlobalStreamThread.State.DEAD;
                }
            }, 10000L, "Thread never stopped.");
            globalStreamThread.join();

            // JUnit convention: expected value first
            Assert.assertEquals(KafkaStreams.State.ERROR, streams.state());
        } finally {
            // close even when an assertion or a wait fails, so no client leaks into other tests
            streams.close();
        }

        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
    }

    /**
     * Constructing a client must initialize at least one {@link MockMetricsReporter};
     * closing the started client must close exactly as many reporters as were initialized.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        final int initCountBefore = MockMetricsReporter.INIT_COUNT.get();
        final KafkaStreams client = new KafkaStreams(new StreamsBuilder().build(), props);
        final int reportersInitialized = MockMetricsReporter.INIT_COUNT.get() - initCountBefore;
        Assert.assertTrue("some reporters should be initialized by calling on construction", reportersInitialized > 0);

        client.start();
        final int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();
        client.close();

        final int expectedCloseCount = closeCountBefore + reportersInitialized;
        Assert.assertEquals(expectedCloseCount, MockMetricsReporter.CLOSE_COUNT.get());
    }

    /**
     * A second {@code close()} must not close any further metrics reporters.
     */
    @Test
    public void testCloseIsIdempotent() {
        streams.close();
        final int closedAfterFirstCall = MockMetricsReporter.CLOSE_COUNT.get();

        streams.close();
        final int closedAfterSecondCall = MockMetricsReporter.CLOSE_COUNT.get();

        Assert.assertEquals("subsequent close() calls should do nothing", closedAfterFirstCall, closedAfterSecondCall);
    }

    /**
     * Starting a client again after it has been closed must raise an
     * {@link IllegalStateException}. The client is closed once more afterwards in any case.
     */
    @Test
    public void testCannotStartOnceClosed() {
        streams.start();
        streams.close();

        boolean restartSucceeded = false;
        try {
            streams.start();
            restartSucceeded = true;
        } catch (final IllegalStateException expected) {
            // this is the outcome the test is looking for
        } finally {
            streams.close();
        }

        if (restartSucceeded) {
            Assert.fail("Should have throw IllegalStateException");
        }
    }

    /**
     * Calling {@code start()} on an already started client must raise an
     * {@link IllegalStateException}.
     *
     * <p>The test fails explicitly when the second {@code start()} returns normally;
     * without that check it would pass whether or not the exception is thrown.
     */
    @Test
    public void testCannotStartTwice() {
        streams.start();
        try {
            streams.start();
            Assert.fail("Should throw an IllegalStateException");
        } catch (final IllegalStateException expected) {
            // expected: a client cannot be started twice
        } finally {
            streams.close();
        }
    }

    /**
     * Registering a global state restore listener on a started client must raise an
     * {@link IllegalStateException}. The client is closed afterwards in any case.
     */
    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        streams.start();

        boolean listenerAccepted = false;
        try {
            final StateRestoreListener listener = new MockStateRestoreListener();
            streams.setGlobalStateRestoreListener(listener);
            listenerAccepted = true;
        } catch (final IllegalStateException expected) {
            // this is the outcome the test is looking for
        } finally {
            streams.close();
        }

        if (listenerAccepted) {
            Assert.fail("Should throw an IllegalStateException");
        }
    }

    /**
     * Setting the uncaught exception handler on a started client must raise an
     * {@link IllegalStateException}. The client is closed afterwards in any case.
     */
    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        streams.start();

        boolean handlerAccepted = false;
        try {
            streams.setUncaughtExceptionHandler(null);
            handlerAccepted = true;
        } catch (final IllegalStateException expected) {
            // this is the outcome the test is looking for
        } finally {
            streams.close();
        }

        if (handlerAccepted) {
            Assert.fail("Should throw IllegalStateException");
        }
    }

    /**
     * Setting the state listener on a started client must raise an
     * {@link IllegalStateException}. The client is closed afterwards in any case.
     */
    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        streams.start();

        boolean listenerAccepted = false;
        try {
            streams.setStateListener(null);
            listenerAccepted = true;
        } catch (final IllegalStateException expected) {
            // this is the outcome the test is looking for
        } finally {
            streams.close();
        }

        if (listenerAccepted) {
            Assert.fail("Should throw IllegalStateException");
        }
    }

    /**
     * With a single stream thread configured, a freshly constructed client must
     * expose exactly 23 metrics.
     */
    @Test
    public void testNumberDefaultMetrics() {
        props.put("num.stream.threads", "1");
        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props);
        try {
            final Map<?, ?> metrics = streams.metrics();
            Assert.assertEquals(23, metrics.size());
        } finally {
            // release the resources acquired by the constructor; the client is never started
            streams.close();
        }
    }

    /**
     * An unknown "metrics.recording.level" value must make the {@link KafkaStreams}
     * constructor fail with a {@link ConfigException}.
     */
    @Test
    public void testIllegalMetricsConfig() {
        props.setProperty("metrics.recording.level", "illegalConfig");
        final StreamsBuilder emptyTopology = new StreamsBuilder();

        boolean constructed = false;
        try {
            new KafkaStreams(emptyTopology.build(), props);
            constructed = true;
        } catch (final ConfigException expected) {
            // this is the outcome the test is looking for
        }

        if (constructed) {
            Assert.fail("Should have throw ConfigException");
        }
    }

    /**
     * Both the INFO and the DEBUG recording level must be accepted by the
     * {@link KafkaStreams} constructor. Each client is closed right after construction.
     */
    @Test
    public void testLegalMetricsConfig() {
        props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        final KafkaStreams infoLevelStreams = new KafkaStreams(new StreamsBuilder().build(), props);
        infoLevelStreams.close();

        props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.toString());
        final KafkaStreams debugLevelStreams = new KafkaStreams(new StreamsBuilder().build(), props);
        // previously left open; close it like the first client so nothing leaks
        debugLevelStreams.close();
    }

    /**
     * {@code allMetadata()} on a client that was never started must raise an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        streams.allMetadata();
    }

    /**
     * {@code allMetadataForStore(...)} on a client that was never started must raise an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        final String storeName = "store";
        streams.allMetadataForStore(storeName);
    }

    /**
     * {@code metadataForKey(...)} with a key serializer on a client that was never
     * started must raise an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
        final String storeName = "store";
        final String key = "key";
        streams.metadataForKey(storeName, key, Serdes.String().serializer());
    }

    /**
     * {@code metadataForKey(...)} with a custom partitioner on a client that was never
     * started must raise an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
        // partitioner that maps every record to partition 0
        final StreamPartitioner<String, Object> alwaysPartitionZero = new StreamPartitioner<String, Object>() {
            @Override
            public Integer partition(final String key, final Object value, final int numPartitions) {
                return 0;
            }
        };
        streams.metadataForKey("store", "key", alwaysPartitionZero);
    }

    /**
     * {@code close(timeout)} must return {@code false} when a stream thread is still busy.
     *
     * <p>The topology's foreach action blocks (sleeping in 10 ms steps) for as long as
     * {@code keepRunning} is true. The flag is cleared in the {@code finally} block so the
     * blocked action can return once the test is over.
     *
     * @throws Exception if topic creation, record production or the latch wait fails
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        try {
            final StreamsBuilder topologyBuilder = new StreamsBuilder();
            final CountDownLatch recordReceived = new CountDownLatch(1);
            final String topic = "input";
            CLUSTER.createTopic(topic);

            topologyBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
                .foreach(new ForeachAction<String, String>() {
                    @Override
                    public void apply(final String key, final String value) {
                        // signal the test thread first, then keep this stream thread occupied
                        recordReceived.countDown();
                        try {
                            while (keepRunning.get()) {
                                Thread.sleep(10L);
                            }
                        } catch (final InterruptedException ignored) {
                            // an interrupt simply ends the busy wait
                        }
                    }
                });

            final KafkaStreams client = new KafkaStreams(topologyBuilder.build(), props);
            client.start();

            final List<KeyValue<String, String>> records =
                Collections.singletonList(new KeyValue<String, String>("A", "A"));
            final Properties producerConfig = TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                StringSerializer.class,
                StringSerializer.class,
                new Properties());
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                topic, records, producerConfig, System.currentTimeMillis());

            final boolean received = recordReceived.await(30L, TimeUnit.SECONDS);
            Assert.assertTrue("Timed out waiting to receive single message", received);

            final boolean closedInTime = client.close(10L, TimeUnit.MILLISECONDS);
            Assert.assertFalse(closedInTime);
        } finally {
            // lets the blocked foreach action return
            keepRunning.set(false);
        }
    }

    /**
     * A started client must report metadata for each configured stream thread. Every
     * thread must be in one of the states RUNNING, PARTITIONS_REVOKED,
     * PARTITIONS_ASSIGNED or CREATED, and must own no active or standby tasks.
     */
    @Test
    public void shouldReturnThreadMetadata() {
        streams.start();
        try {
            // typed set: iterating a raw Set as ThreadMetadata does not compile
            final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
            Assert.assertNotNull(threadMetadata);
            Assert.assertEquals(NUM_THREADS, threadMetadata.size());

            final List<String> acceptedStates =
                Arrays.asList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED");
            for (final ThreadMetadata metadata : threadMetadata) {
                Assert.assertTrue(
                    "#threadState() was: " + metadata.threadState() + "; expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
                    acceptedStates.contains(metadata.threadState()));
                Assert.assertEquals(0, metadata.standbyTasks().size());
                Assert.assertEquals(0, metadata.activeTasks().size());
            }
        } finally {
            // close even when an assertion fails, so no running client leaks into other tests
            streams.close();
        }
    }

    /**
     * {@code cleanUp()} must be callable both before the client is started and after
     * it has been closed.
     */
    @Test
    public void testCleanup() {
        final KafkaStreams client = new KafkaStreams(new StreamsBuilder().build(), props);

        // before start
        client.cleanUp();

        client.start();
        client.close();

        // after close
        client.cleanUp();
    }

    /**
     * {@code cleanUp()} on a running client must raise an {@link IllegalStateException}
     * with the message "Cannot clean up while running.". The client is closed afterwards
     * in any case.
     *
     * @throws InterruptedException if interrupted while waiting for the RUNNING state
     */
    @Test
    public void testCannotCleanupWhileRunning() throws InterruptedException {
        final KafkaStreams client = new KafkaStreams(new StreamsBuilder().build(), props);
        client.start();

        final TestCondition running = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return client.state() == KafkaStreams.State.RUNNING;
            }
        };
        TestUtils.waitForCondition(running, 10000L, "Streams never started.");

        IllegalStateException thrown = null;
        try {
            client.cleanUp();
        } catch (final IllegalStateException e) {
            thrown = e;
        } finally {
            client.close();
        }

        if (thrown == null) {
            Assert.fail("Should have thrown IllegalStateException");
        }
        Assert.assertEquals("Cannot clean up while running.", thrown.getMessage());
    }

    /**
     * The string form of a started client must be non-null and non-empty, and the value
     * after the first ':' on its second line (taken to be the application id) must be
     * non-empty.
     */
    @Test
    public void testToString() {
        streams.start();
        final String streamString;
        try {
            streamString = streams.toString();
        } finally {
            // close even if toString() throws
            streams.close();
        }

        // validate the string before dereferencing it, so a null or empty result fails
        // with its intended message rather than a NullPointerException or index error
        Assert.assertNotNull("streamString should not be null", streamString);
        Assert.assertNotEquals("streamString should not be empty", "", streamString);

        final String appId = streamString.split("\\n")[1].split(":")[1].trim();
        // trim() never returns null, so only emptiness needs checking
        Assert.assertNotEquals("streamString contains non-empty appId", "", appId);
    }

    /**
     * With "state.cleanup.delay.ms" set to 1, a running client must remove an obsolete
     * task directory ("10_1") under its application state directory while the "0_0"
     * task directory exists, and must do so again after the obsolete directory has
     * been re-created.
     *
     * @throws InterruptedException if topic creation or one of the waits is interrupted
     */
    @Test
    public void shouldCleanupOldStateDirs() throws InterruptedException {
        props.setProperty("state.cleanup.delay.ms", "1");

        final String topic = "topic";
        CLUSTER.createTopic(topic);
        final StreamsBuilder topologyBuilder = new StreamsBuilder();
        topologyBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));

        final KafkaStreams client = new KafkaStreams(topologyBuilder.build(), props);
        final CountDownLatch rebalanceDone = new CountDownLatch(1);
        client.setStateListener(new KafkaStreams.StateListener() {
            @Override
            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
                final boolean nowRunning = newState == KafkaStreams.State.RUNNING;
                if (nowRunning && oldState == KafkaStreams.State.REBALANCING) {
                    rebalanceDone.countDown();
                }
            }
        });

        final String appDir = props.getProperty("state.dir") + File.separator + props.getProperty("application.id");
        final File staleTaskDir = new File(appDir, "10_1");
        Assert.assertTrue(staleTaskDir.mkdirs());

        try {
            client.start();
            // result intentionally unchecked here; verifyCleanupStateDir applies its own timeout
            rebalanceDone.await(30L, TimeUnit.SECONDS);
            verifyCleanupStateDir(appDir, staleTaskDir);

            // re-create the stale directory and expect it to be cleaned up a second time
            Assert.assertTrue(staleTaskDir.mkdirs());
            verifyCleanupStateDir(appDir, staleTaskDir);
        } finally {
            client.close();
        }
    }

    /**
     * Waits up to 30 seconds until {@code oldTaskDir} has disappeared while the "0_0"
     * task directory under {@code appDir} exists, then asserts that "0_0" still exists.
     *
     * @param appDir     path of the application state directory
     * @param oldTaskDir obsolete task directory expected to be removed
     * @throws InterruptedException if interrupted while waiting
     */
    private void verifyCleanupStateDir(String appDir, final File oldTaskDir) throws InterruptedException {
        final File activeTaskDir = new File(appDir, "0_0");
        final TestCondition cleanupDone = new TestCondition() {
            @Override
            public boolean conditionMet() {
                // same evaluation order as "!old.exists() && active.exists()"
                return !(oldTaskDir.exists() || !activeTaskDir.exists());
            }
        };
        TestUtils.waitForCondition(cleanupDone, 30000L, "cleanup has not successfully run");
        Assert.assertTrue(activeTaskDir.exists());
    }

    public static class StateListenerStub
    implements KafkaStreams.StateListener {
        int numChanges = 0;
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap<KafkaStreams.State, Long>();

        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0L;
            ++this.numChanges;
            this.oldState = oldState;
            this.newState = newState;
            this.mapStates.put(newState, prevCount + 1L);
        }
    }
}

