package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.class */
public class RepartitionWithMergeOptimizingTest {
    /** Name of the first input topic consumed by the topology under test. */
    private static final String INPUT_A_TOPIC = "inputA";
    /** Name of the second input topic consumed by the topology under test. */
    private static final String INPUT_B_TOPIC = "inputB";
    /** Output topic receiving the per-key counts as {@code Long} values. */
    private static final String COUNT_TOPIC = "outputTopic_0";
    /** Output topic receiving the per-key counts rendered as strings. */
    private static final String STRING_COUNT_TOPIC = "outputTopic_1";
    /** Number of repartition sinks expected in a topology description containing one repartition topic. */
    private static final int ONE_REPARTITION_TOPIC = 1;
    /** Number of repartition sinks expected in a topology description containing two repartition topics. */
    private static final int TWO_REPARTITION_TOPICS = 2;
    /** Streams configuration; created in {@code setUp()} and completed with the optimization setting in {@code runTest}. */
    private Properties streamsConfiguration;
    /** Driver created by {@code runTest} and closed in {@code tearDown()}. */
    private TopologyTestDriver topologyTestDriver;
    /**
     * Topology description compared against when {@code runTest} is called with "all":
     * both counts read from the single {@code long-groupByKey-repartition} topic,
     * giving two sub-topologies.
     */
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceAStream (topics: [inputA])\n      --> mappedAStream\n    Source: sourceBStream (topics: [inputB])\n      --> mappedBStream\n    Processor: mappedAStream (stores: [])\n      --> mergedStream\n      <-- sourceAStream\n    Processor: mappedBStream (stores: [])\n      --> mergedStream\n      <-- sourceBStream\n    Processor: mergedStream (stores: [])\n      --> long-groupByKey-repartition-filter\n      <-- mappedAStream, mappedBStream\n    Processor: long-groupByKey-repartition-filter (stores: [])\n      --> long-groupByKey-repartition-sink\n      <-- mergedStream\n    Sink: long-groupByKey-repartition-sink (topic: long-groupByKey-repartition)\n      <-- long-groupByKey-repartition-filter\n\n  Sub-topology: 1\n    Source: long-groupByKey-repartition-source (topics: [long-groupByKey-repartition])\n      --> long-count, string-count\n    Processor: string-count (stores: [string-store])\n      --> string-toStream\n      <-- long-groupByKey-repartition-source\n    Processor: long-count (stores: [long-store])\n      --> long-toStream\n      <-- long-groupByKey-repartition-source\n    Processor: string-toStream (stores: [])\n      --> string-mapValues\n      <-- string-count\n    Processor: long-toStream (stores: [])\n      --> long-to\n      <-- long-count\n    Processor: string-mapValues (stores: [])\n      --> string-to\n      <-- string-toStream\n    Sink: long-to (topic: outputTopic_0)\n      <-- long-toStream\n    Sink: string-to (topic: outputTopic_1)\n      <-- string-mapValues\n\n";
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceAStream (topics: [inputA])\n      --> mappedAStream\n    Source: sourceBStream (topics: [inputB])\n      --> mappedBStream\n    Processor: mappedAStream (stores: [])\n      --> mergedStream\n      <-- sourceAStream\n    Processor: mappedBStream (stores: [])\n      --> mergedStream\n      <-- sourceBStream\n    Processor: mergedStream (stores: [])\n      --> long-groupByKey-repartition-filter, string-groupByKey-repartition-filter\n      <-- mappedAStream, mappedBStream\n    Processor: long-groupByKey-repartition-filter (stores: [])\n      --> long-groupByKey-repartition-sink\n      <-- mergedStream\n    Processor: string-groupByKey-repartition-filter (stores: [])\n      --> string-groupByKey-repartition-sink\n      <-- mergedStream\n    Sink: long-groupByKey-repartition-sink (topic: long-groupByKey-repartition)\n      <-- long-groupByKey-repartition-filter\n    Sink: string-groupByKey-repartition-sink (topic: string-groupByKey-repartition)\n      <-- string-groupByKey-repartition-filter\n\n  Sub-topology: 1\n    Source: long-groupByKey-repartition-source (topics: [long-groupByKey-repartition])\n      --> long-count\n    Processor: long-count (stores: [long-store])\n      --> long-toStream\n      <-- long-groupByKey-repartition-source\n    Processor: long-toStream (stores: [])\n      --> long-to\n      <-- long-count\n    Sink: long-to (topic: outputTopic_0)\n      <-- long-toStream\n\n  Sub-topology: 2\n    Source: string-groupByKey-repartition-source (topics: [string-groupByKey-repartition])\n      --> string-count\n    Processor: string-count (stores: [string-store])\n      --> string-toStream\n      <-- string-groupByKey-repartition-source\n    Processor: string-toStream (stores: [])\n      --> string-mapValues\n      <-- string-count\n    Processor: string-mapValues (stores: [])\n      --> string-to\n      <-- string-toStream\n    Sink: string-to 
(topic: outputTopic_1)\n      <-- string-mapValues\n\n";
    /** Logger for this test class; not referenced by any code visible in this file. */
    private final Logger log = LoggerFactory.getLogger(RepartitionWithMergeOptimizingTest.class);
    /** Serializer used for both keys and values of the two input topics. */
    private final Serializer<String> stringSerializer = new StringSerializer();
    /** Deserializer used for output keys and for the values of the string-count output topic. */
    private final Deserializer<String> stringDeserializer = new StringDeserializer();
    /** Matches the "Sink: ...-repartition" entries of a topology description; used to count repartition topics. */
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    // getKeyValues() yields three records per value prefix (A, B, C) and is piped into both
    // input topics, hence the expected count of 6 for every key.
    private final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L));
    // Same expectation as above, with the count rendered as a string.
    private final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6"));

    /**
     * Creates the base streams configuration with String serdes for keys and values,
     * then sets "statestore.cache.max.bytes" to 10240 and "commit.interval.ms" to 5000.
     */
    @Before
    public void setUp() {
        final Serde<?> keySerde = Serdes.String();
        final Serde<?> valueSerde = Serdes.String();

        final Properties config = StreamsTestUtils.getStreamsConfig(keySerde, valueSerde);
        config.setProperty("statestore.cache.max.bytes", String.valueOf(10240));
        config.setProperty("commit.interval.ms", String.valueOf(5000L));

        this.streamsConfiguration = config;
    }

    /**
     * Closes the topology test driver created by the test, if one was created.
     */
    @After
    public void tearDown() {
        // The driver is only assigned part-way through runTest; if the test failed before
        // that point, an unconditional close() would throw a NullPointerException here
        // and obscure the original failure.
        if (this.topologyTestDriver != null) {
            this.topologyTestDriver.close();
        }
    }

    /**
     * Runs the scenario with "topology.optimization" set to "all" and expects a single
     * repartition topic in the resulting topology description.
     */
    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() {
        // Use the named constant (rather than a bare 1) to mirror the unoptimized test.
        runTest("all", ONE_REPARTITION_TOPIC);
    }

    /**
     * Runs the scenario with "topology.optimization" set to "none" and expects two
     * repartition topics in the resulting topology description.
     */
    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() {
        final String optimizationSetting = "none";
        final int expectedRepartitionTopics = TWO_REPARTITION_TOPICS;
        runTest(optimizationSetting, expectedRepartitionTopics);
    }

    /**
     * Builds a topology that re-keys two input streams by the prefix of their value
     * (the text before the first ':'), merges them, and counts the merged stream twice:
     * once written as {@code Long} counts to {@code COUNT_TOPIC} and once written as
     * string counts to {@code STRING_COUNT_TOPIC}. The topology is then driven with the
     * records from {@code getKeyValues()} on both inputs, and both the topology
     * description and the produced counts are verified.
     *
     * @param str value assigned to the "topology.optimization" config; "all" selects
     *            {@code EXPECTED_OPTIMIZED_TOPOLOGY} as the expected description, any
     *            other value selects {@code EXPECTED_UNOPTIMIZED_TOPOLOGY}
     * @param i   expected number of repartition sinks in the topology description
     */
    private void runTest(String str, int i) {
        this.streamsConfiguration.setProperty("topology.optimization", str);

        final StreamsBuilder builder = new StreamsBuilder();

        // Nodes are created in the order: source A, map A, source B, map B, merge.
        final KStream<String, String> mappedAStream = builder
            .stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceAStream"))
            .map((key, value) -> KeyValue.pair(value.split(":")[0], value), Named.as("mappedAStream"));
        final KStream<String, String> mappedBStream = builder
            .stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceBStream"))
            .map((key, value) -> KeyValue.pair(value.split(":")[0], value), Named.as("mappedBStream"));
        final KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream, Named.as("mergedStream"));

        // First aggregation: counts emitted as Long values.
        mergedStream
            .groupByKey(Grouped.as("long-groupByKey"))
            .count(Named.as("long-count"), Materialized.as(Stores.inMemoryKeyValueStore("long-store")))
            .toStream(Named.as("long-toStream"))
            .to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()).withName("long-to"));

        // Second aggregation over the same merged stream: counts emitted as strings.
        mergedStream
            .groupByKey(Grouped.as("string-groupByKey"))
            .count(Named.as("string-count"), Materialized.as(Stores.inMemoryKeyValueStore("string-store")))
            .toStream(Named.as("string-toStream"))
            .mapValues(count -> count.toString(), Named.as("string-mapValues"))
            .to(STRING_COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.String()).withName("string-to"));

        final Topology topology = builder.build(this.streamsConfiguration);
        this.topologyTestDriver = new TopologyTestDriver(topology, this.streamsConfiguration);

        final TestInputTopic<String, String> inputTopicA =
            this.topologyTestDriver.createInputTopic(INPUT_A_TOPIC, this.stringSerializer, this.stringSerializer);
        final TestInputTopic<String, String> inputTopicB =
            this.topologyTestDriver.createInputTopic(INPUT_B_TOPIC, this.stringSerializer, this.stringSerializer);
        final TestOutputTopic<String, Long> countOutputTopic =
            this.topologyTestDriver.createOutputTopic(COUNT_TOPIC, this.stringDeserializer, new LongDeserializer());
        final TestOutputTopic<String, String> stringCountOutputTopic =
            this.topologyTestDriver.createOutputTopic(STRING_COUNT_TOPIC, this.stringDeserializer, this.stringDeserializer);

        inputTopicA.pipeKeyValueList(getKeyValues());
        inputTopicB.pipeKeyValueList(getKeyValues());

        final String topologyDescription = topology.describe().toString();
        if (str.equals("all")) {
            Assert.assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyDescription);
        } else {
            Assert.assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyDescription);
        }
        Assert.assertEquals(i, getCountOfRepartitionTopicsFound(topologyDescription));

        // Reading to a map keeps only the latest value per key, i.e. the final counts.
        MatcherAssert.assertThat(
            countOutputTopic.readKeyValuesToMap(),
            CoreMatchers.equalTo(keyValueListToMap(this.expectedCountKeyValues)));
        MatcherAssert.assertThat(
            stringCountOutputTopic.readKeyValuesToMap(),
            CoreMatchers.equalTo(keyValueListToMap(this.expectedStringCountKeyValues)));
    }

    /**
     * Converts a list of key-value pairs into a map.
     *
     * @param list pairs to convert; when a key occurs more than once, the last pair wins
     * @param <K>  key type
     * @param <V>  value type
     * @return a new {@link HashMap} holding one entry per distinct key in {@code list}
     */
    private <K, V> Map<K, V> keyValueListToMap(List<KeyValue<K, V>> list) {
        final Map<K, V> result = new HashMap<>();
        for (final KeyValue<K, V> keyValue : list) {
            result.put(keyValue.key, keyValue.value);
        }
        return result;
    }

    /**
     * Counts how many times {@code repartitionTopicPattern} matches in the given text.
     *
     * @param str topology description to scan
     * @return number of non-overlapping matches of the repartition sink pattern
     */
    private int getCountOfRepartitionTopicsFound(String str) {
        final Matcher matcher = this.repartitionTopicPattern.matcher(str);
        // Only the number of matches matters, so there is no need to collect them.
        int matchCount = 0;
        while (matcher.find()) {
            matchCount++;
        }
        return matchCount;
    }

    /**
     * Builds the input records: every key in {"X", "Y", "Z"} is paired with every value
     * in {"A:foo", "B:foo", "C:foo"}, iterating keys in the outer loop.
     *
     * @return a new mutable list of the nine key-value pairs
     */
    private List<KeyValue<String, String>> getKeyValues() {
        final String[] keys = {"X", "Y", "Z"};
        final String[] values = {"A:foo", "B:foo", "C:foo"};

        final List<KeyValue<String, String>> keyValues = new ArrayList<>(keys.length * values.length);
        for (final String key : keys) {
            for (final String value : values) {
                keyValues.add(KeyValue.pair(key, value));
            }
        }
        return keyValues;
    }
}
