/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.AbstractJoinIntegrationTest;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for windowed {@link KStream}-{@link KStream} joins.
 *
 * <p>Covers {@code join}, {@code leftJoin} and {@code outerJoin}, each once directly on the two
 * source streams and once after passing both sides through no-op key-changing operators
 * ({@code map} on the left, {@code flatMap} + {@code selectKey} on the right), plus one chained
 * inner join that joins against the right stream twice.
 *
 * <p>Every test builds its topology on the streams created in {@link #prepareTopology()},
 * writes the join output to {@code "outputTopic"} and hands the expected output to
 * {@code runTest}, which is defined in {@link AbstractJoinIntegrationTest}.
 *
 * <p>NOTE(review): each expected-result list has 15 entries; presumably one entry per input
 * record fed by the superclass, with {@code null} meaning "no output expected for this step" --
 * confirm against {@code AbstractJoinIntegrationTest}.
 */
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest {

    /** Key carried by every expected output record in this class. */
    private static final Long JOIN_KEY = 0L;

    /** Window size handed to {@link JoinWindows#of(Duration)} for every join in this class. */
    private static final Duration JOIN_WINDOW_SIZE = Duration.ofSeconds(10L);

    private KStream<Long, String> leftStream;
    private KStream<Long, String> rightStream;

    /**
     * @param cacheEnabled forwarded unchanged to the superclass constructor
     */
    public StreamStreamJoinIntegrationTest(boolean cacheEnabled) {
        super(cacheEnabled);
    }

    /**
     * Prepares the shared environment, resets the base application id and creates a fresh
     * builder with the left and right source streams.
     *
     * @throws InterruptedException declared because {@code prepareEnvironment()} may be interrupted
     */
    @Before
    public void prepareTopology() throws InterruptedException {
        super.prepareEnvironment();
        appID = "stream-stream-join-integration-test";
        this.builder = new StreamsBuilder();
        this.leftStream = this.builder.stream("inputTopicLeft");
        this.rightStream = this.builder.stream("inputTopicRight");
    }

    /** Inner join directly on the two source streams. */
    @Test
    public void testInner() throws Exception {
        STREAMS_CONFIG.put("application.id", appID + "-inner");
        List<List<KeyValueTimestamp<Long, String>>> expectedResult = expectedInnerJoinResult();
        this.leftStream
                .join(this.rightStream, this.valueJoiner, JoinWindows.of(JOIN_WINDOW_SIZE))
                .to("outputTopic");
        this.runTest(expectedResult);
    }

    /** Inner join after both sides went through no-op key-changing operators. */
    @Test
    public void testInnerRepartitioned() throws Exception {
        STREAMS_CONFIG.put("application.id", appID + "-inner-repartitioned");
        List<List<KeyValueTimestamp<Long, String>>> expectedResult = expectedInnerJoinResult();
        this.leftStream.map(MockMapper.noOpKeyValueMapper())
                .join(
                        this.rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
                                .selectKey(MockMapper.selectKeyKeyValueMapper()),
                        this.valueJoiner,
                        JoinWindows.of(JOIN_WINDOW_SIZE))
                .to("outputTopic");
        this.runTest(expectedResult);
    }

    /** Left join directly on the two source streams. */
    @Test
    public void testLeft() throws Exception {
        STREAMS_CONFIG.put("application.id", appID + "-left");
        List<List<KeyValueTimestamp<Long, String>>> expectedResult = expectedLeftOrOuterJoinResult();
        this.leftStream
                .leftJoin(this.rightStream, this.valueJoiner, JoinWindows.of(JOIN_WINDOW_SIZE))
                .to("outputTopic");
        this.runTest(expectedResult);
    }

    /** Left join after both sides went through no-op key-changing operators. */
    @Test
    public void testLeftRepartitioned() throws Exception {
        STREAMS_CONFIG.put("application.id", appID + "-left-repartitioned");
        List<List<KeyValueTimestamp<Long, String>>> expectedResult = expectedLeftOrOuterJoinResult();
        this.leftStream.map(MockMapper.noOpKeyValueMapper())
                .leftJoin(
                        this.rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
                                .selectKey(MockMapper.selectKeyKeyValueMapper()),
                        this.valueJoiner,
                        JoinWindows.of(JOIN_WINDOW_SIZE))
                .to("outputTopic");
        this.runTest(expectedResult);
    }

    /** Outer join directly on the two source streams. */
    @Test
    public void testOuter() throws Exception {
        STREAMS_CONFIG.put("application.id", appID + "-outer");
        List<List<KeyValueTimestamp<Long, String>>> expectedResult = expectedLeftOrOuterJoinResult();
        this.leftStream
                .outerJoin(this.rightStream, this.valueJoiner, JoinWindows.of(JOIN_WINDOW_SIZE))
                .to("outputTopic");
        this.runTest(expectedResult);
    }

    /** Outer join after both sides went through no-op key-changing operators. */
    @Test
    public void testOuterRepartitioned() throws Exception {
        // Was "-outer", which collided with testOuter's application id; every other test in
        // this class uses its own suffix, so this one now follows the "-<join>-repartitioned"
        // pattern of its siblings.
        STREAMS_CONFIG.put("application.id", appID + "-outer-repartitioned");
        List<List<KeyValueTimestamp<Long, String>>> expectedResult = expectedLeftOrOuterJoinResult();
        this.leftStream.map(MockMapper.noOpKeyValueMapper())
                .outerJoin(
                        this.rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
                                .selectKey(MockMapper.selectKeyKeyValueMapper()),
                        this.valueJoiner,
                        JoinWindows.of(JOIN_WINDOW_SIZE))
                .to("outputTopic");
        this.runTest(expectedResult);
    }

    /**
     * Chained inner join: the result of left-join-right is inner-joined with the right stream
     * a second time, so every expected value has three components.
     */
    @Test
    public void testMultiInner() throws Exception {
        STREAMS_CONFIG.put("application.id", appID + "-multi-inner");
        List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
                null,
                null,
                null,
                Collections.singletonList(joinedRecord("A-a-a", 4L)),
                Collections.singletonList(joinedRecord("B-a-a", 5L)),
                Arrays.asList(
                        joinedRecord("A-b-a", 6L),
                        joinedRecord("B-b-a", 6L),
                        joinedRecord("A-a-b", 6L),
                        joinedRecord("B-a-b", 6L),
                        joinedRecord("A-b-b", 6L),
                        joinedRecord("B-b-b", 6L)),
                null,
                null,
                Arrays.asList(
                        joinedRecord("C-a-a", 9L),
                        joinedRecord("C-a-b", 9L),
                        joinedRecord("C-b-a", 9L),
                        joinedRecord("C-b-b", 9L)),
                Arrays.asList(
                        joinedRecord("A-c-a", 10L),
                        joinedRecord("A-c-b", 10L),
                        joinedRecord("B-c-a", 10L),
                        joinedRecord("B-c-b", 10L),
                        joinedRecord("C-c-a", 10L),
                        joinedRecord("C-c-b", 10L),
                        joinedRecord("A-a-c", 10L),
                        joinedRecord("B-a-c", 10L),
                        joinedRecord("A-b-c", 10L),
                        joinedRecord("B-b-c", 10L),
                        joinedRecord("C-a-c", 10L),
                        joinedRecord("C-b-c", 10L),
                        joinedRecord("A-c-c", 10L),
                        joinedRecord("B-c-c", 10L),
                        joinedRecord("C-c-c", 10L)),
                null,
                null,
                null,
                Arrays.asList(
                        joinedRecord("A-d-a", 14L),
                        joinedRecord("A-d-b", 14L),
                        joinedRecord("A-d-c", 14L),
                        joinedRecord("B-d-a", 14L),
                        joinedRecord("B-d-b", 14L),
                        joinedRecord("B-d-c", 14L),
                        joinedRecord("C-d-a", 14L),
                        joinedRecord("C-d-b", 14L),
                        joinedRecord("C-d-c", 14L),
                        joinedRecord("A-a-d", 14L),
                        joinedRecord("B-a-d", 14L),
                        joinedRecord("A-b-d", 14L),
                        joinedRecord("B-b-d", 14L),
                        joinedRecord("C-a-d", 14L),
                        joinedRecord("C-b-d", 14L),
                        joinedRecord("A-c-d", 14L),
                        joinedRecord("B-c-d", 14L),
                        joinedRecord("C-c-d", 14L),
                        joinedRecord("A-d-d", 14L),
                        joinedRecord("B-d-d", 14L),
                        joinedRecord("C-d-d", 14L)),
                Arrays.asList(
                        joinedRecord("D-a-a", 15L),
                        joinedRecord("D-a-b", 15L),
                        joinedRecord("D-a-c", 15L),
                        joinedRecord("D-a-d", 15L),
                        joinedRecord("D-b-a", 15L),
                        joinedRecord("D-b-b", 15L),
                        joinedRecord("D-b-c", 15L),
                        joinedRecord("D-b-d", 15L),
                        joinedRecord("D-c-a", 15L),
                        joinedRecord("D-c-b", 15L),
                        joinedRecord("D-c-c", 15L),
                        joinedRecord("D-c-d", 15L),
                        joinedRecord("D-d-a", 15L),
                        joinedRecord("D-d-b", 15L),
                        joinedRecord("D-d-c", 15L),
                        joinedRecord("D-d-d", 15L)));
        this.leftStream
                .join(this.rightStream, this.valueJoiner, JoinWindows.of(JOIN_WINDOW_SIZE))
                .join(this.rightStream, this.valueJoiner, JoinWindows.of(JOIN_WINDOW_SIZE))
                .to("outputTopic");
        this.runTest(expectedResult);
    }

    /**
     * Builds one expected output record keyed by {@link #JOIN_KEY}.
     *
     * @param value expected joined value
     * @param timestamp expected record timestamp
     * @return a new expected record
     */
    private static KeyValueTimestamp<Long, String> joinedRecord(String value, long timestamp) {
        return new KeyValueTimestamp<Long, String>(JOIN_KEY, value, timestamp);
    }

    /**
     * Expected output shared by the plain and the repartitioned inner-join tests.
     *
     * @return a freshly built list; {@code null} entries are steps without expected output
     */
    private static List<List<KeyValueTimestamp<Long, String>>> expectedInnerJoinResult() {
        return Arrays.asList(
                null,
                null,
                null,
                Collections.singletonList(joinedRecord("A-a", 4L)),
                Collections.singletonList(joinedRecord("B-a", 5L)),
                Arrays.asList(joinedRecord("A-b", 6L), joinedRecord("B-b", 6L)),
                null,
                null,
                Arrays.asList(joinedRecord("C-a", 9L), joinedRecord("C-b", 9L)),
                Arrays.asList(
                        joinedRecord("A-c", 10L),
                        joinedRecord("B-c", 10L),
                        joinedRecord("C-c", 10L)),
                null,
                null,
                null,
                Arrays.asList(
                        joinedRecord("A-d", 14L),
                        joinedRecord("B-d", 14L),
                        joinedRecord("C-d", 14L)),
                Arrays.asList(
                        joinedRecord("D-a", 15L),
                        joinedRecord("D-b", 15L),
                        joinedRecord("D-c", 15L),
                        joinedRecord("D-d", 15L)));
    }

    /**
     * Expected output shared by the left-join and outer-join tests (plain and repartitioned).
     * Differs from the inner-join expectation only at the third step, where an unmatched
     * {@code "A-null"} record is expected.
     *
     * @return a freshly built list; {@code null} entries are steps without expected output
     */
    private static List<List<KeyValueTimestamp<Long, String>>> expectedLeftOrOuterJoinResult() {
        return Arrays.asList(
                null,
                null,
                Collections.singletonList(joinedRecord("A-null", 3L)),
                Collections.singletonList(joinedRecord("A-a", 4L)),
                Collections.singletonList(joinedRecord("B-a", 5L)),
                Arrays.asList(joinedRecord("A-b", 6L), joinedRecord("B-b", 6L)),
                null,
                null,
                Arrays.asList(joinedRecord("C-a", 9L), joinedRecord("C-b", 9L)),
                Arrays.asList(
                        joinedRecord("A-c", 10L),
                        joinedRecord("B-c", 10L),
                        joinedRecord("C-c", 10L)),
                null,
                null,
                null,
                Arrays.asList(
                        joinedRecord("A-d", 14L),
                        joinedRecord("B-d", 14L),
                        joinedRecord("C-d", 14L)),
                Arrays.asList(
                        joinedRecord("D-a", 15L),
                        joinedRecord("D-b", 15L),
                        joinedRecord("D-c", 15L),
                        joinedRecord("D-d", 15L)));
    }
}

