package org.apache.kafka.streams.kstream.internals.graph;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.class */
public class StreamsGraphTest {
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    private String expectedJoinedTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> none\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";
    private String expectedJoinedFilteredTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> none\n      <-- KSTREAM-MERGE-0000000006\n\n";
    private String expectedFullTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000008\n      <-- KSTREAM-MERGE-0000000006\n    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000009\n      <-- KSTREAM-FILTER-0000000007\n    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n      <-- KSTREAM-MAPVALUES-0000000008\n\n";

    @Test
    public void shouldBeAbleToBuildTopologyIncrementally() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream join = streamsBuilder.stream("topic").join(streamsBuilder.stream("other-topic"), (str, str2) -> {
            return str + str2;
        }, JoinWindows.of(Duration.ofMillis(5000L)));
        Assert.assertEquals(this.expectedJoinedTopology, streamsBuilder.build().describe().toString());
        KStream filter = join.filter((str3, str4) -> {
            return str4.equals("foo");
        });
        Assert.assertEquals(this.expectedJoinedFilteredTopology, streamsBuilder.build().describe().toString());
        filter.mapValues(str5 -> {
            return str5 + "some value";
        }).to("output-topic");
        Assert.assertEquals(this.expectedFullTopology, streamsBuilder.build().describe().toString());
    }

    @Test
    public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
        Assert.assertEquals(getTopologyWithChangingValuesAfterChangingKey("all").describe().toString(), getTopologyWithChangingValuesAfterChangingKey("none").describe().toString());
        Assert.assertEquals(2L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
        Assert.assertEquals(2L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
    }

    @Test
    public void shouldNotOptimizeWhenAThroughOperationIsDone() {
        Assert.assertEquals(getTopologyWithThroughOperation("all").describe().toString(), getTopologyWithThroughOperation("none").describe().toString());
        Assert.assertEquals(0L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
        Assert.assertEquals(0L, getCountOfRepartitionTopicsFound(r0.describe().toString()));
    }

    private Topology getTopologyWithChangingValuesAfterChangingKey(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", str);
        KStream selectKey = streamsBuilder.stream("input").selectKey((str2, str3) -> {
            return str2 + str3;
        });
        selectKey.mapValues(str4 -> {
            return str4.toUpperCase(Locale.getDefault());
        }).groupByKey().count().toStream().to("output");
        selectKey.flatMapValues(str5 -> {
            return Arrays.asList(str5.split("\\s"));
        }).groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return streamsBuilder.build(properties);
    }

    private Topology getTopologyWithThroughOperation(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", str);
        KStream through = streamsBuilder.stream("input").selectKey((str2, str3) -> {
            return str2 + str3;
        }).through("through-topic");
        through.groupByKey().count().toStream().to("output");
        through.groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return streamsBuilder.build(properties);
    }

    private int getCountOfRepartitionTopicsFound(String str) {
        Matcher matcher = this.repartitionTopicPattern.matcher(str);
        ArrayList arrayList = new ArrayList();
        while (matcher.find()) {
            arrayList.add(matcher.group());
        }
        return arrayList.size();
    }
}
