package org.apache.storm.streams.processors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.storm.streams.Pair;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/storm/streams/processors/CoGroupByKeyProcessorTest.class */
public class CoGroupByKeyProcessorTest {
    private CoGroupByKeyProcessor<Integer, Integer, Integer> coGroupByKeyProcessor;
    private String firstStream = "first";
    private String secondStream = "second";
    private List<Pair<Integer, Pair<List<Integer>, List<Integer>>>> res = new ArrayList();
    private ProcessorContext context = new ProcessorContext() { // from class: org.apache.storm.streams.processors.CoGroupByKeyProcessorTest.1
        public <T> void forward(T t) {
            CoGroupByKeyProcessorTest.this.res.add((Pair) t);
        }

        public <T> void forward(T t, String str) {
        }

        public boolean isWindowed() {
            return true;
        }

        public Set<String> getWindowedParentStreams() {
            return null;
        }
    };
    private List<Pair<Integer, Integer>> firstKeyValues = Arrays.asList(Pair.of(2, 4), Pair.of(5, 25), Pair.of(7, 49), Pair.of(7, 87));
    private List<Pair<Integer, Integer>> secondKeyValues = Arrays.asList(Pair.of(1, 1), Pair.of(2, 8), Pair.of(5, 125), Pair.of(5, 50), Pair.of(6, 216));

    @Test
    public void testCoGroupByKey() throws Exception {
        this.coGroupByKeyProcessor = new CoGroupByKeyProcessor<>(this.firstStream, this.secondStream);
        processValues();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(25);
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(125);
        arrayList3.add(50);
        arrayList.add(Pair.of(5, Pair.of(arrayList2, arrayList3)));
        Assert.assertEquals(arrayList.get(0), this.res.get(1));
        arrayList2.clear();
        arrayList3.clear();
        arrayList2.add(49);
        arrayList2.add(87);
        arrayList.clear();
        arrayList.add(Pair.of(7, Pair.of(arrayList2, arrayList3)));
        Assert.assertEquals(arrayList.get(0), this.res.get(2));
    }

    private void processValues() {
        this.res.clear();
        this.coGroupByKeyProcessor.init(this.context);
        Iterator<Pair<Integer, Integer>> it = this.firstKeyValues.iterator();
        while (it.hasNext()) {
            this.coGroupByKeyProcessor.execute(it.next(), this.firstStream);
        }
        Iterator<Pair<Integer, Integer>> it2 = this.secondKeyValues.iterator();
        while (it2.hasNext()) {
            this.coGroupByKeyProcessor.execute(it2.next(), this.secondStream);
        }
        this.coGroupByKeyProcessor.finish();
    }
}
