/*
 * Decompiled with CFR 0.152.
 */
package cascading;

import cascading.PlatformTestCase;
import cascading.TestIdentityBuffer;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowProps;
import cascading.operation.Aggregator;
import cascading.operation.Buffer;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.Insert;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.joiner.InnerJoin;
import cascading.pipe.joiner.Joiner;
import cascading.pipe.joiner.LeftJoin;
import cascading.pipe.joiner.MixedJoin;
import cascading.pipe.joiner.OuterJoin;
import cascading.pipe.joiner.RightJoin;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterator;
import cascading.util.NullNotEquivalentComparator;
import data.InputData;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Test;

public class CoGroupFieldedPipesPlatformTest
extends PlatformTestCase {
    public CoGroupFieldedPipesPlatformTest() {
        super(true, 4, 1);
    }

    @Test
    public void testCross() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lhs", this.getPlatform().getTextFile(InputData.inputFileLhs));
        sources.put("rhs", this.getPlatform().getTextFile(InputData.inputFileRhs));
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cross"), SinkMode.REPLACE);
        Each pipeLower = new Each("lhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numLHS", "charLHS"}), " "));
        Each pipeUpper = new Each("rhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numRHS", "charRHS"}), " "));
        CoGroup cross = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"numLHS"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numRHS"}), (Joiner)new InnerJoin());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cross);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37, null);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tB"})));
    }

    @Test
    public void testCoGroup() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cogroup"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), (Joiner)new InnerJoin(Fields.size((int)4)));
        Map<Object, Object> properties = this.getProperties();
        FlowProps.setDefaultTupleElementComparator(properties, (String)this.getPlatform().getStringComparator(false).getClass().getCanonicalName());
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupSamePipeName() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("renamedpipes"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipeLower = new Pipe("lower");
        Pipe pipeUpper = new Pipe("upper");
        pipeLower = new Pipe("same", pipeLower);
        pipeUpper = new Pipe("same", pipeUpper);
        pipeLower = new Each(pipeLower, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeUpper = new Each(pipeUpper, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeLower = new Pipe("left", pipeLower);
        pipeUpper = new Pipe("right", pipeUpper);
        CoGroup splice = new CoGroup(pipeLower, new Fields(new Comparable[]{"num"}), pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        splice = new Pipe("splice", (Pipe)splice);
        splice = new Pipe("tail", (Pipe)splice);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupWithUnknowns() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("unknown"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(Fields.UNKNOWN, " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{Integer.valueOf(0)}), (Pipe)pipeUpper, new Fields(new Comparable[]{Integer.valueOf(0)}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupFilteredBranch() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cogroupfilteredbranch"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeUpper = new Each((Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), (Filter)new RegexFilter("^fobar"));
        pipeUpper = new GroupBy((Pipe)pipeUpper, new Fields(new Comparable[]{"num"}));
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4), (Joiner)new OuterJoin());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\tnull\tnull"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\tnull\tnull"})));
    }

    @Test
    public void testCoGroupSelf() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cogroupself"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testSplitCoGroupSelf() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lowerLhs", source);
        sources.put("upperLhs", source);
        sources.put("lowerRhs", source);
        sources.put("upperRhs", source);
        Tap sinkLhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("splitcogroupself/lhs"), SinkMode.REPLACE);
        Tap sinkRhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("splitcogroupself/rhs"), SinkMode.REPLACE);
        HashMap<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put("lhs", sinkLhs);
        sinks.put("rhs", sinkRhs);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLowerLhs = new Each(new Pipe("lowerLhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpperLhs = new Each(new Pipe("upperLhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup spliceLhs = new CoGroup("lhs", (Pipe)pipeLowerLhs, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpperLhs, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Each pipeLowerRhs = new Each(new Pipe("lowerRhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpperRhs = new Each(new Pipe("upperRhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup spliceRhs = new CoGroup("rhs", (Pipe)pipeLowerRhs, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpperRhs, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{spliceLhs, spliceRhs});
        flow.complete();
        List values = CoGroupFieldedPipesPlatformTest.asList((Flow)flow, (Tap)sinkLhs);
        CoGroupFieldedPipesPlatformTest.assertEquals((int)5, (int)values.size());
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
        values = CoGroupFieldedPipesPlatformTest.asList((Flow)flow, (Tap)sinkRhs);
        CoGroupFieldedPipesPlatformTest.assertEquals((int)5, (int)values.size());
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupAfterEvery() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}).applyTypes(new Type[]{Long.TYPE, String.class}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}).applyTypes(new Type[]{Long.TYPE, String.class}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("afterevery"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}).applyTypes(new Type[]{String.class, String.class}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeLower = new GroupBy((Pipe)pipeLower, new Fields(new Comparable[]{"num"}));
        pipeLower = new Every((Pipe)pipeLower, new Fields(new Comparable[]{"char"}), (Aggregator)new First(), Fields.ALL);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeUpper = new GroupBy((Pipe)pipeUpper, new Fields(new Comparable[]{"num"}));
        pipeUpper = new Every((Pipe)pipeUpper, new Fields(new Comparable[]{"char"}), (Aggregator)new First(), Fields.ALL);
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Map<Object, Object> properties = this.getPlatform().getProperties();
        properties.put("cascading.serialization.types.required", "true");
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupAfterEveryNoDeclared() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("aftereverynodeclared"), SinkMode.REPLACE);
        RegexSplitter splitter1 = new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter1);
        pipeLower = new Each((Pipe)pipeLower, (Function)new Insert(new Fields(new Comparable[]{"one", "two", "three", "four"}), new Object[]{"one", "two", "three", "four"}), Fields.ALL);
        pipeLower = new GroupBy((Pipe)pipeLower, new Fields(new Comparable[]{"num1"}));
        pipeLower = new Every((Pipe)pipeLower, new Fields(new Comparable[]{"char1"}), (Aggregator)new First(), Fields.ALL);
        RegexSplitter splitter2 = new RegexSplitter(new Fields(new Comparable[]{"num2", "char2"}), " ");
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter2);
        pipeUpper = new GroupBy((Pipe)pipeUpper, new Fields(new Comparable[]{"num2"}));
        pipeUpper = new Every((Pipe)pipeUpper, new Fields(new Comparable[]{"char2"}), (Aggregator)new First(), Fields.ALL);
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num1"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupInnerSingleField() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLowerOffset);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cogroupinnersingle"), SinkMode.REPLACE);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"num1", "char"}), " "), new Fields(new Comparable[]{"num1"}));
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"num2", "char"}), " "), new Fields(new Comparable[]{"num2"}));
        CoGroup join = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num1"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num2"}));
        join = new Every((Pipe)join, (Aggregator)new Count());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)join);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)2, null);
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1\t1\t1"}));
        results.add(new Tuple(new Object[]{"5\t5\t2"}));
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        results.removeAll(actual);
        CoGroupFieldedPipesPlatformTest.assertEquals((int)0, (int)results.size());
    }

    @Test
    public void testCoGroupInner() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("cogroupinner", (Joiner)new InnerJoin(), results, 8, false, null);
        this.handleJoins("cogroupinner-resultgroup", (Joiner)new InnerJoin(), results, 8, true, null);
    }

    @Test
    public void testCoGroupInnerNull() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        this.handleJoins("cogroupinnernull", (Joiner)new InnerJoin(), results, 9, false, new NullNotEquivalentComparator());
        this.handleJoins("cogroupinnernull-resultgroup", (Joiner)new InnerJoin(), results, 9, true, new NullNotEquivalentComparator());
    }

    @Test
    public void testCoGroupOuter() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"3", "c1", null, null}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{"5", "e1", null, null}));
        results.add(new Tuple(new Object[]{"5", "e2", null, null}));
        results.add(new Tuple(new Object[]{"5", "e3", null, null}));
        results.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        results.add(new Tuple(new Object[]{"7", "g1", null, null}));
        results.add(new Tuple(new Object[]{"7", "g2", null, null}));
        results.add(new Tuple(new Object[]{"7", "g3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g4", null, null}));
        results.add(new Tuple(new Object[]{"7", "g5", null, null}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("cogroupouter", (Joiner)new OuterJoin(), results, 8, false, null);
        this.handleJoins("cogroupouter-resultgroup", (Joiner)new OuterJoin(), results, 8, true, null);
    }

    @Test
    public void testCoGroupOuterNull() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"3", "c1", null, null}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{"5", "e1", null, null}));
        results.add(new Tuple(new Object[]{"5", "e2", null, null}));
        results.add(new Tuple(new Object[]{"5", "e3", null, null}));
        results.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        results.add(new Tuple(new Object[]{"7", "g1", null, null}));
        results.add(new Tuple(new Object[]{"7", "g2", null, null}));
        results.add(new Tuple(new Object[]{"7", "g3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g4", null, null}));
        results.add(new Tuple(new Object[]{"7", "g5", null, null}));
        results.add(new Tuple(new Object[]{null, "h1", null, null}));
        results.add(new Tuple(new Object[]{null, null, null, "H1"}));
        this.handleJoins("cogroupouternull", (Joiner)new OuterJoin(), results, 9, false, new NullNotEquivalentComparator());
        this.handleJoins("cogroupouternull-resultgroup", (Joiner)new OuterJoin(), results, 9, true, new NullNotEquivalentComparator());
    }

    @Test
    public void testCoGroupInnerOuter() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"3", "c1", null, null}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{"5", "e1", null, null}));
        results.add(new Tuple(new Object[]{"5", "e2", null, null}));
        results.add(new Tuple(new Object[]{"5", "e3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g1", null, null}));
        results.add(new Tuple(new Object[]{"7", "g2", null, null}));
        results.add(new Tuple(new Object[]{"7", "g3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g4", null, null}));
        results.add(new Tuple(new Object[]{"7", "g5", null, null}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("cogroupinnerouter", (Joiner)new LeftJoin(), results, 8, false, null);
        this.handleJoins("cogroupinnerouter-resultgroup", (Joiner)new LeftJoin(), results, 8, true, null);
    }

    @Test
    public void testCoGroupInnerOuterNull() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"3", "c1", null, null}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{"5", "e1", null, null}));
        results.add(new Tuple(new Object[]{"5", "e2", null, null}));
        results.add(new Tuple(new Object[]{"5", "e3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g1", null, null}));
        results.add(new Tuple(new Object[]{"7", "g2", null, null}));
        results.add(new Tuple(new Object[]{"7", "g3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g4", null, null}));
        results.add(new Tuple(new Object[]{"7", "g5", null, null}));
        results.add(new Tuple(new Object[]{null, "h1", null, null}));
        this.handleJoins("cogroupinnerouternull", (Joiner)new LeftJoin(), results, 9, false, new NullNotEquivalentComparator());
        this.handleJoins("cogroupinnerouternull-resultgroup", (Joiner)new LeftJoin(), results, 9, true, new NullNotEquivalentComparator());
    }

    @Test
    public void testCoGroupOuterInner() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("cogroupouterinner", (Joiner)new RightJoin(), results, 8, false, null);
        this.handleJoins("cogroupouterinner-resultgroup", (Joiner)new RightJoin(), results, 8, true, null);
    }

    @Test
    public void testCoGroupOuterInnerNull() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        results.add(new Tuple(new Object[]{null, null, null, "H1"}));
        this.handleJoins("cogroupouterinnernull", (Joiner)new RightJoin(), results, 9, false, new NullNotEquivalentComparator());
        this.handleJoins("cogroupouterinnernull-resultgroup", (Joiner)new RightJoin(), results, 9, true, new NullNotEquivalentComparator());
    }

    private void handleJoins(String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator) throws Exception {
        results = new HashSet<Tuple>(results);
        this.getPlatform().copyFromLocal(InputData.inputFileLhsSparse);
        this.getPlatform().copyFromLocal(InputData.inputFileRhsSparse);
        Fields fields = new Fields(new Comparable[]{"num", "char"}).applyTypes(new Type[]{Integer.class, String.class});
        Tap sourceLower = this.getPlatform().getDelimitedFile(fields, " ", InputData.inputFileLhsSparse);
        Tap sourceUpper = this.getPlatform().getDelimitedFile(fields, " ", InputData.inputFileRhsSparse);
        HashMap sources = new HashMap();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getDelimitedFile(Fields.size((int)4, String.class), "\t", this.getOutputPath(path), SinkMode.REPLACE);
        Pipe pipeLower = new Pipe("lower");
        Pipe pipeUpper = new Pipe("upper");
        Fields declaredFields = new Fields(new Comparable[]{"num", "char", "num2", "char2"});
        Fields groupFields = new Fields(new Comparable[]{"num"});
        if (comparator != null) {
            groupFields.setComparator((Comparable)Integer.valueOf(0), (Comparator)comparator);
        }
        CoGroup splice = useResultGroupFields ? new CoGroup(pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields(new Comparable[]{"num", "num2"}), joiner) : new CoGroup(pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner);
        splice = new Every((Pipe)splice, Fields.ALL, (Buffer)new TestIdentityBuffer(new Fields(new Comparable[]{"num", "num2"}), numGroups, true), Fields.RESULTS);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)results.size());
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        results.removeAll(actual);
        CoGroupFieldedPipesPlatformTest.assertEquals((int)0, (int)results.size());
    }

    @Test
    public void testCoGroupMixed() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLowerOffset = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLowerOffset);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("loweroffset", sourceLowerOffset);
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getDelimitedFile(Fields.size((int)6, String.class), "\t", this.getOutputPath("cogroupmixed"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLowerOffset = new Each(new Pipe("loweroffset"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Pipe[] pipes = Pipe.pipes((Pipe[])new Pipe[]{pipeLowerOffset, pipeUpper, pipeLower});
        Fields[] fields = Fields.fields((Fields[])new Fields[]{new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"})});
        MixedJoin join = new MixedJoin(new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER});
        CoGroup splice = new CoGroup(pipes, fields, Fields.size((int)6), (Joiner)join);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)6);
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a", "1", "A", "1", "a"}));
        results.add(new Tuple(new Object[]{null, null, "2", "B", "2", "b"}));
        results.add(new Tuple(new Object[]{null, null, "3", "C", "3", "c"}));
        results.add(new Tuple(new Object[]{null, null, "4", "D", "4", "d"}));
        results.add(new Tuple(new Object[]{"5", "b", "5", "E", "5", "e"}));
        results.add(new Tuple(new Object[]{"5", "e", "5", "E", "5", "e"}));
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        results.removeAll(actual);
        CoGroupFieldedPipesPlatformTest.assertEquals((int)0, (int)results.size());
    }

    @Test
    public void testCoGroupDiffFields() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("difffields"), SinkMode.REPLACE);
        RegexSplitter splitterLower = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}), " ");
        RegexSplitter splitterUpper = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitterLower);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitterUpper);
        CoGroup cogroup = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"numA"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numB"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cogroup);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupGroupBy() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cogroupgroupby"), SinkMode.REPLACE);
        RegexSplitter splitterLower = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}).applyTypes(new Type[]{String.class, String.class}), " ");
        RegexSplitter splitterUpper = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}).applyTypes(new Type[]{String.class, String.class}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitterLower);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitterUpper);
        CoGroup cogroup = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"numA"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numB"}));
        GroupBy groupby = new GroupBy((Pipe)cogroup, new Fields(new Comparable[]{"numA"}));
        Map<Object, Object> properties = this.getPlatform().getProperties();
        properties.put("cascading.serialization.types.required", "true");
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)groupby);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupSamePipe() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", source);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samepipe"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup cogroup = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), 1, new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cogroup);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupSamePipe2() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", source);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samepipe2"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup cogroup = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeLower, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cogroup);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupSamePipe3() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        HashMap sources = new HashMap();
        sources.put("lower", source);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samepipe3"), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lower");
        Pipe lhs = new Pipe("lhs", pipe);
        Pipe rhs = new Pipe("rhs", pipe);
        CoGroup cogroup = new CoGroup(lhs, new Fields(new Comparable[]{"num"}), rhs, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cogroup);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupAroundCoGroup() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cogroupacogroup"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper1 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice1 = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num1"}), (Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice2);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB\t2\tB"})));
    }

    @Test
    public void testCoGroupAroundCoGroupWithout() throws Exception {
        this.runCoGroupAroundCoGroup(null, "cogroupacogroupopt1");
    }

    @Test
    public void testCoGroupAroundCoGroupWith() throws Exception {
        this.runCoGroupAroundCoGroup(this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num"}), "\t", InputData.inputFileNums10).getScheme().getClass(), "cogroupacogroupopt2");
    }

    private void runCoGroupAroundCoGroup(Class schemeClass, String stringPath) throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileNums20);
        this.getPlatform().copyFromLocal(InputData.inputFileNums10);
        Tap source10 = this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num"}), "\t", InputData.inputFileNums10);
        Tap source20 = this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num"}), "\t", InputData.inputFileNums20);
        HashMap sources = new HashMap();
        sources.put("source20", source20);
        sources.put("source101", source10);
        sources.put("source102", source10);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath(stringPath), SinkMode.REPLACE);
        Pipe pipeNum20 = new Pipe("source20");
        Pipe pipeNum101 = new Pipe("source101");
        Pipe pipeNum102 = new Pipe("source102");
        CoGroup splice1 = new CoGroup(pipeNum20, new Fields(new Comparable[]{"num"}), pipeNum101, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2"}));
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num1"}), pipeNum102, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2", "num3"}));
        splice2 = new Each((Pipe)splice2, (Function)new Identity());
        Map<Object, Object> properties = this.getPlatform().getProperties();
        if (this.getPlatform().isMapReduce()) {
            FlowConnectorProps.setIntermediateSchemeClass(properties, (Class)schemeClass);
        }
        Flow flow = this.getPlatform().getFlowConnector(properties).connect("cogroupopt", sources, sink, (Pipe)splice2);
        if (this.getPlatform().isMapReduce()) {
            CoGroupFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)2, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)10);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\t1\t1"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"10\t10\t10"})));
    }

    @Test
    public void testCoGroupDiffFieldsSameFile() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceOffsetLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("offsetLower", sourceOffsetLower);
        sources.put("lower", sourceLower);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samefiledifffields"), SinkMode.REPLACE);
        RegexSplitter splitterLower = new RegexSplitter(new Fields(new Comparable[]{"numA", "left"}), " ");
        RegexSplitter splitterUpper = new RegexSplitter(new Fields(new Comparable[]{"numB", "right"}), " ");
        Pipe offsetLower = new Pipe("offsetLower");
        offsetLower = new Discard(offsetLower, new Fields(new Comparable[]{"offset"}));
        offsetLower = new Each(offsetLower, new Fields(new Comparable[]{"line"}), (Function)splitterLower);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitterUpper);
        CoGroup cogroup = new CoGroup(offsetLower, new Fields(new Comparable[]{"numA"}), (Pipe)pipeLower, new Fields(new Comparable[]{"numB"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cogroup);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List actual = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testJoinNone() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinnone"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice = new CoGroup((Pipe)pipeLower, Fields.NONE, (Pipe)pipeUpper, Fields.NONE, Fields.size((int)4));
        Map<Object, Object> properties = this.getProperties();
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)25);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t2\tB"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testMultiJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileCrossX2);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileCrossX2);
        Tap innerSink = this.getPlatform().getTextFile(this.getOutputPath("inner"), SinkMode.REPLACE);
        Tap outerSink = this.getPlatform().getTextFile(this.getOutputPath("outer"), SinkMode.REPLACE);
        Tap leftSink = this.getPlatform().getTextFile(this.getOutputPath("left"), SinkMode.REPLACE);
        Tap rightSink = this.getPlatform().getTextFile(this.getOutputPath("right"), SinkMode.REPLACE);
        Pipe uniques = new Pipe("unique");
        uniques = new Each(uniques, new Fields(new Comparable[]{"line"}), (Function)new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s"));
        uniques = new GroupBy(uniques, new Fields(new Comparable[]{"word"}));
        uniques = new Every(uniques, new Fields(new Comparable[]{"word"}), (Aggregator)new First(Fields.ARGS), Fields.REPLACE);
        Pipe fielded = new Pipe("fielded");
        fielded = new Each(fielded, new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter("\\s"));
        CoGroup inner = new CoGroup("inner", fielded, new Fields(new Comparable[]{Integer.valueOf(0)}), uniques, new Fields(new Comparable[]{"word"}), (Joiner)new InnerJoin());
        CoGroup outer = new CoGroup("outer", fielded, new Fields(new Comparable[]{Integer.valueOf(0)}), uniques, new Fields(new Comparable[]{"word"}), (Joiner)new OuterJoin());
        CoGroup left = new CoGroup("left", fielded, new Fields(new Comparable[]{Integer.valueOf(0)}), uniques, new Fields(new Comparable[]{"word"}), (Joiner)new LeftJoin());
        CoGroup right = new CoGroup("right", fielded, new Fields(new Comparable[]{Integer.valueOf(0)}), uniques, new Fields(new Comparable[]{"word"}), (Joiner)new RightJoin());
        Pipe[] heads = Pipe.pipes((Pipe[])new Pipe[]{uniques, fielded});
        Map sources = Cascades.tapsMap((Pipe[])heads, (Tap[])Tap.taps((Tap[])new Tap[]{source, source}));
        Pipe[] tails = Pipe.pipes((Pipe[])new Pipe[]{inner, outer, left, right});
        Map sinks = Cascades.tapsMap((Pipe[])tails, (Tap[])Tap.taps((Tap[])new Tap[]{innerSink, outerSink, leftSink, rightSink}));
        Flow flow = this.getPlatform().getFlowConnector().connect("multi-joins", sources, sinks, tails);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(innerSink), (int)74);
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(outerSink), (int)84);
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(leftSink), (int)74);
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(rightSink), (int)84);
    }

    @Test
    public void testMultiJoinWithSplits() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileCrossX2);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileCrossX2);
        Tap innerSinkLhs = this.getPlatform().getTextFile(this.getOutputPath("innerLhs"), SinkMode.REPLACE);
        Tap uniqueSinkLhs = this.getPlatform().getTextFile(this.getOutputPath("uniquesLhs"), SinkMode.REPLACE);
        Tap innerSinkRhs = this.getPlatform().getTextFile(this.getOutputPath("innerRhs"), SinkMode.REPLACE);
        Tap uniqueSinkRhs = this.getPlatform().getTextFile(this.getOutputPath("uniquesRhs"), SinkMode.REPLACE);
        Pipe incoming = new Pipe("incoming");
        Each generatorLhs = new Each(new Pipe("genLhsLhs", incoming), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s"));
        generatorLhs = new Each((Pipe)generatorLhs, (Function)new Identity());
        Each generatorRhs = new Each(new Pipe("genRhsLhs", incoming), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s"));
        Each uniques = new Each(new Pipe("uniquesLhs", (Pipe)generatorLhs), (Function)new Identity());
        uniques = new GroupBy((Pipe)uniques, new Fields(new Comparable[]{"word"}));
        uniques = new Every((Pipe)uniques, new Fields(new Comparable[]{"word"}), (Aggregator)new First(Fields.ARGS), Fields.REPLACE);
        Pipe lhs = new Pipe("lhs", (Pipe)generatorLhs);
        lhs = new Rename(lhs, new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"lhs"}));
        Pipe rhs = new Pipe("rhs", (Pipe)generatorRhs);
        rhs = new Rename(rhs, new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"rhs"}));
        CoGroup inner = new CoGroup("innerLhs", lhs, new Fields(new Comparable[]{Integer.valueOf(0)}), rhs, new Fields(new Comparable[]{Integer.valueOf(0)}), (Joiner)new InnerJoin());
        Each uniquesLhs = uniques;
        CoGroup innerLhs = inner;
        generatorLhs = new Each(new Pipe("genLhsRhs", incoming), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s"));
        generatorLhs = new Each((Pipe)generatorLhs, (Function)new Identity());
        generatorRhs = new Each(new Pipe("genRhsRhs", incoming), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s"));
        uniques = new Each(new Pipe("uniquesRhs", (Pipe)generatorLhs), (Function)new Identity());
        uniques = new GroupBy((Pipe)uniques, new Fields(new Comparable[]{"word"}));
        uniques = new Every((Pipe)uniques, new Fields(new Comparable[]{"word"}), (Aggregator)new First(Fields.ARGS), Fields.REPLACE);
        lhs = new Pipe("lhs", (Pipe)generatorLhs);
        lhs = new Rename(lhs, new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"lhs"}));
        rhs = new Pipe("rhs", (Pipe)generatorRhs);
        rhs = new Rename(rhs, new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"rhs"}));
        inner = new CoGroup("innerRhs", lhs, new Fields(new Comparable[]{Integer.valueOf(0)}), rhs, new Fields(new Comparable[]{Integer.valueOf(0)}), (Joiner)new InnerJoin());
        Each uniquesRhs = uniques;
        CoGroup innerRhs = inner;
        FlowDef flowDef = ((FlowDef)FlowDef.flowDef().setName("multi-joins")).addSource("incoming", source).addTailSink((Pipe)innerLhs, innerSinkLhs).addTailSink((Pipe)uniquesLhs, uniqueSinkLhs).addTailSink((Pipe)innerRhs, innerSinkRhs).addTailSink((Pipe)uniquesRhs, uniqueSinkRhs);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(innerSinkLhs), (int)3900);
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(uniqueSinkLhs), (int)15);
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(innerSinkRhs), (int)3900);
        CoGroupFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(uniqueSinkRhs), (int)15);
    }

    @Test
    public void testSameSourceGroupSplitCoGroup() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}).applyTypes(new Type[]{Long.TYPE, String.class}), InputData.inputFileLower);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath(), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}).applyTypes(new Type[]{String.class, String.class}), " ");
        Each sourcePipe = new Each(new Pipe("source"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        sourcePipe = new GroupBy((Pipe)sourcePipe, new Fields(new Comparable[]{"num"}));
        sourcePipe = new Every((Pipe)sourcePipe, new Fields(new Comparable[]{"char"}), (Aggregator)new First(new Fields((Comparable)((Object)"first"), String.class)), Fields.ALL);
        Retain lhsPipe = new Retain(new Pipe("lhs", (Pipe)sourcePipe), Fields.ALL);
        Retain rhsPipe = new Retain(new Pipe("rhs", (Pipe)sourcePipe), Fields.ALL);
        CoGroup splice = new CoGroup((Pipe)lhsPipe, new Fields(new Comparable[]{"num"}), (Pipe)rhsPipe, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Map<Object, Object> properties = this.getPlatform().getProperties();
        properties.put("cascading.serialization.types.required", "true");
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(source, sink, (Pipe)splice);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List values = CoGroupFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        CoGroupFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testGroupSplitToCoGroupsTriangle() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        HashMap sources = new HashMap();
        sources.put("lower", this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"numLower", "charLower"}), " ", InputData.inputFileLower));
        sources.put("upper", this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"numUpper", "charUpper"}), " ", InputData.inputFileUpper));
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("sink"), SinkMode.REPLACE);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"numLower", "charLower"}), (Function)new Identity());
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"numUpper", "charUpper"}), (Function)new Identity());
        GroupBy upperGroupBy = new GroupBy((Pipe)pipeUpper, new Fields(new Comparable[]{"numUpper"}));
        Every grpByEvery = new Every((Pipe)upperGroupBy, (Aggregator)new Count(), new Fields(new Comparable[]{"numUpper", "count"}));
        Each grpByEveryE1 = new Each((Pipe)grpByEvery, new Fields(new Comparable[]{"numUpper", "count"}), (Function)new Identity());
        Each grpByEveryE2 = new Each((Pipe)grpByEvery, new Fields(new Comparable[]{"numUpper", "count"}), (Function)new Identity(new Fields(new Comparable[]{"numUpperUpper", "countUpper"})));
        CoGroup cogroup = new CoGroup("cogroup", (Pipe)pipeLower, new Fields(new Comparable[]{"numLower"}), (Pipe)grpByEveryE1, new Fields(new Comparable[]{"numUpper"}));
        Each cogroupEach = new Each((Pipe)cogroup, new Fields(new Comparable[]{"numLower", "charLower", "numUpper", "count"}), (Function)new Identity());
        CoGroup cogroupNested = new CoGroup((Pipe)cogroupEach, new Fields(new Comparable[]{"numLower"}), (Pipe)grpByEveryE2, new Fields(new Comparable[]{"numUpperUpper"}));
        Each cogrpNestedEach = new Each((Pipe)cogroupNested, Fields.ALL, (Function)new Identity());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cogrpNestedEach);
        flow.complete();
        CoGroupFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
    }
}

