/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KafkaStreamsTest {
    private static final int NUM_BROKERS = 1;
    private static final int NUM_THREADS = 2;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final StreamsBuilder builder = new StreamsBuilder();
    private KafkaStreams globalStreams;
    private Properties props;

    @Before
    public void before() {
        this.props = new Properties();
        this.props.put("application.id", "appId");
        this.props.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.props.put("metric.reporters", MockMetricsReporter.class.getName());
        this.props.put("state.dir", TestUtils.tempDirectory().getPath());
        this.props.put("num.stream.threads", (Object)2);
        this.props.put("internal.leave.group.on.close", (Object)true);
        this.globalStreams = new KafkaStreams(this.builder.build(), this.props);
    }

    @After
    public void cleanup() {
        if (this.globalStreams != null) {
            this.globalStreams.close();
        }
    }

    @Test
    public void testOsDefaultSocketBufferSizes() {
        this.props.put("send.buffer.bytes", (Object)-1);
        this.props.put("receive.buffer.bytes", (Object)-1);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        streams.close();
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketSendBufferSize() {
        this.props.put("send.buffer.bytes", (Object)-2);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        streams.close();
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() {
        this.props.put("receive.buffer.bytes", (Object)-2);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        streams.close();
    }

    @Test
    public void testStateChanges() throws InterruptedException {
        StateListenerStub stateListener = new StateListenerStub();
        this.globalStreams.setStateListener((KafkaStreams.StateListener)stateListener);
        Assert.assertEquals((Object)this.globalStreams.state(), (Object)KafkaStreams.State.CREATED);
        Assert.assertEquals((long)stateListener.numChanges, (long)0L);
        this.globalStreams.start();
        TestUtils.waitForCondition(() -> this.globalStreams.state() == KafkaStreams.State.RUNNING, (long)10000L, (String)"Streams never started.");
        this.globalStreams.close();
        Assert.assertEquals((Object)this.globalStreams.state(), (Object)KafkaStreams.State.NOT_RUNNING);
    }

    @Test
    public void testStateCloseAfterCreate() {
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            StateListenerStub stateListener = new StateListenerStub();
            streams.setStateListener((KafkaStreams.StateListener)stateListener);
        }
        Assert.assertEquals((Object)KafkaStreams.State.NOT_RUNNING, (Object)streams.state());
    }

    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        this.builder.globalTable("anyTopic");
        List<Node> nodes = Arrays.asList(new Node(0, "localhost", 8121));
        Cluster cluster = new Cluster("mockClusterId", nodes, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), nodes.get(0));
        MockClientSupplier clientSupplier = new MockClientSupplier();
        clientSupplier.setClusterForAdminClient(cluster);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props, (KafkaClientSupplier)clientSupplier);
        streams.close();
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, (long)10000L, (String)"Streams never stopped.");
        Assert.assertTrue((boolean)clientSupplier.consumer.closed());
        Assert.assertTrue((boolean)clientSupplier.restoreConsumer.closed());
        for (MockProducer p : clientSupplier.producers) {
            Assert.assertTrue((boolean)p.closed());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStateThreadClose() throws Exception {
        this.builder.globalTable("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            Field threadsField = streams.getClass().getDeclaredField("threads");
            threadsField.setAccessible(true);
            StreamThread[] threads = (StreamThread[])threadsField.get(streams);
            Assert.assertEquals((long)2L, (long)threads.length);
            Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.CREATED);
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (long)10000L, (String)"Streams never started.");
            for (int i = 0; i < 2; ++i) {
                StreamThread tmpThread = threads[i];
                tmpThread.shutdown();
                TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, (long)10000L, (String)"Thread never stopped.");
                threads[i].join();
            }
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.ERROR, (long)10000L, (String)"Streams never stopped.");
        }
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, (long)10000L, (String)"Streams never stopped.");
        Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
        globalThreadField.setAccessible(true);
        GlobalStreamThread globalStreamThread = (GlobalStreamThread)globalThreadField.get(streams);
        Assert.assertNull((Object)globalStreamThread);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        this.builder.globalTable("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (long)10000L, (String)"Streams never started.");
            Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
            globalThreadField.setAccessible(true);
            GlobalStreamThread globalStreamThread = (GlobalStreamThread)globalThreadField.get(streams);
            globalStreamThread.shutdown();
            TestUtils.waitForCondition(() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, (long)10000L, (String)"Thread never stopped.");
            globalStreamThread.join();
            Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.ERROR);
        }
        Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.NOT_RUNNING);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
        Properties props = new Properties();
        props.put("application.id", "appId");
        props.put("bootstrap.servers", "localhost:1");
        props.put("metric.reporters", MockMetricsReporter.class.getName());
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        props.put("num.stream.threads", (Object)2);
        props.put("default.api.timeout.ms", (Object)200);
        this.builder.globalTable("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), props);){
            streams.start();
            Assert.fail((String)"expected start() to time out and throw an exception.");
        }
    }

    @Test
    public void testLocalThreadCloseWithoutConnectingToBroker() {
        Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", "localhost:1");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        props.put("num.stream.threads", (Object)2);
        this.builder.table("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), props);){
            streams.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            int newInitCount = MockMetricsReporter.INIT_COUNT.get();
            int initDiff = newInitCount - oldInitCount;
            Assert.assertTrue((String)"some reporters should be initialized by calling on construction", (initDiff > 0 ? 1 : 0) != 0);
            streams.start();
            int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();
            Assert.assertEquals((long)(oldCloseCount + initDiff), (long)MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    @Test
    public void testCloseIsIdempotent() {
        this.globalStreams.close();
        int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
        this.globalStreams.close();
        Assert.assertEquals((String)"subsequent close() calls should do nothing", (long)closeCount, (long)MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test
    public void testCannotStartOnceClosed() {
        this.globalStreams.start();
        this.globalStreams.close();
        try {
            this.globalStreams.start();
            Assert.fail((String)"Should have throw IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
        }
        finally {
            this.globalStreams.close();
        }
    }

    @Test
    public void testCannotStartTwice() {
        this.globalStreams.start();
        try {
            this.globalStreams.start();
        }
        catch (IllegalStateException illegalStateException) {
        }
        finally {
            this.globalStreams.close();
        }
    }

    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        this.globalStreams.start();
        try {
            this.globalStreams.setGlobalStateRestoreListener((StateRestoreListener)new MockStateRestoreListener());
            Assert.fail((String)"Should throw an IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
        }
        finally {
            this.globalStreams.close();
        }
    }

    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        this.globalStreams.start();
        try {
            this.globalStreams.setUncaughtExceptionHandler(null);
            Assert.fail((String)"Should throw IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        this.globalStreams.start();
        try {
            this.globalStreams.setStateListener(null);
            Assert.fail((String)"Should throw IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testIllegalMetricsConfig() {
        this.props.setProperty("metrics.recording.level", "illegalConfig");
        try {
            new KafkaStreams(this.builder.build(), this.props);
            Assert.fail((String)"Should have throw ConfigException");
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    @Test
    public void testLegalMetricsConfig() {
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        new KafkaStreams(this.builder.build(), this.props).close();
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.toString());
        new KafkaStreams(this.builder.build(), this.props).close();
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        this.globalStreams.allMetadata();
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        this.globalStreams.allMetadataForStore("store");
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
        this.globalStreams.metadataForKey("store", (Object)"key", Serdes.String().serializer());
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
        this.globalStreams.metadataForKey("store", (Object)"key", (topic, key, value, numPartitions) -> 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        AtomicBoolean keepRunning = new AtomicBoolean(true);
        KafkaStreams streams = null;
        try {
            StreamsBuilder builder = new StreamsBuilder();
            CountDownLatch latch = new CountDownLatch(1);
            String topic = "input";
            CLUSTER.createTopics("input");
            builder.stream("input", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).foreach((key, value) -> {
                try {
                    latch.countDown();
                    while (keepRunning.get()) {
                        Thread.sleep(10L);
                    }
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            });
            streams = new KafkaStreams(builder.build(), this.props);
            streams.start();
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp("input", Collections.singletonList(new KeyValue((Object)"A", (Object)"A")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), System.currentTimeMillis());
            Assert.assertTrue((String)"Timed out waiting to receive single message", (boolean)latch.await(30L, TimeUnit.SECONDS));
            Assert.assertFalse((boolean)streams.close(Duration.ofMillis(10L)));
        }
        finally {
            keepRunning.set(false);
            if (streams != null) {
                streams.close();
            }
        }
    }

    @Test
    public void shouldReturnThreadMetadata() {
        this.globalStreams.start();
        Set threadMetadata = this.globalStreams.localThreadsMetadata();
        Assert.assertNotNull((Object)threadMetadata);
        Assert.assertEquals((long)2L, (long)threadMetadata.size());
        for (ThreadMetadata metadata : threadMetadata) {
            Assert.assertTrue((String)("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED"), (boolean)Utils.mkList((Object[])new String[]{"RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED"}).contains(metadata.threadState()));
            Assert.assertEquals((long)0L, (long)metadata.standbyTasks().size());
            Assert.assertEquals((long)0L, (long)metadata.activeTasks().size());
        }
    }

    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        try {
            this.globalStreams.cleanUp();
            this.globalStreams.start();
        }
        finally {
            this.globalStreams.close();
        }
        this.globalStreams.cleanUp();
    }

    @Test
    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
        this.globalStreams.start();
        TestUtils.waitForCondition(() -> this.globalStreams.state() == KafkaStreams.State.RUNNING, (long)10000L, (String)"Streams never started.");
        try {
            this.globalStreams.cleanUp();
            Assert.fail((String)"Should have thrown IllegalStateException");
        }
        catch (IllegalStateException expected) {
            Assert.assertEquals((Object)"Cannot clean up while running.", (Object)expected.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldCleanupOldStateDirs() throws InterruptedException {
        this.props.setProperty("state.cleanup.delay.ms", "1");
        String topic = "topic";
        CLUSTER.createTopic("topic");
        StreamsBuilder builder = new StreamsBuilder();
        Consumed consumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
        builder.table("topic", consumed);
        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props);){
            CountDownLatch latch = new CountDownLatch(1);
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    latch.countDown();
                }
            });
            String appDir = this.props.getProperty("state.dir") + File.separator + this.props.getProperty("application.id");
            File oldTaskDir = new File(appDir, "10_1");
            Assert.assertTrue((boolean)oldTaskDir.mkdirs());
            streams.start();
            latch.await(30L, TimeUnit.SECONDS);
            this.verifyCleanupStateDir(appDir, oldTaskDir);
            Assert.assertTrue((boolean)oldTaskDir.mkdirs());
            this.verifyCleanupStateDir(appDir, oldTaskDir);
        }
    }

    @Test
    public void shouldThrowOnNegativeTimeoutForClose() {
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            streams.close(Duration.ofMillis(-1L));
            Assert.fail((String)"should not accept negative close parameter");
        }
    }

    @Test
    public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));
        th.start();
        try {
            th.join(30000L);
            Assert.assertFalse((boolean)th.isAlive());
        }
        finally {
            streams.close();
        }
    }

    private void verifyCleanupStateDir(String appDir, File oldTaskDir) throws InterruptedException {
        File taskDir = new File(appDir, "0_0");
        TestUtils.waitForCondition(() -> !oldTaskDir.exists() && taskDir.exists(), (long)30000L, (String)"cleanup has not successfully run");
        Assert.assertTrue((boolean)taskDir.exists());
    }

    public static class StateListenerStub
    implements KafkaStreams.StateListener {
        int numChanges = 0;
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap<KafkaStreams.State, Long>();

        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0L;
            ++this.numChanges;
            this.oldState = oldState;
            this.newState = newState;
            this.mapStates.put(newState, prevCount + 1L);
        }
    }
}

