package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImplTest.class */
public class KStreamImplTest {
    private KStream<String, String> testStream;
    private StreamsBuilder builder;
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
    private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());

    @Before
    public void before() {
        this.builder = new StreamsBuilder();
        this.testStream = this.builder.stream("source");
    }

    @Test
    public void testNumProcesses() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(Arrays.asList("topic-1", "topic-2"), this.stringConsumed);
        KStream stream2 = streamsBuilder.stream(Arrays.asList("topic-3", "topic-4"), this.stringConsumed);
        KStream mapValues = stream.filter(new Predicate<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.2
            public boolean test(String str, String str2) {
                return true;
            }
        }).filterNot(new Predicate<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.1
            public boolean test(String str, String str2) {
                return false;
            }
        }).mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.3
            public Integer apply(String str) {
                return new Integer(str);
            }
        });
        KStream flatMapValues = stream2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.4
            public Iterable<Integer> apply(String str) {
                return Collections.singletonList(new Integer(str));
            }
        });
        KStream[] branch = mapValues.branch(new Predicate[]{new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.5
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        }, new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.6
            public boolean test(String str, Integer num) {
                return true;
            }
        }});
        KStream[] branch2 = flatMapValues.branch(new Predicate[]{new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.7
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        }, new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.8
            public boolean test(String str, Integer num) {
                return true;
            }
        }});
        Joined with = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
        KStream join = branch[0].join(branch2[0], new ValueJoiner<Integer, Integer, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.9
            public Integer apply(Integer num, Integer num2) {
                return Integer.valueOf(num.intValue() + num2.intValue());
            }
        }, JoinWindows.of(1L), with);
        branch[1].join(branch2[1], new ValueJoiner<Integer, Integer, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.10
            public Integer apply(Integer num, Integer num2) {
                return Integer.valueOf(num.intValue() + num2.intValue());
            }
        }, JoinWindows.of(1L), with);
        join.to("topic-5");
        branch[1].through("topic-6").process(new MockProcessorSupplier(), new String[0]);
        Assert.assertEquals(26L, TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").build((Integer) null).processors().size());
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(Arrays.asList("topic-1", "topic-2"), this.stringConsumed);
        KStream stream2 = streamsBuilder.stream(Arrays.asList("topic-3", "topic-4"), this.stringConsumed);
        stream.to("topic-5");
        stream2.through("topic-6");
        ProcessorTopology build = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").build((Integer) null);
        Assert.assertThat(build.source("topic-6").getTimestampExtractor(), IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        Assert.assertEquals(build.source("topic-4").getTimestampExtractor(), (Object) null);
        Assert.assertEquals(build.source("topic-3").getTimestampExtractor(), (Object) null);
        Assert.assertEquals(build.source("topic-2").getTimestampExtractor(), (Object) null);
        Assert.assertEquals(build.source("topic-1").getTimestampExtractor(), (Object) null);
    }

    @Test
    public void shouldSendDataThroughTopicUsingProduced() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", this.stringConsumed).through("through-topic", Produced.with(Serdes.String(), Serdes.String())).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.pipeInput(this.recordFactory.create("topic", "a", "b"));
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertThat(this.processorSupplier.theCapturedProcessor().processed, CoreMatchers.equalTo(Collections.singletonList("a:b")));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSendDataToTopicUsingProduced() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", this.stringConsumed).to("to-topic", Produced.with(Serdes.String(), Serdes.String()));
        streamsBuilder.stream("to-topic", this.stringConsumed).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.pipeInput(this.recordFactory.create("topic", "e", "f"));
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertThat(this.processorSupplier.theCapturedProcessor().processed, CoreMatchers.equalTo(Collections.singletonList("e:f")));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSendDataToDynamicTopics() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic", this.stringConsumed).to((str, str2, recordContext) -> {
            return recordContext.topic() + "-" + str + "-" + str2.substring(0, 1);
        }, Produced.with(Serdes.String(), Serdes.String()));
        streamsBuilder.stream("topic-a-v", this.stringConsumed).process(this.processorSupplier, new String[0]);
        streamsBuilder.stream("topic-b-v", this.stringConsumed).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic", "a", "v1"));
            topologyTestDriver.pipeInput(this.recordFactory.create("topic", "a", "v2"));
            topologyTestDriver.pipeInput(this.recordFactory.create("topic", "b", "v1"));
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            List<MockProcessor<String, String>> capturedProcessors = this.processorSupplier.capturedProcessors(2);
            Assert.assertThat(capturedProcessors.get(0).processed, CoreMatchers.equalTo(Utils.mkList(new String[]{"a:v1", "a:v2"})));
            Assert.assertThat(capturedProcessors.get(1).processed, CoreMatchers.equalTo(Collections.singletonList("b:v1")));
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("topic-1", this.stringConsumed);
        ValueJoiner instance = MockValueJoiner.instance(":");
        long convert = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
        stream.map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImplTest.11
            public KeyValue<? extends String, ? extends String> apply(String str, String str2) {
                return KeyValue.pair(str2, str2);
            }
        }).join(stream, instance, JoinWindows.of(convert).until(3 * convert), Joined.with(Serdes.String(), Serdes.String(), Serdes.String())).to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        ProcessorTopology build = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).setApplicationId("X").build();
        SourceNode source = build.source("topic-1");
        for (SourceNode sourceNode : build.sources()) {
            if (sourceNode.name().equals(source.name())) {
                Assert.assertEquals(sourceNode.getTimestampExtractor(), (Object) null);
            } else {
                Assert.assertThat(sourceNode.getTimestampExtractor(), IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
            }
        }
    }

    @Test
    public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic").selectKey((str, str2) -> {
            return str2;
        }).join(streamsBuilder.globalTable("globalTopic"), (str3, str4) -> {
            return str3 + str4;
        }, (str5, str6) -> {
            return str5 + str6;
        }).groupByKey().count();
        Matcher matcher = Pattern.compile("Sink: .*-repartition").matcher(streamsBuilder.build().describe().toString());
        Assert.assertTrue(matcher.find());
        String group = matcher.group();
        Assert.assertThat(group, CoreMatchers.notNullValue());
        Assert.assertTrue(group.endsWith("repartition"));
    }

    @Test
    public void testToWithNullValueSerdeDoesntNPE() {
        new StreamsBuilder().stream(Collections.singleton("input"), Consumed.with(Serdes.String(), Serdes.String())).to("output", Produced.with(Serdes.String(), Serdes.String()));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        this.testStream.filter((Predicate) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        this.testStream.filterNot((Predicate) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnSelectKey() {
        this.testStream.selectKey((KeyValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMap() {
        this.testStream.map((KeyValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        this.testStream.mapValues((ValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValuesWithKey() {
        this.testStream.mapValues((ValueMapperWithKey) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMap() {
        this.testStream.flatMap((KeyValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMapValues() {
        this.testStream.flatMapValues((ValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
        this.testStream.flatMapValues((ValueMapperWithKey) null);
    }

    @Test(expected = IllegalArgumentException.class)
    public void shouldHaveAtLeastOnPredicateWhenBranching() {
        this.testStream.branch(new Predicate[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldCantHaveNullPredicate() {
        this.testStream.branch(new Predicate[]{(Predicate) null});
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicOnThrough() {
        this.testStream.through((String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicOnTo() {
        this.testStream.to((String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicChooserOnTo() {
        this.testStream.to((TopicNameExtractor) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransform() {
        this.testStream.transform((TransformerSupplier) null, new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransformValues() {
        this.testStream.transformValues((ValueTransformerSupplier) null, new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransformValuesWithKey() {
        this.testStream.transformValues((ValueTransformerWithKeySupplier) null, new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessSupplier() {
        this.testStream.process((ProcessorSupplier) null, new String[0]);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherStreamOnJoin() {
        this.testStream.join((KStream) null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullValueJoinerOnJoin() {
        this.testStream.join(this.testStream, (ValueJoiner) null, JoinWindows.of(10L));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinWindowsOnJoin() {
        this.testStream.join(this.testStream, MockValueJoiner.TOSTRING_JOINER, (JoinWindows) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTableOnTableJoin() {
        this.testStream.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullValueMapperOnTableJoin() {
        this.testStream.leftJoin(this.builder.table("topic", this.stringConsumed), (ValueJoiner) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        this.testStream.groupBy((KeyValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullActionOnForEach() {
        this.testStream.foreach((ForeachAction) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
        this.testStream.join((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
        this.testStream.join(this.builder.globalTable("global", this.stringConsumed), (KeyValueMapper) null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
        this.testStream.join(this.builder.globalTable("global", this.stringConsumed), MockMapper.selectValueMapper(), (ValueJoiner) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
        this.testStream.leftJoin((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
        this.testStream.leftJoin(this.builder.globalTable("global", this.stringConsumed), (KeyValueMapper) null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
        this.testStream.leftJoin(this.builder.globalTable("global", this.stringConsumed), MockMapper.selectValueMapper(), (ValueJoiner) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
        this.testStream.print((Printed) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
        this.testStream.through("topic", (Produced) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnToWhenProducedIsNull() {
        this.testStream.to("topic", (Produced) null);
    }

    @Test
    public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
        try {
            this.testStream.leftJoin(this.builder.table("blah", this.stringConsumed), MockValueJoiner.TOSTRING_JOINER, (Joined) null);
            Assert.fail("Should have thrown NullPointerException");
        } catch (NullPointerException e) {
        }
    }

    @Test
    public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
        try {
            this.testStream.join(this.builder.table("blah", this.stringConsumed), MockValueJoiner.TOSTRING_JOINER, (Joined) null);
            Assert.fail("Should have thrown NullPointerException");
        } catch (NullPointerException e) {
        }
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWithStreamWhenJoinedIsNull() {
        this.testStream.join(this.testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L), (Joined) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
        this.testStream.outerJoin(this.testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L), (Joined) null);
    }

    @Test
    public void shouldMergeTwoStreams() {
        this.builder.stream("topic-1").merge(this.builder.stream("topic-2")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            topologyTestDriver.pipeInput(this.recordFactory.create("topic-1", "A", "aa"));
            topologyTestDriver.pipeInput(this.recordFactory.create("topic-2", "B", "bb"));
            topologyTestDriver.pipeInput(this.recordFactory.create("topic-2", "C", "cc"));
            topologyTestDriver.pipeInput(this.recordFactory.create("topic-1", "D", "dd"));
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            Assert.assertEquals(Utils.mkList(new String[]{"A:aa", "B:bb", "C:cc", "D:dd"}), this.processorSupplier.theCapturedProcessor().processed);
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldMergeMultipleStreams() {
        this.builder.stream("topic-1").merge(this.builder.stream("topic-2")).merge(this.builder.stream("topic-3")).merge(this.builder.stream("topic-4")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-1", "A", "aa"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-2", "B", "bb"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-3", "C", "cc"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-4", "D", "dd"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-4", "E", "ee"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-3", "F", "ff"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-2", "G", "gg"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-1", "H", "hh"));
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Utils.mkList(new String[]{"A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"}), this.processorSupplier.theCapturedProcessor().processed);
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldProcessFromSourceThatMatchPattern() {
        this.builder.stream(Pattern.compile("topic-\\d")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-3", "A", "aa"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-4", "B", "bb"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-5", "C", "cc"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-6", "D", "dd"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-7", "E", "ee"));
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Utils.mkList(new String[]{"A:aa", "B:bb", "C:cc", "D:dd", "E:ee"}), this.processorSupplier.theCapturedProcessor().processed);
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
        this.builder.stream(Pattern.compile("topic-\\d")).merge(this.builder.stream(Pattern.compile("topic-[A-Z]"))).merge(this.builder.stream("topic-without-pattern")).process(this.processorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-3", "A", "aa"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-4", "B", "bb"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-A", "C", "cc"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-Z", "D", "dd"));
                topologyTestDriver.pipeInput(this.recordFactory.create("topic-without-pattern", "E", "ee"));
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                Assert.assertEquals(Utils.mkList(new String[]{"A:aa", "B:bb", "C:cc", "D:dd", "E:ee"}), this.processorSupplier.theCapturedProcessor().processed);
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }
}
