package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamSplitTest.class */
public class KStreamSplitTest {
    private final String topicName = "topic";
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    private final StreamsBuilder builder = new StreamsBuilder();
    private final Predicate<Integer, String> isEven = (num, str) -> {
        return num.intValue() % 2 == 0;
    };
    private final Predicate<Integer, String> isMultipleOfThree = (num, str) -> {
        return num.intValue() % 3 == 0;
    };
    private final Predicate<Integer, String> isMultipleOfFive = (num, str) -> {
        return num.intValue() % 5 == 0;
    };
    private final Predicate<Integer, String> isMultipleOfSeven = (num, str) -> {
        return num.intValue() % 7 == 0;
    };
    private final Predicate<Integer, String> isNegative = (num, str) -> {
        return num.intValue() < 0;
    };
    private final KStream<Integer, String> source = this.builder.stream("topic", Consumed.with(Serdes.Integer(), Serdes.String()));

    @Test
    public void testKStreamSplit() {
        Assert.assertEquals(0L, this.source.split().branch(this.isEven, Branched.withConsumer(kStream -> {
            kStream.to("x2");
        })).branch(this.isMultipleOfThree, Branched.withConsumer(kStream2 -> {
            kStream2.to("x3");
        })).branch(this.isMultipleOfFive, Branched.withConsumer(kStream3 -> {
            kStream3.to("x5");
        })).noDefaultBranch().size());
        this.builder.build();
        withDriver(topologyTestDriver -> {
            TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("x2", new IntegerDeserializer(), new StringDeserializer());
            TestOutputTopic createOutputTopic2 = topologyTestDriver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
            TestOutputTopic createOutputTopic3 = topologyTestDriver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
            Assert.assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), createOutputTopic.readValuesToList());
            Assert.assertEquals(Arrays.asList("V3"), createOutputTopic2.readValuesToList());
            Assert.assertEquals(Arrays.asList("V5"), createOutputTopic3.readValuesToList());
        });
    }

    private void withDriver(Consumer<TopologyTestDriver> consumer) {
        int[] iArr = {-1, 0, 1, 2, 3, 4, 5, 6, 7};
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("topic", new IntegerSerializer(), new StringSerializer());
            for (int i : iArr) {
                createInputTopic.pipeInput(Integer.valueOf(i), "V" + i);
            }
            consumer.accept(topologyTestDriver);
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testTypeVariance() {
        Predicate predicate = (number, obj) -> {
            return number.doubleValue() > 0.0d;
        };
        new StreamsBuilder().stream("empty").split().branch(predicate).branch((number2, obj2) -> {
            return number2.doubleValue() < 0.0d;
        });
    }

    @Test
    public void testResultingMap() {
        Map defaultBranch = this.source.split(Named.as("foo-")).branch(this.isEven, Branched.as("bar")).branch(this.isMultipleOfThree, Branched.withConsumer(kStream -> {
        })).branch(this.isMultipleOfFive, Branched.withFunction(kStream2 -> {
            return null;
        })).branch(this.isNegative, Branched.withFunction(kStream3 -> {
            return kStream3;
        })).branch(this.isMultipleOfSeven).defaultBranch();
        Assert.assertEquals(4L, defaultBranch.size());
        for (Map.Entry entry : defaultBranch.entrySet()) {
            ((KStream) entry.getValue()).to((String) entry.getKey());
        }
        this.builder.build();
        withDriver(topologyTestDriver -> {
            TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("foo-bar", new IntegerDeserializer(), new StringDeserializer());
            TestOutputTopic createOutputTopic2 = topologyTestDriver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
            TestOutputTopic createOutputTopic3 = topologyTestDriver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
            TestOutputTopic createOutputTopic4 = topologyTestDriver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
            Assert.assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), createOutputTopic.readValuesToList());
            Assert.assertEquals(Arrays.asList("V-1"), createOutputTopic2.readValuesToList());
            Assert.assertEquals(Arrays.asList("V7"), createOutputTopic3.readValuesToList());
            Assert.assertEquals(Arrays.asList("V1"), createOutputTopic4.readValuesToList());
        });
    }

    @Test
    public void testBranchingWithNoTerminalOperation() {
        this.source.split().branch(this.isEven, Branched.withConsumer(kStream -> {
            kStream.to("output");
        })).branch(this.isMultipleOfFive, Branched.withConsumer(kStream2 -> {
            kStream2.to("output");
        }));
        this.builder.build();
        withDriver(topologyTestDriver -> {
            Assert.assertEquals(Arrays.asList("V0", "V2", "V4", "V5", "V6"), topologyTestDriver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer()).readValuesToList());
        });
    }
}
