package org.apache.storm.streams.processors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.PairValueJoiner;
import org.apache.storm.streams.processors.JoinProcessor;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/storm/streams/processors/JoinProcessorTest.class */
public class JoinProcessorTest {
    JoinProcessor<Integer, Pair<Integer, Integer>, Integer, Integer> joinProcessor;
    String leftStream = "left";
    String rightStream = "right";
    List<Pair<Integer, List<Pair<Integer, Integer>>>> res = new ArrayList();
    ProcessorContext context = new ProcessorContext() { // from class: org.apache.storm.streams.processors.JoinProcessorTest.1
        public <T> void forward(T t) {
            JoinProcessorTest.this.res.add((Pair) t);
        }

        public <T> void forward(T t, String str) {
        }

        public boolean isWindowed() {
            return true;
        }

        public Set<String> getWindowedParentStreams() {
            return null;
        }
    };
    List<Pair<Integer, Integer>> leftKeyValeus = Arrays.asList(Pair.of(2, 4), Pair.of(5, 25), Pair.of(7, 49));
    List<Pair<Integer, Integer>> rightKeyValues = Arrays.asList(Pair.of(1, 1), Pair.of(2, 8), Pair.of(5, 125), Pair.of(6, 216));

    @Test
    public void testInnerJoin() throws Exception {
        this.joinProcessor = new JoinProcessor<>(this.leftStream, this.rightStream, new PairValueJoiner());
        processValues();
        Assert.assertEquals(Pair.of(2, Pair.of(4, 8)), this.res.get(0));
        Assert.assertEquals(Pair.of(5, Pair.of(25, 125)), this.res.get(1));
    }

    @Test
    public void testLeftOuterJoin() throws Exception {
        this.joinProcessor = new JoinProcessor<>(this.leftStream, this.rightStream, new PairValueJoiner(), JoinProcessor.JoinType.OUTER, JoinProcessor.JoinType.INNER);
        processValues();
        Assert.assertEquals(Pair.of(2, Pair.of(4, 8)), this.res.get(0));
        Assert.assertEquals(Pair.of(5, Pair.of(25, 125)), this.res.get(1));
        Assert.assertEquals(Pair.of(7, Pair.of(49, (Object) null)), this.res.get(2));
    }

    @Test
    public void testRightOuterJoin() throws Exception {
        this.joinProcessor = new JoinProcessor<>(this.leftStream, this.rightStream, new PairValueJoiner(), JoinProcessor.JoinType.INNER, JoinProcessor.JoinType.OUTER);
        processValues();
        Assert.assertEquals(Pair.of(1, Pair.of((Object) null, 1)), this.res.get(0));
        Assert.assertEquals(Pair.of(2, Pair.of(4, 8)), this.res.get(1));
        Assert.assertEquals(Pair.of(5, Pair.of(25, 125)), this.res.get(2));
        Assert.assertEquals(Pair.of(6, Pair.of((Object) null, 216)), this.res.get(3));
    }

    @Test
    public void testFullOuterJoin() throws Exception {
        this.joinProcessor = new JoinProcessor<>(this.leftStream, this.rightStream, new PairValueJoiner(), JoinProcessor.JoinType.OUTER, JoinProcessor.JoinType.OUTER);
        processValues();
        Assert.assertEquals(Pair.of(1, Pair.of((Object) null, 1)), this.res.get(0));
        Assert.assertEquals(Pair.of(2, Pair.of(4, 8)), this.res.get(1));
        Assert.assertEquals(Pair.of(5, Pair.of(25, 125)), this.res.get(2));
        Assert.assertEquals(Pair.of(6, Pair.of((Object) null, 216)), this.res.get(3));
        Assert.assertEquals(Pair.of(7, Pair.of(49, (Object) null)), this.res.get(4));
    }

    private void processValues() {
        this.res.clear();
        this.joinProcessor.init(this.context);
        Iterator<Pair<Integer, Integer>> it = this.leftKeyValeus.iterator();
        while (it.hasNext()) {
            this.joinProcessor.execute(it.next(), this.leftStream);
        }
        Iterator<Pair<Integer, Integer>> it2 = this.rightKeyValues.iterator();
        while (it2.hasNext()) {
            this.joinProcessor.execute(it2.next(), this.rightStream);
        }
        this.joinProcessor.finish();
    }
}
