/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class ResetIntegrationTest {
    /** Number of brokers the embedded cluster is created with. */
    private static final int NUM_BROKERS = 1;
    /** Embedded Kafka cluster, registered as a JUnit class rule and shared by the whole class. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    /** Mock clock obtained from {@link #CLUSTER}; stamps input records and is advanced by the topology. */
    private final MockTime mockTime;
    /** Application id of the Streams app under test; also the consumer group that is polled for members. */
    private static final String APP_ID = "cleanup-integration-test";
    /** Topic the test records are produced to. */
    private static final String INPUT_TOPIC = "inputTopic";
    /** Topic receiving the global (non-windowed) counts. */
    private static final String OUTPUT_TOPIC = "outputTopic";
    /** Topic receiving the windowed counts of the first run. */
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    /** Topic receiving the windowed counts of the run after the reset. */
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    /** User-managed topic the input is piped through before the windowed count. */
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
    /** Timeout in ms for the Streams application's consumers; same value as its "session.timeout.ms". */
    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
    /** Timeout in ms for the reset tool's consumer; same value as its "session.timeout.ms". */
    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
    /** Condition that holds once the {@link #APP_ID} consumer group has no members. */
    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive;
    /** Admin client created before and closed after each test. */
    private AdminClient adminClient;

    /**
     * Initialises the per-instance state: the mock clock comes from the shared
     * cluster, the group-inactive condition is created fresh, and the admin
     * client stays unset until {@link #prepare()} runs.
     */
    public ResetIntegrationTest() {
        adminClient = null;
        consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
        mockTime = CLUSTER.time;
    }

    /**
     * Creates every topic the test uses on the embedded cluster, once per class.
     *
     * @throws Exception if creating a topic fails
     */
    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        final String[] topics = {
            INPUT_TOPIC,
            OUTPUT_TOPIC,
            OUTPUT_TOPIC_2,
            OUTPUT_TOPIC_2_RERUN,
            INTERMEDIATE_USER_TOPIC
        };
        for (final String topic : topics) {
            CLUSTER.createTopic(topic);
        }
    }

    /** Opens a plaintext admin client against the embedded cluster before each test. */
    @Before
    public void prepare() {
        final String brokers = CLUSTER.bootstrapServers();
        adminClient = AdminClient.createSimplePlaintext(brokers);
    }

    /** Closes and forgets the admin client after each test, if one was opened. */
    @After
    public void cleanup() {
        if (adminClient == null) {
            return;
        }
        adminClient.close();
        adminClient = null;
    }

    /**
     * Runs the topology once, resets the application (local state via
     * {@code KafkaStreams.cleanUp()} and global state via {@link #cleanGlobal()}),
     * runs it again writing the windowed counts to a different topic, and asserts
     * that both runs produced the same results.
     *
     * <p>Each started {@link KafkaStreams} instance is closed in a {@code finally}
     * block so a timed-out wait does not leave stream threads running.
     *
     * @throws Exception if preparing data, waiting for records, or waiting for the
     *                   consumer group to become inactive fails
     */
    @Test
    public void testReprocessingFromScratchAfterReset() throws Exception {
        final Properties streamsConfiguration = prepareTest();
        final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(),
            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
            LongDeserializer.class,
            LongDeserializer.class);
        // Wait five session timeouts for a consumer group to be reported as empty.
        final long streamsGroupTimeout = 5 * STREAMS_CONSUMER_TIMEOUT;
        final long resetGroupTimeout = 5 * CLEANUP_CONSUMER_TIMEOUT;

        prepareInputData();

        // First run.
        KafkaStreams streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2), streamsConfiguration);
        streams.start();
        final List<KeyValue<Long, Long>> result;
        final KeyValue<Long, Long> result2;
        try {
            result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                resultTopicConsumerConfig, OUTPUT_TOPIC, 10, 60000L);
            final List<KeyValue<Long, Long>> windowedResult = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1);
            result2 = windowedResult.get(0);
        } finally {
            streams.close();
        }
        TestUtils.waitForCondition(
            consumerGroupInactive,
            streamsGroupTimeout,
            "Streams Application consumer group did not time out after " + streamsGroupTimeout + " ms.");

        // Reset: local state first, then the reset tool, then verify internal topics are gone.
        streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
        streams.cleanUp();
        cleanGlobal();
        TestUtils.waitForCondition(
            consumerGroupInactive,
            resetGroupTimeout,
            "Reset Tool consumer group did not time out after " + resetGroupTimeout + " ms.");
        assertInternalTopicsGotDeleted();

        // Second run.
        streams.start();
        final List<KeyValue<Long, Long>> resultRerun;
        final KeyValue<Long, Long> resultRerun2;
        try {
            resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
            final List<KeyValue<Long, Long>> windowedRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 1);
            resultRerun2 = windowedRerun.get(0);
        } finally {
            streams.close();
        }

        MatcherAssert.assertThat(resultRerun, CoreMatchers.equalTo(result));
        MatcherAssert.assertThat(resultRerun2, CoreMatchers.equalTo(result2));
    }

    /**
     * Builds the Streams configuration used by both runs and purges any local
     * state left in the freshly chosen state directory.
     *
     * <p>The consumer session timeout is derived from
     * {@link #STREAMS_CONSUMER_TIMEOUT} so the configured value and the constant
     * cannot drift apart.
     *
     * @return the Streams configuration
     * @throws Exception if purging the local state fails
     */
    private Properties prepareTest() throws Exception {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", APP_ID);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("zookeeper.connect", CLUSTER.zKConnectString());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("key.serde", Serdes.Long().getClass());
        streamsConfiguration.put("value.serde", Serdes.String().getClass());
        streamsConfiguration.put("num.stream.threads", 8);
        streamsConfiguration.put("commit.interval.ms", 1);
        streamsConfiguration.put("heartbeat.interval.ms", 100);
        streamsConfiguration.put("session.timeout.ms", String.valueOf(STREAMS_CONSUMER_TIMEOUT));
        streamsConfiguration.put("auto.offset.reset", "earliest");

        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        return streamsConfiguration;
    }

    /**
     * Produces ten records to {@link #INPUT_TOPIC}, one synchronous send each.
     * Keys alternate between 0 and 1, values run from "aaa" to "jjj", and the
     * mock clock is advanced before every send: by 10 ms for the first six
     * records and by 1 ms for the remaining four. Each record is stamped with the
     * mock clock's current time.
     *
     * @throws Exception if producing a record fails
     */
    private void prepareInputData() throws Exception {
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
        final String[] values = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj"};

        for (int i = 0; i < values.length; i++) {
            mockTime.sleep(i < 6 ? 10L : 1L);
            final Long key = (i % 2 == 0) ? 0L : 1L;
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                INPUT_TOPIC,
                Collections.singleton(new KeyValue<Long, String>(key, values[i])),
                producerConfig,
                mockTime.milliseconds());
        }
    }

    /**
     * Builds the topology under test. It has two branches off {@link #INPUT_TOPIC}:
     * <ul>
     *   <li>a per-key count (store "global-count") written to {@link #OUTPUT_TOPIC};</li>
     *   <li>a windowed per-key count (store "count", window size 35 advancing by 10)
     *       fed through {@link #INTERMEDIATE_USER_TOPIC} and written to
     *       {@code outputTopic2}, keyed by window start plus window end.</li>
     * </ul>
     *
     * @param outputTopic2 topic that receives the windowed counts
     * @return the builder holding the topology
     */
    private KStreamBuilder setupTopology(String outputTopic2) {
        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);

        // Identity map; presumably here to force a repartition step before groupByKey -- confirm against the Streams DSL.
        final KTable<Long, Long> globalCounts = input
            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                @Override
                public KeyValue<Long, String> apply(Long key, String value) {
                    return new KeyValue<Long, String>(key, value);
                }
            })
            .groupByKey()
            .count("global-count");
        globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);

        final KStream<Long, Long> windowedCounts = input
            .through(INTERMEDIATE_USER_TOPIC)
            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                // Amount the mock clock is advanced per record; doubles after every record.
                private long sleep = 1000L;

                @Override
                public KeyValue<Long, String> apply(Long key, String value) {
                    mockTime.sleep(sleep);
                    sleep *= 2L;
                    return new KeyValue<Long, String>(key, value);
                }
            })
            .groupByKey()
            .count(TimeWindows.of(35L).advanceBy(10L), "count")
            .toStream()
            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
                @Override
                public KeyValue<Long, Long> apply(Windowed<Long> key, Long value) {
                    // Collapse the window into a single Long key: start + end.
                    return new KeyValue<Long, Long>(key.window().start() + key.window().end(), value);
                }
            });
        windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2);

        return builder;
    }

    /**
     * Runs the {@link StreamsResetter} tool for {@link #APP_ID} against the
     * embedded cluster, naming {@link #INPUT_TOPIC} as input topic and
     * {@link #INTERMEDIATE_USER_TOPIC} as intermediate topic, and asserts that it
     * exits with code 0.
     *
     * <p>The tool's consumer session timeout is derived from
     * {@link #CLEANUP_CONSUMER_TIMEOUT} instead of repeating the literal.
     */
    private void cleanGlobal() {
        final Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", 100);
        cleanUpConfig.put("session.timeout.ms", String.valueOf(CLEANUP_CONSUMER_TIMEOUT));

        final String[] resetterArgs = {
            "--application-id", APP_ID,
            "--bootstrap-server", CLUSTER.bootstrapServers(),
            "--zookeeper", CLUSTER.zKConnectString(),
            "--input-topics", INPUT_TOPIC,
            "--intermediate-topics", INTERMEDIATE_USER_TOPIC
        };
        final int exitCode = new StreamsResetter().run(resetterArgs, cleanUpConfig);
        Assert.assertEquals(0, exitCode);
    }

    /**
     * Polls ZooKeeper every 100 ms until the number of topics equals the number
     * of topics expected to survive the reset, then asserts that the two sets are
     * equal. The expected set is the five test topics plus "__consumer_offsets".
     *
     * <p>Polling gives up after 30 seconds so that a reset which never deletes
     * the internal topics fails with a readable set difference rather than
     * hanging. The ZooKeeper client is closed in a {@code finally} block.
     */
    private void assertInternalTopicsGotDeleted() {
        final HashSet<String> expectedRemainingTopicsAfterCleanup = new HashSet<String>();
        expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
        expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
        expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");

        final HashSet<String> allTopics = new HashSet<String>();
        // Wall-clock deadline: Utils.sleep below is a real sleep, not the mock clock.
        final long deadline = System.currentTimeMillis() + 30000L;

        ZkUtils zkUtils = null;
        try {
            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), 30000, 30000, JaasUtils.isZkSecurityEnabled());
            do {
                Utils.sleep(100L);
                allTopics.clear();
                allTopics.addAll(JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
            } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size()
                && System.currentTimeMillis() < deadline);
        } finally {
            if (zkUtils != null) {
                zkUtils.close();
            }
        }

        MatcherAssert.assertThat(allTopics, CoreMatchers.equalTo(expectedRemainingTopicsAfterCleanup));
    }

    private class WaitUntilConsumerGroupGotClosed
    implements TestCondition {
        private WaitUntilConsumerGroupGotClosed() {
        }

        public boolean conditionMet() {
            return ResetIntegrationTest.this.adminClient.describeGroup(ResetIntegrationTest.APP_ID).members().isEmpty();
        }
    }
}

