/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category(value={IntegrationTest.class})
public class StreamsUncaughtExceptionHandlerIntegrationTest {
    /** Aborts any single test method that runs for more than 600 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(600L);
    /** Embedded Kafka cluster shared by every test in this class; started and stopped by the class-level hooks. */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
    /** Default upper bound used when waiting for a client state change or for a handler invocation. */
    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30L);
    @Rule
    public final TestName testName = new TestName();
    // NOTE(review): JUnit populates TestName when the rule starts, which presumably happens after these
    // field initializers have run; if so, testId is derived from a null method name and is identical for
    // every test method — confirm before relying on per-test uniqueness of the names below.
    private final String testId = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
    private final String appId = "appId_" + this.testId;
    private final String inputTopic = "input" + this.testId;
    private final String inputTopic2 = "input2" + this.testId;
    private final String outputTopic = "output" + this.testId;
    private final String outputTopic2 = "output2" + this.testId;
    /** Builder that setup() populates with the default "input -> ShutdownProcessor" topology. */
    private final StreamsBuilder builder = new StreamsBuilder();
    /** Receives one entry per record seen by a ShutdownProcessor; its size is what the tests assert on. */
    private final List<String> processorValueCollector = new ArrayList<>();
    /**
     * While true, ShutdownProcessor throws on the record it is processing. The reference itself never
     * changes (only the contained value does), hence final. Static because the nested static processor reads it.
     */
    private static final AtomicBoolean throwError = new AtomicBoolean(true);
    /** Streams configuration for this test instance; must be initialised after appId because basicProps() reads it. */
    private final Properties properties = this.basicProps();

    /**
     * Starts the shared embedded Kafka cluster once, before any test in this class runs.
     *
     * @throws IOException if starting the cluster fails with an I/O error
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded Kafka cluster once all tests in this class have finished. */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Builds the baseline Streams configuration used by every test: the embedded cluster's bootstrap
     * servers, this instance's application id, a fresh temporary state directory, two stream threads,
     * String serdes for keys and values, and a 10000 ms consumer session timeout.
     *
     * @return a new Properties object; individual tests override entries (e.g. the thread count) as needed
     */
    private Properties basicProps() {
        return Utils.mkObjectProperties(
            Utils.mkMap(
                Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
                Utils.mkEntry("application.id", this.appId),
                Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()),
                Utils.mkEntry("num.stream.threads", 2),
                Utils.mkEntry("default.key.serde", Serdes.StringSerde.class),
                Utils.mkEntry("default.value.serde", Serdes.StringSerde.class),
                Utils.mkEntry(StreamsConfig.consumerPrefix("session.timeout.ms"), 10000)
            )
        );
    }

    /**
     * Prepares a clean slate for each test: re-arms the shared failure flag, resets the test topics via
     * the integration test utilities, and wires the default topology in which every record read from
     * the input topic is handed to a ShutdownProcessor named "process".
     */
    @Before
    public void setup() {
        // throwError is static and is toggled by both the processor and the replace-thread handler.
        // If an earlier test aborted while it was false, the first record of this test would not fail
        // as every scenario expects, so re-arm it explicitly instead of relying on the previous test.
        throwError.set(true);
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, this.inputTopic, this.inputTopic2, this.outputTopic, this.outputTopic2);
        KStream<String, String> stream = this.builder.stream(this.inputTopic);
        stream.process(() -> new ShutdownProcessor(this.processorValueCollector), Named.as("process"));
    }

    /**
     * Removes the local Streams state that belongs to this test's configuration after each test.
     *
     * @throws IOException if purging the local state fails with an I/O error
     */
    @After
    public void teardown() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(this.properties);
    }

    /**
     * With only the legacy Thread-style handler installed, a failing record is expected to invoke the
     * handler twice (once per configured stream thread) while the client itself stays in RUNNING, and
     * the processor is expected to have seen the record twice.
     */
    @Test
    public void shouldShutdownThreadUsingOldHandler() throws Exception {
        Topology topology = this.builder.build();
        try (KafkaStreams streams = new KafkaStreams(topology, this.properties)) {
            AtomicInteger handlerCalls = new AtomicInteger(0);
            streams.setUncaughtExceptionHandler((thread, error) -> handlerCalls.incrementAndGet());

            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
            this.produceMessages(0L, this.inputTopic, "A");

            // First invocation: the thread that initially processes the record.
            TestUtils.waitForCondition(() -> handlerCalls.get() == 1, "Handler was called 1st time");
            // Second invocation: waited for with the longer, explicit timeout.
            long secondCallTimeoutMs = DEFAULT_DURATION.toMillis();
            TestUtils.waitForCondition(() -> handlerCalls.get() == 2, secondCallTimeoutMs, "Handler was called 2nd time");

            List<KafkaStreams> clients = Collections.singletonList(streams);
            IntegrationTestUtils.waitForApplicationState(clients, KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            int seenRecords = this.processorValueCollector.size();
            MatcherAssert.assertThat(seenRecords, CoreMatchers.equalTo(2));
        }
    }

    /**
     * With the Streams handler answering SHUTDOWN_CLIENT, a single failing record is expected to move
     * the client to ERROR after the processor has seen exactly one record. The legacy handler is
     * installed first and then superseded; it is written to fail if it is ever reached.
     */
    @Test
    public void shouldShutdownClient() throws Exception {
        Topology topology = this.builder.build();
        try (KafkaStreams streams = new KafkaStreams(topology, this.properties)) {
            streams.setUncaughtExceptionHandler((thread, error) -> Assert.fail("should not hit old handler"));

            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse response =
                StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            streams.setUncaughtExceptionHandler(exception -> response);

            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
            this.produceMessages(0L, this.inputTopic, "A");

            List<KafkaStreams> clients = Collections.singletonList(streams);
            IntegrationTestUtils.waitForApplicationState(clients, KafkaStreams.State.ERROR, DEFAULT_DURATION);

            int seenRecords = this.processorValueCollector.size();
            MatcherAssert.assertThat(seenRecords, CoreMatchers.equalTo(1));
        }
    }

    /** Runs the thread-replacement scenario with two stream threads. */
    @Test
    public void shouldReplaceThreads() throws Exception {
        this.testReplaceThreads(2);
    }

    /**
     * Runs the two-thread replacement scenario while a JVM-wide default uncaught-exception handler is
     * installed that is written to fail if it is ever invoked.
     *
     * <p>The default handler is process-global state, so the previously installed handler is captured
     * and restored afterwards; otherwise the failing handler would stay active for every test that runs
     * later in the same JVM.
     *
     * <p>NOTE(review): an AssertionError raised inside an uncaught-exception handler is thrown on the
     * dying thread, not on the JUnit runner thread, so it presumably does not fail this test by itself —
     * confirm whether a stronger check is wanted.
     */
    @Test
    public void shouldReplaceThreadsWithoutJavaHandler() throws Exception {
        Thread.UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler((t, e) -> Assert.fail("exception thrown"));
        try {
            this.testReplaceThreads(2);
        } finally {
            // Restore whatever was installed before (possibly null, which clears the default handler).
            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
        }
    }

    /** Runs the thread-replacement scenario with a single stream thread. */
    @Test
    public void shouldReplaceSingleThread() throws Exception {
        this.testReplaceThreads(1);
    }

    /** Runs the application-shutdown scenario with two stream threads per client. */
    @Test
    public void shouldShutdownMultipleThreadApplication() throws Exception {
        this.testShutdownApplication(2);
    }

    /** Runs the application-shutdown scenario with one stream thread per client. */
    @Test
    public void shouldShutdownSingleThreadApplication() throws Exception {
        this.testShutdownApplication(1);
    }

    /**
     * Adds a global store fed from the second input topic whose update processor is a
     * ShutdownProcessor, configures zero regular stream threads, and answers REPLACE_THREAD from the
     * Streams handler. After one record is produced to the global topic, the client is expected to
     * reach ERROR with exactly one record seen by the processor.
     */
    @Test
    public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throws Exception {
        StoreBuilder<?> globalStore = new KeyValueStoreBuilder<>(
            Stores.persistentKeyValueStore("globalStore"),
            Serdes.String(),
            Serdes.String(),
            CLUSTER.time
        );
        Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
        this.builder.addGlobalStore(
            globalStore,
            this.inputTopic2,
            consumed,
            () -> new ShutdownProcessor(this.processorValueCollector)
        );
        this.properties.put("num.stream.threads", 0);

        Topology topology = this.builder.build();
        try (KafkaStreams streams = new KafkaStreams(topology, this.properties)) {
            streams.setUncaughtExceptionHandler((thread, error) -> Assert.fail("should not hit old handler"));
            streams.setUncaughtExceptionHandler(
                exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
            );

            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
            this.produceMessages(0L, this.inputTopic2, "A");

            List<KafkaStreams> clients = Collections.singletonList(streams);
            IntegrationTestUtils.waitForApplicationState(clients, KafkaStreams.State.ERROR, DEFAULT_DURATION);

            int seenRecords = this.processorValueCollector.size();
            MatcherAssert.assertThat(seenRecords, CoreMatchers.equalTo(1));
        }
    }

    /**
     * Uses a dedicated topology (not the shared builder): a table on the first input topic whose
     * stream is mapped by a function that throws "Kaboom" exactly once, plus a plain pass-through from
     * the second input topic. With a single stream thread, a 300000 ms commit interval, caching
     * disabled and REPLACE_THREAD as the handler response, both output topics are expected to end up
     * with the records (1, "A") and (1, "B").
     */
    @Test
    public void shouldEmitSameRecordAfterFailover() throws Exception {
        this.properties.put("num.stream.threads", 1);
        this.properties.put("commit.interval.ms", 300000L);
        this.properties.put("cache.max.bytes.buffering", 0);
        this.properties.put("default.key.serde", Serdes.IntegerSerde.class);
        this.properties.put("default.value.serde", Serdes.StringSerde.class);
        this.properties.put("session.timeout.ms", 10000);

        // One-shot trigger: only the very first mapped record fails.
        AtomicBoolean failOnce = new AtomicBoolean(true);

        StreamsBuilder failoverBuilder = new StreamsBuilder();
        failoverBuilder
            .table(this.inputTopic, Materialized.as("test-store"))
            .toStream()
            .map((key, value) -> {
                boolean firstAttempt = failOnce.compareAndSet(true, false);
                if (firstAttempt) {
                    throw new RuntimeException("Kaboom");
                }
                return new KeyValue<>(key, value);
            })
            .to(this.outputTopic);
        failoverBuilder.stream(this.inputTopic2).to(this.outputTopic2);

        List<KeyValue<Integer, String>> records = Arrays.asList(new KeyValue<>(1, "A"), new KeyValue<>(1, "B"));
        String bootstrapServers = CLUSTER.bootstrapServers();

        try (KafkaStreams streams = new KafkaStreams(failoverBuilder.build(), this.properties)) {
            streams.setUncaughtExceptionHandler(
                exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
            );
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                this.inputTopic,
                records,
                TestUtils.producerConfig(bootstrapServers, IntegerSerializer.class, StringSerializer.class, new Properties()),
                0L
            );
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                this.inputTopic2,
                records,
                TestUtils.producerConfig(bootstrapServers, IntegerSerializer.class, StringSerializer.class, new Properties()),
                0L
            );

            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                TestUtils.consumerConfig(bootstrapServers, IntegerDeserializer.class, StringDeserializer.class),
                this.outputTopic,
                records
            );
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                TestUtils.consumerConfig(bootstrapServers, IntegerDeserializer.class, StringDeserializer.class),
                this.outputTopic2,
                records
            );
        }
    }

    /**
     * Synchronously produces a single String record with the fixed key "1" to the given topic.
     *
     * @param timestamp      timestamp passed along with the record
     * @param streamOneInput name of the topic to write to
     * @param msg            record value
     */
    private void produceMessages(long timestamp, String streamOneInput, String msg) {
        List<KeyValue<String, String>> singleRecord = Collections.singletonList(new KeyValue<>("1", msg));
        Properties producerProps = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            StringSerializer.class,
            StringSerializer.class,
            new Properties()
        );
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(streamOneInput, singleRecord, producerProps, timestamp);
    }

    /**
     * Shared scenario for SHUTDOWN_APPLICATION: starts two clients built from the same topology and
     * configuration, produces one failing record, and expects both clients to reach ERROR while the
     * processor has seen exactly one record.
     *
     * @param numThreads value written to "num.stream.threads" before the clients are created
     */
    private void testShutdownApplication(int numThreads) throws Exception {
        this.properties.put("num.stream.threads", numThreads);
        Topology sharedTopology = this.builder.build();
        try (KafkaStreams first = new KafkaStreams(sharedTopology, this.properties);
             KafkaStreams second = new KafkaStreams(sharedTopology, this.properties)) {
            List<KafkaStreams> clients = Arrays.asList(first, second);

            for (KafkaStreams client : clients) {
                // The legacy handler is installed first and then superseded by the Streams handler.
                client.setUncaughtExceptionHandler((thread, error) -> Assert.fail("should not hit old handler"));
                client.setUncaughtExceptionHandler(
                    exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION
                );
            }

            IntegrationTestUtils.startApplicationAndWaitUntilRunning(clients);
            this.produceMessages(0L, this.inputTopic, "A");
            IntegrationTestUtils.waitForApplicationState(clients, KafkaStreams.State.ERROR, DEFAULT_DURATION);

            int seenRecords = this.processorValueCollector.size();
            MatcherAssert.assertThat(seenRecords, CoreMatchers.equalTo(1));
        }
    }

    /**
     * Shared scenario for REPLACE_THREAD. The Streams handler counts its invocations and, once it has
     * been called {@code numThreads} times, clears {@code throwError} so that the next attempt to
     * process the record succeeds; the processor then sets the flag back to true. The test waits for
     * both events, closes the client, and expects {@code numThreads + 1} processing attempts in total.
     *
     * @param numThreads value written to "num.stream.threads"; also the number of failures to provoke
     */
    private void testReplaceThreads(int numThreads) throws Exception {
        this.properties.put("num.stream.threads", numThreads);
        try (KafkaStreams kafkaStreams = new KafkaStreams(this.builder.build(), this.properties)) {
            kafkaStreams.setUncaughtExceptionHandler((t, e) -> Assert.fail("should not hit old handler"));
            AtomicInteger count = new AtomicInteger();
            kafkaStreams.setUncaughtExceptionHandler(exception -> {
                if (count.incrementAndGet() == numThreads) {
                    // Enough failures observed: let the next processing attempt succeed.
                    throwError.set(false);
                }
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
            this.produceMessages(0L, this.inputTopic, "A");
            TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
            // Distinct message so a timeout here is distinguishable from the wait above: this one
            // waits for the processor to set throwError back to true after a successful attempt.
            TestUtils.waitForCondition(() -> throwError.get(), "replacement thread processed the record and re-armed throwError");
            // Explicit close so the NOT_RUNNING state can be awaited before asserting.
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
            MatcherAssert.assertThat(
                "All initial threads have failed and the replacement thread had processed one record",
                this.processorValueCollector.size(),
                CoreMatchers.equalTo(numThreads + 1)
            );
        }
    }

    private static class ShutdownProcessor
    extends AbstractProcessor<String, String> {
        final List<String> valueList;

        ShutdownProcessor(List<String> valueList) {
            this.valueList = valueList;
        }

        public void process(String key, String value) {
            this.valueList.add(value + " " + this.context.taskId());
            if (throwError.get()) {
                throw new StreamsException(Thread.currentThread().getName());
            }
            throwError.set(true);
        }
    }
}

