package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
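/*
 * Builds a single Streams topology in which one key-changing map() feeds several grouping/join
 * branches, then runs it with "topology.optimization" set to "all" and to "none". Each run checks
 * the textual topology description, the number of repartition sink topics it contains, and the
 * records written to every output topic.
 */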
public class RepartitionOptimizingIntegrationTest {
    /** Number of brokers started by the embedded cluster shared by all tests in this class. */
    private static final int NUM_BROKERS = 1;

    // Topic names: one input topic plus one output topic per downstream branch of the topology.
    private static final String INPUT_TOPIC = "input";
    private static final String COUNT_TOPIC = "outputTopic_0";
    private static final String AGGREGATION_TOPIC = "outputTopic_1";
    private static final String REDUCE_TOPIC = "outputTopic_2";
    private static final String JOINED_TOPIC = "joinedOutputTopic";

    // Expected number of "-repartition" sinks in the topology description per optimization mode.
    private static final int ONE_REPARTITION_TOPIC = 1;
    private static final int FOUR_REPARTITION_TOPICS = 4;

    /** Streams configuration; rebuilt in {@code setUp()} before every test. */
    private Properties streamsConfiguration;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n      --> KSTREAM-SINK-0000000039\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n      <-- KSTREAM-FILTER-0000000040\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n      <-- KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> KSTREAM-WINDOWED-0000000033\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> KSTREAM-REDUCE-0000000023\n      <-- KSTREAM-FILTER-0000000020\n    Processor: KSTREAM-WINDOWED-0000000033 (stores: 
[KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-JOINTHIS-0000000035\n      <-- KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-JOINOTHER-0000000036\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000034\n    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000033\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- KSTREAM-PEEK-0000000021\n    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n      <-- KSTREAM-MERGE-0000000037\n\n";
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> KSTREAM-FILTER-0000000031\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> KSTREAM-FILTER-0000000025\n      <-- KSTREAM-FILTER-0000000020\n    Processor: KSTREAM-FILTER-0000000009 (stores: [])\n      --> KSTREAM-SINK-0000000008\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000016 (stores: [])\n      --> KSTREAM-SINK-0000000015\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000025 (stores: [])\n      --> KSTREAM-SINK-0000000024\n      <-- KSTREAM-PEEK-0000000021\n    Processor: KSTREAM-FILTER-0000000031 (stores: [])\n      --> KSTREAM-SINK-0000000030\n      <-- KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n      <-- KSTREAM-FILTER-0000000009\n    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition)\n      <-- KSTREAM-FILTER-0000000016\n    Sink: KSTREAM-SINK-0000000024 (topic: 
KSTREAM-REDUCE-STATE-STORE-0000000022-repartition)\n      <-- KSTREAM-FILTER-0000000025\n    Sink: KSTREAM-SINK-0000000030 (topic: KSTREAM-FILTER-0000000029-repartition)\n      <-- KSTREAM-FILTER-0000000031\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n      --> KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- KSTREAM-SOURCE-0000000010\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n      <-- KSTREAM-AGGREGATE-0000000007\n    Source: KSTREAM-SOURCE-0000000032 (topics: [KSTREAM-FILTER-0000000029-repartition])\n      --> KSTREAM-WINDOWED-0000000033\n    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-JOINTHIS-0000000035\n      <-- KSTREAM-SOURCE-0000000032\n    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-JOINOTHER-0000000036\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000034\n    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000033\n    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n      <-- KSTREAM-MERGE-0000000037\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition])\n      --> 
KSTREAM-AGGREGATE-0000000014\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- KSTREAM-SOURCE-0000000017\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n\n  Sub-topology: 3\n    Source: KSTREAM-SOURCE-0000000026 (topics: [KSTREAM-REDUCE-STATE-STORE-0000000022-repartition])\n      --> KSTREAM-REDUCE-0000000023\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- KSTREAM-SOURCE-0000000026\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n\n";
    // Matches every "Sink: ...-repartition" fragment of a topology description; used to count
    // how many repartition topics the built topology writes to.
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    // Clock exposed by the embedded cluster; handed to the producer helper that seeds the input topic.
    private final MockTime mockTime = CLUSTER.time;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest$SimpleProcessor.class */
    public static class SimpleProcessor extends AbstractProcessor<String, String> {
        final List<String> valueList;

        SimpleProcessor(List<String> list) {
            this.valueList = list;
        }

        public void process(String str, String str2) {
            this.valueList.add(str2);
        }
    }

    /**
     * Prepares a fresh Streams configuration, creates the input and output topics on the shared
     * cluster, and purges any local Streams state left for that configuration.
     *
     * @throws Exception if topic creation or state cleanup fails
     */
    @Before
    public void setUp() throws Exception {
        final Properties overrides = new Properties();
        overrides.put("cache.max.bytes.buffering", 10240);
        overrides.put("commit.interval.ms", 5000);
        overrides.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

        // Keys and values both use the String serde as the default.
        final String bootstrapServers = CLUSTER.bootstrapServers();
        final String stringSerdeClassName = Serdes.String().getClass().getName();
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            "maybe-optimized-test-app",
            bootstrapServers,
            stringSerdeClassName,
            stringSerdeClassName,
            overrides);

        CLUSTER.createTopics(INPUT_TOPIC, COUNT_TOPIC, AGGREGATION_TOPIC, REDUCE_TOPIC, JOINED_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    /**
     * Deletes all topics on the shared embedded cluster after each test, passing the default
     * timeout to the wait.
     *
     * @throws Exception if the deletion call fails
     */
    @After
    public void tearDown() throws Exception {
        CLUSTER.deleteAllTopicsAndWait(IntegrationTestUtils.DEFAULT_TIMEOUT);
    }

    /**
     * Runs the topology with {@code topology.optimization=all} and expects exactly one
     * repartition topic in the resulting topology description.
     *
     * @throws Exception if producing, consuming, or any assertion fails
     */
    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
        runIntegrationTest("all", ONE_REPARTITION_TOPIC);
    }

    /**
     * Runs the topology with {@code topology.optimization=none} and expects four repartition
     * topics in the resulting topology description.
     *
     * @throws Exception if producing, consuming, or any assertion fails
     */
    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
        runIntegrationTest("none", FOUR_REPARTITION_TOPICS);
    }

    /**
     * Builds the test topology, verifies its description for the given optimization mode, then
     * runs it against the embedded cluster and checks the records on every output topic.
     *
     * <p>The topology maps the input keys to upper case (a key-changing operation) and fans out
     * into: a filtered branch ending in {@link SimpleProcessor}, a count, an aggregate, a reduce
     * behind extra filter/peek operators, and a windowed join of a filtered branch with the count
     * stream.
     *
     * @param optimizationConfig              value for the {@code topology.optimization} setting;
     *                                        {@code "all"} selects the optimized expected topology,
     *                                        anything else the unoptimized one
     * @param expectedNumberRepartitionTopics number of repartition sinks the description must contain
     * @throws Exception if producing, consuming, or any assertion fails
     */
    private void runIntegrationTest(String optimizationConfig, int expectedNumberRepartitionTopics) throws Exception {
        final Initializer<Integer> initializer = () -> 0;
        final Aggregator<String, String, Integer> aggregator = (key, value, aggregate) -> aggregate + value.length();
        final Reducer<String> reducer = (left, right) -> left + ":" + right;

        // Filled by SimpleProcessor with the values that reach the "B" branch.
        final List<String> processorValueCollector = new ArrayList<>();

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> sourceStream =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        // Changing the key here is what forces repartitioning before each grouping/join below.
        final KStream<String, String> mappedStream =
            sourceStream.map((key, value) -> KeyValue.pair(key.toUpperCase(Locale.getDefault()), value));

        mappedStream
            .filter((key, value) -> key.equals("B"))
            .mapValues(value -> value.toUpperCase(Locale.getDefault()))
            .process(() -> new SimpleProcessor(processorValueCollector));

        final KStream<String, Long> countStream = mappedStream
            .groupByKey()
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream();
        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        mappedStream
            .groupByKey()
            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
            .toStream()
            .to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

        // Extra operators between the key change and the grouping: the repartition point is further downstream.
        mappedStream
            .filter((key, value) -> true)
            .peek((key, value) -> System.out.println(key + ":" + value))
            .groupByKey()
            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        mappedStream
            .filter((key, value) -> key.equals("A"))
            .join(countStream,
                  (value, count) -> value + ":" + count.toString(),
                  JoinWindows.of(Duration.ofMillis(5000L)),
                  Joined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
            .to(JOINED_TOPIC);

        this.streamsConfiguration.setProperty("topology.optimization", optimizationConfig);

        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC, getKeyValues(), producerConfig, this.mockTime);

        final Properties countConsumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
        final Properties aggregationConsumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, IntegerDeserializer.class);
        final Properties stringConsumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);

        final Topology topology = builder.build(this.streamsConfiguration);
        final String topologyString = topology.describe().toString();

        if ("all".equals(optimizationConfig)) {
            Assert.assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
        } else {
            Assert.assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
        }
        Assert.assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString));

        final KafkaStreams streams = new KafkaStreams(topology, this.streamsConfiguration);
        try {
            streams.start();

            final List<KeyValue<String, Long>> expectedCountKeyValues =
                Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                countConsumerConfig, COUNT_TOPIC, expectedCountKeyValues);

            final List<KeyValue<String, Integer>> expectedAggKeyValues =
                Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                aggregationConsumerConfig, AGGREGATION_TOPIC, expectedAggKeyValues);

            final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(
                KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                stringConsumerConfig, REDUCE_TOPIC, expectedReduceKeyValues);

            final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(
                KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3"));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                stringConsumerConfig, JOINED_TOPIC, expectedJoinKeyValues);

            final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ");
            // Actual value first, so a failure message reports expected/actual the right way round.
            Assert.assertThat(processorValueCollector.size(), CoreMatchers.equalTo(3));
            Assert.assertThat(processorValueCollector, CoreMatchers.equalTo(expectedCollectedProcessorValues));
        } finally {
            // Always shut the instance down, even when a wait or assertion above throws.
            streams.close(Duration.ofSeconds(5L));
        }
    }

    /**
     * Counts how many times the repartition-sink pattern occurs in a topology description.
     *
     * @param topologyString textual topology description to scan
     * @return number of non-overlapping matches of {@code repartitionTopicPattern}
     */
    private int getCountOfRepartitionTopicsFound(String topologyString) {
        final Matcher matcher = this.repartitionTopicPattern.matcher(topologyString);
        int repartitionTopicsFound = 0;
        // Only the number of matches matters, so the matched text is not collected.
        while (matcher.find()) {
            repartitionTopicsFound++;
        }
        return repartitionTopicsFound;
    }

    /**
     * Builds the input records: every key in {@code a, b, c} paired with every value in
     * {@code foo, bar, baz}, grouped by key and in that value order (nine records in total).
     *
     * @return a new mutable list of the nine key/value pairs
     */
    private List<KeyValue<String, String>> getKeyValues() {
        final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
        final String[] keys = {"a", "b", "c"};
        final String[] values = {"foo", "bar", "baz"};
        for (final String key : keys) {
            for (final String value : values) {
                keyValueList.add(KeyValue.pair(key, value));
            }
        }
        return keyValueList;
    }
}
