/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class RepartitionWithMergeOptimizingIntegrationTest {
    // Broker count for the embedded cluster.
    // NOTE(review): CLUSTER below is constructed with a literal 1 rather than this
    // constant; presumably the constant was inlined by the compiler/decompiler - confirm.
    private static final int NUM_BROKERS = 1;
    // Names of the two input topics the test produces records to.
    private static final String INPUT_A_TOPIC = "inputA";
    private static final String INPUT_B_TOPIC = "inputB";
    // Output topic that receives the counts written with a Long value serde.
    private static final String COUNT_TOPIC = "outputTopic_0";
    // Output topic that receives the counts after they were converted to String.
    private static final String COUNT_STRING_TOPIC = "outputTopic_1";
    // Number of repartition topics expected in the optimized topology (the optimized test expects 1).
    private static final int ONE_REPARTITION_TOPIC = 1;
    // Number of repartition topics expected in the unoptimized topology (the unoptimized test expects 2).
    private static final int TWO_REPARTITION_TOPICS = 2;
    // Matches sink lines of a topology description whose text ends in "-repartition";
    // used by getCountOfRepartitionTopicsFound to count repartition topics.
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    // Streams configuration for the current test; rebuilt in setUp().
    private Properties streamsConfiguration;
    // Embedded Kafka cluster shared by all tests of this class (JUnit class rule).
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Time source taken from CLUSTER in the constructor and handed to the record producer helper.
    private final MockTime mockTime;
    // Expected text of topology.describe().toString() when runIntegrationTest is called
    // with the optimization config "all". It contains a single "-repartition" sink
    // (KSTREAM-SINK-0000000020) whose topic is read by both aggregations in sub-topology 1.
    // NOTE(review): the literal is wrapped across two physical lines in this listing;
    // a Java string literal cannot span lines, so this is presumably an extraction
    // artifact - confirm against the real source before compiling.
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n      --> KSTREAM-MAP-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n      --> KSTREAM-FILTER-0000000021\n      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n    Processor: KSTREAM-FILTER-0000000021 (stores: [])\n      --> KSTREAM-SINK-0000000020\n      <-- KSTREAM-MERGE-0000000004\n    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n      <-- KSTREAM-FILTER-0000000021\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n      --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- KSTREAM-SOURCE-0000000022\n    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> KTABLE-TOSTREAM-0000000010\n      <-- KSTREAM-SOURCE-0000000022\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000018\n      <-- KSTREAM-AGGREGATE-0000000013\n    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KTABLE-TOSTREAM-0000000017\n    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n      --> KSTREAM-SINK-0000000011\n      <-- KSTREAM-AGGREGATE-0000000006\n    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000010\n    Sink: 
KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KSTREAM-MAPVALUES-0000000018\n\n";
    // Expected text of topology.describe().toString() for any optimization config other
    // than "all". It contains two "-repartition" sinks (KSTREAM-SINK-0000000007 and
    // KSTREAM-SINK-0000000014), each feeding its own aggregation sub-topology.
    // NOTE(review): same line-wrap caveat as the optimized literal - confirm.
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n      --> KSTREAM-MAP-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000004\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n      --> KSTREAM-FILTER-0000000008, KSTREAM-FILTER-0000000015\n      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n    Processor: KSTREAM-FILTER-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000007\n      <-- KSTREAM-MERGE-0000000004\n    Processor: KSTREAM-FILTER-0000000015 (stores: [])\n      --> KSTREAM-SINK-0000000014\n      <-- KSTREAM-MERGE-0000000004\n    Sink: KSTREAM-SINK-0000000007 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n      <-- KSTREAM-FILTER-0000000008\n    Sink: KSTREAM-SINK-0000000014 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n      <-- KSTREAM-FILTER-0000000015\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000009 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n      --> KSTREAM-AGGREGATE-0000000006\n    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> KTABLE-TOSTREAM-0000000010\n      <-- KSTREAM-SOURCE-0000000009\n    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n      --> KSTREAM-SINK-0000000011\n      <-- KSTREAM-AGGREGATE-0000000006\n    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000010\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000016 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n      --> KSTREAM-AGGREGATE-0000000013\n    Processor: 
KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- KSTREAM-SOURCE-0000000016\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000018\n      <-- KSTREAM-AGGREGATE-0000000013\n    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KTABLE-TOSTREAM-0000000017\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KSTREAM-MAPVALUES-0000000018\n\n";

    /**
     * Creates the test instance and takes over the time source exposed by the
     * shared embedded cluster.
     */
    public RepartitionWithMergeOptimizingIntegrationTest() {
        mockTime = CLUSTER.time;
    }

    /**
     * Prepares a test run: builds the streams configuration (String key/value serde
     * class names plus cache and commit-interval overrides), creates the two output
     * and two input topics on the cluster, and purges local streams state for that
     * configuration.
     *
     * @throws Exception if topic creation or state purging fails
     */
    @Before
    public void setUp() throws Exception {
        final String stringSerdeClassName = Serdes.String().getClass().getName();

        final Properties overrides = new Properties();
        overrides.put("cache.max.bytes.buffering", 10240);
        overrides.put("commit.interval.ms", 5000);

        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            "maybe-optimized-with-merge-test-app",
            CLUSTER.bootstrapServers(),
            stringSerdeClassName,
            stringSerdeClassName,
            overrides);

        CLUSTER.createTopics(COUNT_TOPIC, COUNT_STRING_TOPIC, INPUT_A_TOPIC, INPUT_B_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Deletes every topic on the embedded cluster after each test, passing a
     * timeout argument of 30000 to the cluster helper.
     *
     * @throws Exception if the deletion fails
     */
    @After
    public void tearDown() throws Exception {
        final long deletionTimeout = 30_000L;
        CLUSTER.deleteAllTopicsAndWait(deletionTimeout);
    }

    /**
     * Runs the scenario with the optimization config "all" and expects exactly
     * one repartition topic in the resulting topology.
     */
    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
        runIntegrationTest("all", ONE_REPARTITION_TOPIC);
    }

    /**
     * Runs the scenario with the optimization config "none" and expects two
     * repartition topics in the resulting topology.
     */
    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
        runIntegrationTest("none", TWO_REPARTITION_TOPICS);
    }

    /**
     * Builds the merge-then-count topology, checks its description, runs it and
     * verifies the records written to both output topics.
     *
     * <p>Both input streams are re-keyed to the part of the value before the first
     * {@code ':'}, merged, and then grouped and counted twice: once written as
     * {@code Long} counts to {@link #COUNT_TOPIC} and once converted to
     * {@code String} and written to {@link #COUNT_STRING_TOPIC}. The same nine
     * records from {@link #getKeyValues()} are produced to each input topic, so
     * every key {@code A}, {@code B} and {@code C} is expected to reach a count of 6.
     *
     * @param optimizationConfig value stored under {@code "topology.optimization"};
     *        {@code "all"} selects the optimized expected topology, anything else
     *        the unoptimized one
     * @param expectedNumberRepartitionTopics number of {@code -repartition} sinks
     *        the topology description must contain
     * @throws Exception if producing, consuming or waiting for results fails
     */
    private void runIntegrationTest(String optimizationConfig, int expectedNumberRepartitionTopics) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();

        // Typed streams are required here: with raw KStream the lambda parameter
        // would be Object and v.split(...) would not type-check.
        KStream<String, String> sourceAStream = builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> sourceBStream = builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> mappedAStream = sourceAStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v));
        KStream<String, String> mappedBStream = sourceBStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v));
        KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream);

        // Two independent group-and-count branches off the same merged stream.
        mergedStream.groupByKey().count().toStream().to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        mergedStream.groupByKey().count().toStream().mapValues(v -> v.toString()).to(COUNT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        this.streamsConfiguration.setProperty("topology.optimization", optimizationConfig);

        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_A_TOPIC, this.getKeyValues(), producerConfig, this.mockTime);
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_B_TOPIC, this.getKeyValues(), producerConfig, this.mockTime);

        Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
        Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);

        Topology topology = builder.build(this.streamsConfiguration);
        String topologyString = topology.describe().toString();
        // Printed so the actual description is available in the test output.
        System.out.println(topologyString);

        String expectedTopology = "all".equals(optimizationConfig)
            ? EXPECTED_OPTIMIZED_TOPOLOGY
            : EXPECTED_UNOPTIMIZED_TOPOLOGY;
        Assert.assertEquals(expectedTopology, topologyString);
        Assert.assertEquals(expectedNumberRepartitionTopics, this.getCountOfRepartitionTopicsFound(topologyString));

        KafkaStreams streams = new KafkaStreams(topology, this.streamsConfiguration);
        try {
            streams.start();

            List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(
                KeyValue.pair("A", 6L),
                KeyValue.pair("B", 6L),
                KeyValue.pair("C", 6L));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues);

            List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(
                KeyValue.pair("A", "6"),
                KeyValue.pair("B", "6"),
                KeyValue.pair("C", "6"));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues);
        } finally {
            // Close even when start() or a wait/assertion above throws, so a failed
            // run does not leave the streams instance running into later tests.
            streams.close(Duration.ofSeconds(5L));
        }
    }

    /**
     * Counts how many times the repartition-sink pattern occurs in a topology
     * description.
     *
     * @param topologyString the textual topology description to scan
     * @return the number of matches of {@code repartitionTopicPattern}
     */
    private int getCountOfRepartitionTopicsFound(String topologyString) {
        final Matcher sinkMatcher = repartitionTopicPattern.matcher(topologyString);
        int occurrences = 0;
        // Each successful find() advances past one repartition sink line.
        while (sinkMatcher.find()) {
            occurrences++;
        }
        return occurrences;
    }

    /**
     * Builds the input records for one topic: every key of {@code X, Y, Z}
     * paired with every value of {@code A:foo, B:foo, C:foo}, nine records in
     * key-major order.
     *
     * @return a new mutable list holding the nine key/value pairs
     */
    private List<KeyValue<String, String>> getKeyValues() {
        final String[] recordKeys = {"X", "Y", "Z"};
        final String[] recordValues = {"A:foo", "B:foo", "C:foo"};
        final List<KeyValue<String, String>> records = new ArrayList<>(recordKeys.length * recordValues.length);
        for (int keyIdx = 0; keyIdx < recordKeys.length; keyIdx++) {
            for (int valueIdx = 0; valueIdx < recordValues.length; valueIdx++) {
                records.add(KeyValue.pair(recordKeys[keyIdx], recordValues[valueIdx]));
            }
        }
        return records;
    }
}

