/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for a windowed stream-stream {@code leftJoin} where {@code topic1} is the left (primary)
 * input and {@code topic2} is the right (other) input.
 *
 * <p>Both tests build the same topology: two {@code Integer -> String} streams joined with
 * {@link MockValueJoiner#TOSTRING_JOINER} over {@code JoinWindows.of(100 ms)}, with the joined
 * stream fed into a {@link MockProcessor} whose captured records are checked after every batch
 * of input.
 */
public class KStreamKStreamLeftJoinTest {
    private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /**
     * Pipes records with identical timestamps (all 0) into both inputs and checks that every
     * left-side record is emitted, paired with {@code null} when the right side has no record
     * for the key yet, and that right-side records are joined against the left-side records
     * seen so far.
     */
    @Test
    public void testLeftJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
        buildLeftJoinTopology(builder, supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<Integer, String> inputTopic1 = createInputTopic(driver, topic1);
            TestInputTopic<Integer, String> inputTopic2 = createInputTopic(driver, topic2);
            MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

            // Two left records, nothing on the right yet: each is emitted with a null right value.
            for (int i = 0; i < 2; ++i) {
                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+null", 0L),
                new KeyValueTimestamp<>(1, "A1+null", 0L));

            // Two right records for the same keys: each joins the earlier left record.
            for (int i = 0; i < 2; ++i) {
                inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+a0", 0L),
                new KeyValueTimestamp<>(1, "A1+a1", 0L));

            // Three left records: keys 0 and 1 find a right record, key 2 does not.
            for (int i = 0; i < 3; ++i) {
                inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "B0+a0", 0L),
                new KeyValueTimestamp<>(1, "B1+a1", 0L),
                new KeyValueTimestamp<>(2, "B2+null", 0L));

            // Right records for all keys: joined with every left record seen so far; key 3 has
            // no left record, so nothing is emitted for it.
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "b" + expectedKey);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+b0", 0L),
                new KeyValueTimestamp<>(0, "B0+b0", 0L),
                new KeyValueTimestamp<>(1, "A1+b1", 0L),
                new KeyValueTimestamp<>(1, "B1+b1", 0L),
                new KeyValueTimestamp<>(2, "B2+b2", 0L));

            // Left records for all keys: joined with every right record seen so far.
            for (int expectedKey : expectedKeys) {
                inputTopic1.pipeInput(expectedKey, "C" + expectedKey);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "C0+a0", 0L),
                new KeyValueTimestamp<>(0, "C0+b0", 0L),
                new KeyValueTimestamp<>(1, "C1+a1", 0L),
                new KeyValueTimestamp<>(1, "C1+b1", 0L),
                new KeyValueTimestamp<>(2, "C2+b2", 0L),
                new KeyValueTimestamp<>(3, "C3+b3", 0L));
        }
    }

    /**
     * Seeds both inputs with records at timestamp 0 and then exercises the upper and lower
     * edges of the 100 ms join window via {@link #testUpperWindowBound} and
     * {@link #testLowerWindowBound}, in that order, against the same driver.
     */
    @Test
    public void testWindowing() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
        buildLeftJoinTopology(builder, supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<Integer, String> inputTopic1 = createInputTopic(driver, topic1);
            TestInputTopic<Integer, String> inputTopic2 = createInputTopic(driver, topic2);
            MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

            // Two left records at t=0, nothing on the right yet: emitted with null.
            for (int i = 0; i < 2; ++i) {
                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], 0L);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+null", 0L),
                new KeyValueTimestamp<>(1, "A1+null", 0L));

            // Right records for all keys at t=0: only keys 0 and 1 have a left record to join.
            for (int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "a" + expectedKey, 0L);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<>(0, "A0+a0", 0L),
                new KeyValueTimestamp<>(1, "A1+a1", 0L));

            testUpperWindowBound(expectedKeys, driver, processor);
            testLowerWindowBound(expectedKeys, driver, processor);
        }
    }

    /**
     * Registers the topology shared by both tests on {@code builder} and verifies its
     * co-partitioning.
     *
     * <p>{@code topic1} is left-joined with {@code topic2} over a 100 ms window and the joined
     * stream is handed to {@code supplier}. The method then asserts that the built topology has
     * exactly one co-partition group and that it consists of the two input topics.
     *
     * @param builder  the builder to add the two source streams and the join to
     * @param supplier the processor supplier attached to the joined stream
     */
    private void buildLeftJoinTopology(StreamsBuilder builder, MockProcessorSupplier<Integer, String> supplier) {
        KStream<Integer, String> stream1 = builder.stream(topic1, consumed);
        KStream<Integer, String> stream2 = builder.stream(topic2, consumed);
        KStream<Integer, String> joined = stream1.leftJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        Collection<?> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
    }

    /**
     * Creates an {@code Integer -> String} input topic on {@code driver} whose start timestamp
     * is epoch millisecond 0 and whose auto-advance is {@link Duration#ZERO}.
     *
     * @param driver the test driver to create the topic on
     * @param topic  the topic name
     * @return the new test input topic
     */
    private TestInputTopic<Integer, String> createInputTopic(TopologyTestDriver driver, String topic) {
        return driver.createInputTopic(
            topic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
    }

    /**
     * Checks the upper edge of the join window: right-side records are placed at timestamps
     * 1000..1003 (one per key), then left-side records are piped at 1100, 1101, ... 1104. With
     * each one-millisecond step one more key is expected to stop matching and be emitted with
     * {@code null}.
     *
     * @param expectedKeys the keys to pipe, in order
     * @param driver       the running test driver
     * @param processor    the processor capturing the join output
     */
    private void testUpperWindowBound(int[] expectedKeys, TopologyTestDriver driver, MockProcessor<Integer, String> processor) {
        TestInputTopic<Integer, String> inputTopic1 = createInputTopic(driver, topic1);
        TestInputTopic<Integer, String> inputTopic2 = createInputTopic(driver, topic2);

        // Right records at 1000 + key index. No output is expected from this batch.
        long time = 1000L;
        for (int i = 0; i < expectedKeys.length; ++i) {
            inputTopic2.pipeInput(expectedKeys[i], "b" + expectedKeys[i], time + i);
        }
        processor.checkAndClearProcessResult(EMPTY);

        // Left records at 1100: every key is still within 100 ms of its right record.
        time = 1100L;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "B" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "B0+b0", 1100L),
            new KeyValueTimestamp<>(1, "B1+b1", 1100L),
            new KeyValueTimestamp<>(2, "B2+b2", 1100L),
            new KeyValueTimestamp<>(3, "B3+b3", 1100L));

        // 1101: key 0 (right record at 1000) no longer matches.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "C" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "C0+null", 1101L),
            new KeyValueTimestamp<>(1, "C1+b1", 1101L),
            new KeyValueTimestamp<>(2, "C2+b2", 1101L),
            new KeyValueTimestamp<>(3, "C3+b3", 1101L));

        // 1102: key 1 (right record at 1001) no longer matches either.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "D" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "D0+null", 1102L),
            new KeyValueTimestamp<>(1, "D1+null", 1102L),
            new KeyValueTimestamp<>(2, "D2+b2", 1102L),
            new KeyValueTimestamp<>(3, "D3+b3", 1102L));

        // 1103: only key 3 (right record at 1003) still matches.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "E" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "E0+null", 1103L),
            new KeyValueTimestamp<>(1, "E1+null", 1103L),
            new KeyValueTimestamp<>(2, "E2+null", 1103L),
            new KeyValueTimestamp<>(3, "E3+b3", 1103L));

        // 1104: no key matches any more.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "F" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "F0+null", 1104L),
            new KeyValueTimestamp<>(1, "F1+null", 1104L),
            new KeyValueTimestamp<>(2, "F2+null", 1104L),
            new KeyValueTimestamp<>(3, "F3+null", 1104L));
    }

    /**
     * Checks the lower edge of the join window. It relies on the right-side records at
     * timestamps 1000..1003 piped by {@link #testUpperWindowBound}, so it must run after that
     * method on the same driver. Left-side records are piped at 899, 900, ... 903; with each
     * one-millisecond step one more key is expected to start matching. Joined results are
     * expected to carry the right record's (later) timestamp, unmatched ones the left record's.
     *
     * @param expectedKeys the keys to pipe, in order
     * @param driver       the running test driver
     * @param processor    the processor capturing the join output
     */
    private void testLowerWindowBound(int[] expectedKeys, TopologyTestDriver driver, MockProcessor<Integer, String> processor) {
        // Every record below is piped with an explicit timestamp, so these topics are created
        // without a start timestamp or auto-advance.
        TestInputTopic<Integer, String> inputTopic1 =
            driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer());
        // Created as in the original test but never piped to; kept so the driver interactions
        // stay exactly the same.
        TestInputTopic<Integer, String> inputTopic2 =
            driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer());

        // 899: more than 100 ms before every right record, so nothing matches.
        long time = 899L;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "G" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "G0+null", 899L),
            new KeyValueTimestamp<>(1, "G1+null", 899L),
            new KeyValueTimestamp<>(2, "G2+null", 899L),
            new KeyValueTimestamp<>(3, "G3+null", 899L));

        // 900: key 0 (right record at 1000) starts matching.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "H" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "H0+b0", 1000L),
            new KeyValueTimestamp<>(1, "H1+null", 900L),
            new KeyValueTimestamp<>(2, "H2+null", 900L),
            new KeyValueTimestamp<>(3, "H3+null", 900L));

        // 901: key 1 (right record at 1001) starts matching as well.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "I" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "I0+b0", 1000L),
            new KeyValueTimestamp<>(1, "I1+b1", 1001L),
            new KeyValueTimestamp<>(2, "I2+null", 901L),
            new KeyValueTimestamp<>(3, "I3+null", 901L));

        // 902: key 2 (right record at 1002) starts matching as well.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "J" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "J0+b0", 1000L),
            new KeyValueTimestamp<>(1, "J1+b1", 1001L),
            new KeyValueTimestamp<>(2, "J2+b2", 1002L),
            new KeyValueTimestamp<>(3, "J3+null", 902L));

        // 903: every key matches.
        ++time;
        for (int expectedKey : expectedKeys) {
            inputTopic1.pipeInput(expectedKey, "K" + expectedKey, time);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "K0+b0", 1000L),
            new KeyValueTimestamp<>(1, "K1+b1", 1001L),
            new KeyValueTimestamp<>(2, "K2+b2", 1002L),
            new KeyValueTimestamp<>(3, "K3+b3", 1003L));
    }
}

