/*
 * Decompiled with CFR 0.152.
 */
package cascading;

import cascading.PlatformTestCase;
import cascading.TestIdentityBuffer;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowElement;
import cascading.flow.FlowStep;
import cascading.operation.Aggregator;
import cascading.operation.Buffer;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Checkpoint;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.InnerJoin;
import cascading.pipe.joiner.Joiner;
import cascading.pipe.joiner.LeftJoin;
import cascading.pipe.joiner.MixedJoin;
import cascading.pipe.joiner.OuterJoin;
import cascading.pipe.joiner.RightJoin;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Test;

public class JoinFieldedPipesPlatformTest
extends PlatformTestCase {
    public JoinFieldedPipesPlatformTest() {
        super(true, 4, 1);
    }

    @Test
    public void testCross() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lhs", this.getPlatform().getTextFile(InputData.inputFileLhs));
        sources.put("rhs", this.getPlatform().getTextFile(InputData.inputFileRhs));
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("cross"), SinkMode.REPLACE);
        Each pipeLower = new Each("lhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numLHS", "charLHS"}), " "));
        Each pipeUpper = new Each("rhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numRHS", "charRHS"}), " "));
        HashJoin cross = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"numLHS"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numRHS"}), (Joiner)new InnerJoin());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)cross);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37, null);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tB"})));
    }

    @Test
    public void testJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("join"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Map<Object, Object> properties = this.getProperties();
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testJoinSamePipeName() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("renamedpipes"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipeLower = new Pipe("lower");
        Pipe pipeUpper = new Pipe("upper");
        pipeLower = new Pipe("same", pipeLower);
        pipeUpper = new Pipe("same", pipeUpper);
        pipeLower = new Each(pipeLower, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeUpper = new Each(pipeUpper, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeLower = new Pipe("left", pipeLower);
        pipeUpper = new Pipe("right", pipeUpper);
        HashJoin splice = new HashJoin(pipeLower, new Fields(new Comparable[]{"num"}), pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        splice = new Pipe("splice", (Pipe)splice);
        splice = new Pipe("tail", (Pipe)splice);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testJoinWithUnknowns() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("unknown"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(Fields.UNKNOWN, " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{Integer.valueOf(0)}), (Pipe)pipeUpper, new Fields(new Comparable[]{Integer.valueOf(0)}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testJoinFilteredBranch() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinfilteredbranch"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeUpper = new Each((Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), (Filter)new RegexFilter("^fobar"));
        pipeUpper = new GroupBy((Pipe)pipeUpper, new Fields(new Comparable[]{"num"}));
        HashJoin splice = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4), (Joiner)new OuterJoin());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\tnull\tnull"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\tnull\tnull"})));
    }

    @Test
    public void testJoinSelf() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        Tap sourceLhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLhs);
        Tap sourceRhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinself"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("rhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testSameSourceJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        Tap source = this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        HashMap sources = new HashMap();
        sources.put("lhs", source);
        sources.put("rhs", source);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath(), SinkMode.REPLACE);
        Pipe pipeLower = new Pipe("lhs");
        Pipe pipeUpper = new Pipe("rhs");
        HashJoin splice = new HashJoin(pipeLower, new Fields(new Comparable[]{"num"}), pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testJoinAfterEvery() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("afterevery"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeLower = new GroupBy((Pipe)pipeLower, new Fields(new Comparable[]{"num"}));
        pipeLower = new Every((Pipe)pipeLower, new Fields(new Comparable[]{"char"}), (Aggregator)new First(), Fields.ALL);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeUpper = new GroupBy((Pipe)pipeUpper, new Fields(new Comparable[]{"num"}));
        pipeUpper = new Every((Pipe)pipeUpper, new Fields(new Comparable[]{"char"}), (Aggregator)new First(), Fields.ALL);
        HashJoin splice = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testJoinInnerSingleField() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLowerOffset);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joininnersingle"), SinkMode.REPLACE);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"num1", "char"}), " "), new Fields(new Comparable[]{"num1"}));
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"num2", "char"}), " "), new Fields(new Comparable[]{"num2"}));
        HashJoin join = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num1"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)join);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)3, null);
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1\t1"}));
        results.add(new Tuple(new Object[]{"5\t5"}));
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        results.removeAll(actual);
        JoinFieldedPipesPlatformTest.assertEquals((int)0, (int)results.size());
    }

    @Test
    public void testJoinInner() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("joininner", (Joiner)new InnerJoin(), results);
    }

    @Test
    public void testJoinOuter() throws Exception {
        if (this.getPlatform().isMapReduce() && this.getPlatform().isUseCluster()) {
            return;
        }
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"3", "c1", null, null}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{"5", "e1", null, null}));
        results.add(new Tuple(new Object[]{"5", "e2", null, null}));
        results.add(new Tuple(new Object[]{"5", "e3", null, null}));
        results.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        results.add(new Tuple(new Object[]{"7", "g1", null, null}));
        results.add(new Tuple(new Object[]{"7", "g2", null, null}));
        results.add(new Tuple(new Object[]{"7", "g3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g4", null, null}));
        results.add(new Tuple(new Object[]{"7", "g5", null, null}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("joinouter", (Joiner)new OuterJoin(), results);
    }

    @Test
    public void testJoinInnerOuter() throws Exception {
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"3", "c1", null, null}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{"5", "e1", null, null}));
        results.add(new Tuple(new Object[]{"5", "e2", null, null}));
        results.add(new Tuple(new Object[]{"5", "e3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g1", null, null}));
        results.add(new Tuple(new Object[]{"7", "g2", null, null}));
        results.add(new Tuple(new Object[]{"7", "g3", null, null}));
        results.add(new Tuple(new Object[]{"7", "g4", null, null}));
        results.add(new Tuple(new Object[]{"7", "g5", null, null}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("joininnerouter", (Joiner)new LeftJoin(), results);
    }

    @Test
    public void testJoinOuterInner() throws Exception {
        if (this.getPlatform().isMapReduce() && this.getPlatform().isUseCluster()) {
            return;
        }
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        results.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        results.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        results.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        results.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        results.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        results.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        this.handleJoins("joinouterinner", (Joiner)new RightJoin(), results);
    }

    private void handleJoins(String path, Joiner joiner, Set<Tuple> results) throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLhsSparse);
        this.getPlatform().copyFromLocal(InputData.inputFileRhsSparse);
        Fields fields = new Fields(new Comparable[]{"num", "char"}).applyTypes(new Type[]{Integer.class, String.class});
        Tap sourceLower = this.getPlatform().getDelimitedFile(fields, " ", InputData.inputFileLhsSparse);
        Tap sourceUpper = this.getPlatform().getDelimitedFile(fields, " ", InputData.inputFileRhsSparse);
        HashMap sources = new HashMap();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getDelimitedFile(Fields.size((int)4, String.class), "\t", this.getOutputPath(path), SinkMode.REPLACE);
        Pipe pipeLower = new Pipe("lower");
        Pipe pipeUpper = new Pipe("upper");
        Fields declaredFields = new Fields(new Comparable[]{"num", "char", "num2", "char2"});
        Fields groupingFields = new Fields(new Comparable[]{"num"});
        HashJoin splice = new HashJoin(pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner);
        splice = new Each((Pipe)splice, Fields.ALL, (Function)new Identity(), Fields.RESULTS);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)results.size());
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        results.removeAll(actual);
        JoinFieldedPipesPlatformTest.assertEquals((int)0, (int)results.size());
    }

    @Test
    public void testJoinMixed() throws Exception {
        if (this.getPlatform().isMapReduce() && this.getPlatform().isUseCluster()) {
            return;
        }
        this.getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLowerOffset = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLowerOffset);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("loweroffset", sourceLowerOffset);
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinmixed"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLowerOffset = new Each(new Pipe("loweroffset"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Pipe[] pipes = Pipe.pipes((Pipe[])new Pipe[]{pipeLowerOffset, pipeUpper, pipeLower});
        Fields[] fields = Fields.fields((Fields[])new Fields[]{new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"})});
        MixedJoin join = new MixedJoin(new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER});
        HashJoin splice = new HashJoin(pipes, fields, Fields.size((int)6), (Joiner)join);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)6);
        HashSet<Tuple> results = new HashSet<Tuple>();
        results.add(new Tuple(new Object[]{"1\ta\t1\tA\t1\ta"}));
        results.add(new Tuple(new Object[]{"null\tnull\t2\tB\t2\tb"}));
        results.add(new Tuple(new Object[]{"null\tnull\t3\tC\t3\tc"}));
        results.add(new Tuple(new Object[]{"null\tnull\t4\tD\t4\td"}));
        results.add(new Tuple(new Object[]{"5\tb\t5\tE\t5\te"}));
        results.add(new Tuple(new Object[]{"5\te\t5\tE\t5\te"}));
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        results.removeAll(actual);
        JoinFieldedPipesPlatformTest.assertEquals((int)0, (int)results.size());
    }

    @Test
    public void testJoinDiffFields() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("difffields"), SinkMode.REPLACE);
        RegexSplitter splitterLower = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}), " ");
        RegexSplitter splitterUpper = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitterLower);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitterUpper);
        HashJoin pipe = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"numA"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numB"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)pipe);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testJoinGroupBy() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joingroupby"), SinkMode.REPLACE);
        RegexSplitter splitterLower = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}), " ");
        RegexSplitter splitterUpper = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitterLower);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitterUpper);
        HashJoin pipe = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"numA"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numB"}));
        GroupBy groupby = new GroupBy((Pipe)pipe, new Fields(new Comparable[]{"numA"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)groupby);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testJoinSamePipe() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", source);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samepipe"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin pipe = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), 1, new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)pipe);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testJoinSamePipe2() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", source);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samepipe2"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin join = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeLower, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)join);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testJoinSamePipe3() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        HashMap sources = new HashMap();
        sources.put("lower", source);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samepipe3"), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lower");
        Pipe lhs = new Pipe("lhs", pipe);
        Pipe rhs = new Pipe("rhs", pipe);
        HashJoin join = new HashJoin(lhs, new Fields(new Comparable[]{"num"}), rhs, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)join);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testJoinAroundJoinRightMost() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinaroundjoinrightmost"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper1 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice1 = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        HashJoin splice2 = new HashJoin((Pipe)splice1, new Fields(new Comparable[]{"num1"}), (Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice2);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)1, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB\t2\tB"})));
    }

    @Test
    public void testJoinAroundJoinLeftMost() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinaroundjoinleftmost"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper1 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice1 = new HashJoin((Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        HashJoin splice2 = new HashJoin((Pipe)splice1, new Fields(new Comparable[]{"num1"}), (Pipe)pipeLower, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice2);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)2, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\tA\t1\tA\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tB\t2\tB\t2\tb"})));
    }

    @Test
    public void testJoinAroundJoinRightMostSwapped() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinaroundjoinswapped"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper1 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice1 = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        HashJoin splice2 = new HashJoin((Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), (Pipe)splice1, new Fields(new Comparable[]{"num1"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice2);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)2, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\tA\t1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tB\t2\tb\t2\tB"})));
    }

    @Test
    public void testJoinGroupByJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        this.getPlatform().copyFromLocal(InputData.inputFileJoined);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceJoined = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileJoined);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("joined", sourceJoined);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joingroupbyjoin"), SinkMode.REPLACE);
        RegexSplitter splitterLower = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}), " ");
        RegexSplitter splitterUpper = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}), " ");
        RegexSplitter splitterJoined = new RegexSplitter(new Fields(new Comparable[]{"numC", "lowerC", "upperC"}), "\t");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitterLower);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitterUpper);
        Each pipeJoined = new Each(new Pipe("joined"), new Fields(new Comparable[]{"line"}), (Function)splitterJoined);
        HashJoin pipe = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"numA"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numB"}));
        pipe = new GroupBy((Pipe)pipe, new Fields(new Comparable[]{"numA"}));
        pipe = new HashJoin((Pipe)pipe, new Fields(new Comparable[]{"numA"}), (Pipe)pipeJoined, new Fields(new Comparable[]{"numC"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)pipe);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)2, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\ta\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB\t2\tb\tB"})));
    }

    @Test
    public void testJoinSameSourceIntoJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsamesourceintojoin"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper1 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice1 = new HashJoin((Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        HashJoin splice2 = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)splice1, new Fields(new Comparable[]{"num1"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice2);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)3, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tB\t2\tB"})));
    }

    @Test
    public void testJoinSameSourceIntoJoinSimple() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsamesourceintojoinsimple"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeUpper1 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice1 = new HashJoin((Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice1);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)2, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\tA\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tB\t2\tB"})));
    }

    @Test
    public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsamesourceovergroupbyintojoinsimple"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeUpper1 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeUpper1 = new GroupBy((Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}));
        pipeUpper2 = new GroupBy((Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}));
        HashJoin splice1 = new HashJoin((Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)splice1);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)3, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\tA\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tB\t2\tB"})));
    }

    @Test
    public void testJoinsIntoGroupBy() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceLhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLhs);
        Tap sourceRhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileRhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsintogroupby"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeLhs = new Each(new Pipe("lhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeRhs = new Each(new Pipe("rhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin upperLower = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        upperLower = new Each((Pipe)upperLower, (Function)new Identity());
        HashJoin lhsRhs = new HashJoin((Pipe)pipeLhs, new Fields(new Comparable[]{"num"}), (Pipe)pipeRhs, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        lhsRhs = new Each((Pipe)lhsRhs, (Function)new Identity());
        GroupBy grouped = new GroupBy("merging", Pipe.pipes((Pipe[])new Pipe[]{upperLower, lhsRhs}), new Fields(new Comparable[]{"num1"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)grouped);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)1, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)42, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"5\te\t5\tE"})));
    }

    @Test
    public void testJoinSamePipeAroundGroupBy() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("samepipearoundgroupby"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each lhsPipe = new Each(new Pipe("lhs", (Pipe)pipeLower), (Function)new Identity());
        Each rhsPipe = new Each(new Pipe("rhs", (Pipe)pipeLower), (Function)new Identity());
        rhsPipe = new GroupBy((Pipe)rhsPipe, new Fields(new Comparable[]{"num"}));
        rhsPipe = new Each((Pipe)rhsPipe, (Function)new Identity());
        HashJoin pipe = new HashJoin((Pipe)lhsPipe, new Fields(new Comparable[]{"num"}), (Pipe)rhsPipe, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, (Pipe)pipe);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testJoinsIntoCoGroupLhs() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceLhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLhs);
        Tap sourceRhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileRhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsintocogrouplhs"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeLhs = new Each(new Pipe("lhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeRhs = new Each(new Pipe("rhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin upperLower = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"}));
        upperLower = new Each((Pipe)upperLower, (Function)new Identity());
        HashJoin lhsUpperLower = new HashJoin((Pipe)pipeLhs, new Fields(new Comparable[]{"num"}), (Pipe)upperLower, new Fields(new Comparable[]{"numUpperLower"}), new Fields(new Comparable[]{"numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"}));
        lhsUpperLower = new Each((Pipe)lhsUpperLower, (Function)new Identity());
        CoGroup grouped = new CoGroup("cogrouping", (Pipe)lhsUpperLower, new Fields(new Comparable[]{"numLhs"}), (Pipe)pipeRhs, new Fields(new Comparable[]{"num"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)grouped);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)2, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\ta\t1\tA\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"5\ta\t5\te\t5\tE\t5\tA"})));
    }

    @Test
    public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceLhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLhs);
        Tap sourceRhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileRhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsintocogrouplhsswappedjoin"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeLhs = new Each(new Pipe("lhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeRhs = new Each(new Pipe("rhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin upperLower = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"}));
        upperLower = new Each((Pipe)upperLower, (Function)new Identity());
        HashJoin lhsUpperLower = new HashJoin((Pipe)upperLower, new Fields(new Comparable[]{"numUpperLower"}), (Pipe)pipeLhs, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs"}));
        lhsUpperLower = new Each((Pipe)lhsUpperLower, (Function)new Identity());
        CoGroup grouped = new CoGroup("cogrouping", (Pipe)lhsUpperLower, new Fields(new Comparable[]{"numLhs"}), (Pipe)pipeRhs, new Fields(new Comparable[]{"num"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)grouped);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)1, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"5\te\t5\tE\t5\te\t5\tE"})));
    }

    @Test
    public void testJoinsIntoCoGroupRhs() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceLhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLhs);
        Tap sourceRhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileRhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsintocogrouprhs"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeLhs = new Each(new Pipe("lhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeRhs = new Each(new Pipe("rhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin upperLower = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"}));
        upperLower = new Each((Pipe)upperLower, (Function)new Identity());
        HashJoin lhsUpperLower = new HashJoin((Pipe)pipeLhs, new Fields(new Comparable[]{"num"}), (Pipe)upperLower, new Fields(new Comparable[]{"numUpperLower"}), new Fields(new Comparable[]{"numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"}));
        lhsUpperLower = new Each((Pipe)lhsUpperLower, (Function)new Identity());
        CoGroup grouped = new CoGroup("cogrouping", (Pipe)pipeRhs, new Fields(new Comparable[]{"num"}), (Pipe)lhsUpperLower, new Fields(new Comparable[]{"numLhs"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)grouped);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)2, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\tA\t1\ta\t1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"5\tE\t5\te\t5\te\t5\tE"})));
    }

    @Test
    public void testJoinsIntoCoGroup() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceLhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLhs);
        Tap sourceRhs = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileRhs);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinsintocogroup"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeLhs = new Each(new Pipe("lhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeRhs = new Each(new Pipe("rhs"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin upperLower = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2"}));
        upperLower = new Each((Pipe)upperLower, (Function)new Identity());
        HashJoin lhsRhs = new HashJoin((Pipe)pipeLhs, new Fields(new Comparable[]{"num"}), (Pipe)pipeRhs, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2"}));
        lhsRhs = new Each((Pipe)lhsRhs, (Function)new Identity());
        CoGroup grouped = new CoGroup("cogrouping", (Pipe)upperLower, new Fields(new Comparable[]{"numUpperLower1"}), (Pipe)lhsRhs, new Fields(new Comparable[]{"numLhsRhs1"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sink, (Pipe)grouped);
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)1, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37, null);
        List actual = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)actual.contains(new Tuple(new Object[]{"5\te\t5\tE\t5\te\t5\tE"})));
    }

    @Test
    public void testJoinWithHasher() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinhasher"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeLower = new Each((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Function)new ExpressionFunction(Fields.ARGS, "Integer.parseInt( num )", String.class), Fields.REPLACE);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Fields num = new Fields(new Comparable[]{"num"});
        num.setComparator((Comparable)((Object)"num"), (Comparator)new AllComparator());
        HashJoin splice = new HashJoin((Pipe)pipeLower, num, (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Map<Object, Object> properties = this.getProperties();
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testJoinNone() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("joinnone"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        HashJoin splice = new HashJoin((Pipe)pipeLower, Fields.NONE, (Pipe)pipeUpper, Fields.NONE, Fields.size((int)4));
        Map<Object, Object> properties = this.getProperties();
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)25);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t2\tB"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testGroupBySplitJoins() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        this.getPlatform().copyFromLocal(InputData.inputFileJoined);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap sourceJoined = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileJoined);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("joined", sourceJoined);
        Tap lhsSink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("lhs"), SinkMode.REPLACE);
        Tap rhsSink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("rhs"), SinkMode.REPLACE);
        HashMap<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put("lhs", lhsSink);
        sinks.put("rhs", rhsSink);
        RegexSplitter splitterLower = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}), " ");
        RegexSplitter splitterUpper = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}), " ");
        RegexSplitter splitterJoined = new RegexSplitter(new Fields(new Comparable[]{"numC", "lowerC", "upperC"}), "\t");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitterLower);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitterUpper);
        Each pipeJoined = new Each(new Pipe("joined"), new Fields(new Comparable[]{"line"}), (Function)splitterJoined);
        GroupBy pipe = new GroupBy((Pipe)pipeLower, new Fields(new Comparable[]{"numA"}));
        pipe = new Every((Pipe)pipe, Fields.ALL, (Buffer)new TestIdentityBuffer(new Fields(new Comparable[]{"numA"}), 5, false), Fields.RESULTS);
        Each lhsPipe = new Each((Pipe)pipe, (Function)new Identity());
        lhsPipe = new HashJoin("lhs", (Pipe)lhsPipe, new Fields(new Comparable[]{"numA"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numB"}));
        Each rhsPipe = new Each((Pipe)pipe, (Function)new Identity());
        rhsPipe = new HashJoin("rhs", (Pipe)rhsPipe, new Fields(new Comparable[]{"numA"}), (Pipe)pipeJoined, new Fields(new Comparable[]{"numC"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{lhsPipe, rhsPipe});
        if (this.getPlatform().isMapReduce()) {
            JoinFieldedPipesPlatformTest.assertEquals((String)"wrong number of steps", (int)3, (int)flow.getFlowSteps().size());
        }
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openSink("lhs"), (int)5, null);
        JoinFieldedPipesPlatformTest.validateLength((TupleEntryIterator)flow.openSink("rhs"), (int)5, null);
        List lhsActual = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)lhsSink);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)lhsActual.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)lhsActual.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
        List rhsActual = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)rhsSink);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)rhsActual.contains(new Tuple(new Object[]{"1\ta\t1\ta\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)rhsActual.contains(new Tuple(new Object[]{"2\tb\t2\tb\tB"})));
    }

    @Test
    public void testJoinMergeGroupBy() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileNums10);
        this.getPlatform().copyFromLocal(InputData.inputFileNums20);
        Tap lhsTap = this.getPlatform().getTextFile(new Fields(new Comparable[]{"id"}), InputData.inputFileNums10);
        Tap rhsTap = this.getPlatform().getTextFile(new Fields(new Comparable[]{"id2"}), InputData.inputFileNums20);
        Pipe lhs = new Pipe("lhs");
        Pipe rhs = new Pipe("rhs");
        HashJoin joined = new HashJoin(lhs, new Fields(new Comparable[]{"id"}), rhs, new Fields(new Comparable[]{"id2"}));
        Each pruned = new Each((Pipe)joined, new Fields(new Comparable[]{"id2"}), (Function)new Identity(), Fields.RESULTS);
        Merge merged = new Merge(new Pipe[]{pruned, rhs});
        GroupBy grouped = new GroupBy((Pipe)merged, new Fields(new Comparable[]{"id2"}));
        Count count = new Count(new Fields(new Comparable[]{"count"}));
        Every counted = new Every((Pipe)grouped, (Aggregator)count);
        String testJoinMerge = "testJoinMergeGroupBy/" + (joined instanceof CoGroup ? "cogroup" : "hashjoin");
        Tap sink = this.getPlatform().getDelimitedFile(Fields.ALL, true, "\t", null, this.getOutputPath(testJoinMerge), SinkMode.REPLACE);
        FlowDef flowDef = ((FlowDef)FlowDef.flowDef().setName("join-merge")).addSource(rhs, rhsTap).addSource(lhs, lhsTap).addTailSink((Pipe)counted, sink);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)20);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        ArrayList<Tuple> expected = new ArrayList<Tuple>();
        expected.add(new Tuple(new Object[]{"1", "2"}));
        expected.add(new Tuple(new Object[]{"10", "2"}));
        expected.add(new Tuple(new Object[]{"11", "1"}));
        expected.add(new Tuple(new Object[]{"12", "1"}));
        expected.add(new Tuple(new Object[]{"13", "1"}));
        expected.add(new Tuple(new Object[]{"14", "1"}));
        expected.add(new Tuple(new Object[]{"15", "1"}));
        expected.add(new Tuple(new Object[]{"16", "1"}));
        expected.add(new Tuple(new Object[]{"17", "1"}));
        expected.add(new Tuple(new Object[]{"18", "1"}));
        expected.add(new Tuple(new Object[]{"19", "1"}));
        expected.add(new Tuple(new Object[]{"2", "2"}));
        expected.add(new Tuple(new Object[]{"20", "1"}));
        expected.add(new Tuple(new Object[]{"3", "2"}));
        expected.add(new Tuple(new Object[]{"4", "2"}));
        expected.add(new Tuple(new Object[]{"5", "2"}));
        expected.add(new Tuple(new Object[]{"6", "2"}));
        expected.add(new Tuple(new Object[]{"7", "2"}));
        expected.add(new Tuple(new Object[]{"8", "2"}));
        expected.add(new Tuple(new Object[]{"9", "2"}));
        Collections.sort(values);
        Collections.sort(expected);
        JoinFieldedPipesPlatformTest.assertEquals(expected, (Object)values);
    }

    @Test
    public void testJoinSplit() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        FlowDef flowDef = FlowDef.flowDef().addSource("lhs", this.getPlatform().getTextFile(InputData.inputFileLhs)).addSource("rhs", this.getPlatform().getTextFile(InputData.inputFileRhs)).addSink("lhsSink", this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("lhs"), SinkMode.REPLACE)).addSink("rhsSink", this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("rhs"), SinkMode.REPLACE));
        Each pipeLower = new Each("lhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numLHS", "charLHS"}), " "));
        Each pipeUpper = new Each("rhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numRHS", "charRHS"}), " "));
        HashJoin join = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"numLHS"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numRHS"}), (Joiner)new InnerJoin());
        Each pipeLhs = new Each(new Pipe("lhsSink", (Pipe)join), (Function)new Identity());
        Each pipeRhs = new Each(new Pipe("rhsSink", (Pipe)join), (Function)new Identity());
        flowDef.addTail((Pipe)pipeLhs).addTail((Pipe)pipeRhs);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)37, null);
        List values = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)((Tap)flowDef.getSinks().get("lhsSink")));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tB"})));
        values = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)((Tap)flowDef.getSinks().get("rhsSink")));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tB"})));
    }

    @Test
    public void testSameSourceJoinSplitIntoJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        FlowDef flowDef = FlowDef.flowDef().addSource("lhs", this.getPlatform().getTextFile(InputData.inputFileLhs)).addSource("rhs", this.getPlatform().getTextFile(InputData.inputFileLhs)).addSource("joinSecond", this.getPlatform().getTextFile(InputData.inputFileRhs)).addSink("lhsSink", this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("lhs"), SinkMode.REPLACE)).addSink("rhsSink", this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("rhs"), SinkMode.REPLACE));
        Each pipeLower = new Each("lhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numLHS", "charLHS"}), " "));
        Each pipeUpper = new Each("rhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numRHS", "charRHS"}), " "));
        HashJoin joinFirst = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"numLHS"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numRHS"}), (Joiner)new InnerJoin());
        Each pipeLhs = new Each(new Pipe("lhsSink", (Pipe)joinFirst), (Function)new Identity());
        Each joinSecond = new Each("joinSecond", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numRHSSecond", "charRHSSecond"}), " "));
        joinSecond = new HashJoin((Pipe)joinFirst, new Fields(new Comparable[]{"numLHS"}), (Pipe)joinSecond, new Fields(new Comparable[]{"numRHSSecond"}));
        Each pipeRhs = new Each(new Pipe("rhsSink", (Pipe)joinSecond), (Function)new Identity());
        flowDef.addTail((Pipe)pipeLhs).addTail((Pipe)pipeRhs);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        List values = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)((Tap)flowDef.getSinks().get("lhsSink")));
        JoinFieldedPipesPlatformTest.assertEquals((int)37, (int)values.size());
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tb"})));
        values = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)((Tap)flowDef.getSinks().get("rhsSink")));
        JoinFieldedPipesPlatformTest.assertEquals((int)109, (int)values.size());
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tb\t1\tB"})));
    }

    @Test
    public void testJoinSplitBeforeJoin() throws Exception {
        HashJoin hashJoin;
        this.getPlatform().copyFromLocal(InputData.inputFileLhs);
        this.getPlatform().copyFromLocal(InputData.inputFileRhs);
        FlowDef flowDef = FlowDef.flowDef().addSource("lhs", this.getPlatform().getTextFile(InputData.inputFileLhs)).addSource("rhs", this.getPlatform().getTextFile(InputData.inputFileRhs)).addSource("joinSecond", this.getPlatform().getTextFile(InputData.inputFileRhs)).addSink("lhsSink", this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("lhs"), SinkMode.REPLACE)).addSink("rhsSink", this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("rhs"), SinkMode.REPLACE));
        Each pipeLower = new Each("lhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numLHS", "charLHS"}), " "));
        Each pipeUpper = new Each("rhs", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numRHS", "charRHS"}), " "));
        pipeUpper = new Checkpoint((Pipe)pipeUpper);
        HashJoin joinFirst = hashJoin = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"numLHS"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"numRHS"}), (Joiner)new InnerJoin());
        joinFirst = new Each((Pipe)joinFirst, (Function)new Identity());
        Each pipeLhs = new Each(new Pipe("lhsSink", (Pipe)joinFirst), (Function)new Identity());
        pipeLhs = new GroupBy((Pipe)pipeLhs, new Fields(new Comparable[]{"numLHS"}));
        joinFirst = new Each(new Pipe("lhsSplit", (Pipe)joinFirst), (Function)new Identity());
        Each joinSecond = new Each("joinSecond", new Fields(new Comparable[]{"line"}), (Function)new RegexSplitter(new Fields(new Comparable[]{"numRHSSecond", "charRHSSecond"}), " "));
        joinSecond = new CoGroup((Pipe)joinFirst, new Fields(new Comparable[]{"numLHS"}), (Pipe)joinSecond, new Fields(new Comparable[]{"numRHSSecond"}));
        Each pipeRhs = new Each(new Pipe("rhsSink", (Pipe)joinSecond), (Function)new Identity());
        flowDef.addTail((Pipe)pipeLhs).addTail((Pipe)pipeRhs);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        if (this.getPlatform().isDAG()) {
            FlowStep flowStep = (FlowStep)flow.getFlowSteps().get(0);
            List elementGraphs = flowStep.getFlowNodeGraph().getElementGraphs((FlowElement)hashJoin);
            JoinFieldedPipesPlatformTest.assertEquals((int)1, (int)elementGraphs.size());
        }
        flow.complete();
        List values = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)((Tap)flowDef.getSinks().get("lhsSink")));
        JoinFieldedPipesPlatformTest.assertEquals((int)37, (int)values.size());
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tB"})));
        values = JoinFieldedPipesPlatformTest.asList((Flow)flow, (Tap)((Tap)flowDef.getSinks().get("rhsSink")));
        JoinFieldedPipesPlatformTest.assertEquals((int)109, (int)values.size());
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tB\t1\tB"})));
    }

    @Test
    public void testGroupBySplitGroupByJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("sink"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipeFirst = new Pipe("first");
        pipeFirst = new Each(pipeFirst, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeFirst = new GroupBy(pipeFirst, new Fields(new Comparable[]{"num"}));
        pipeFirst = new Every(pipeFirst, new Fields(new Comparable[]{"char"}), (Aggregator)new First(new Fields(new Comparable[]{"firstFirst"})), Fields.ALL);
        Pipe pipeSecond = new Pipe("second", pipeFirst);
        pipeSecond = new Each(pipeSecond, (Function)new Identity());
        pipeSecond = new GroupBy(pipeSecond, new Fields(new Comparable[]{"num"}));
        pipeSecond = new Every(pipeSecond, new Fields(new Comparable[]{"firstFirst"}), (Aggregator)new First(new Fields(new Comparable[]{"secondFirst"})), Fields.ALL);
        pipeSecond = new GroupBy(pipeSecond, new Fields(new Comparable[]{"num"}));
        pipeSecond = new Every(pipeSecond, new Fields(new Comparable[]{"secondFirst"}), (Aggregator)new First(new Fields(new Comparable[]{"thirdFirst"})), Fields.ALL);
        HashJoin splice = new HashJoin(pipeFirst, new Fields(new Comparable[]{"num"}), pipeSecond, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"3\tc\t3\tc"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"4\td\t4\td"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"5\te\t5\te"})));
    }

    @Test
    public void testGroupBySplitSplitGroupByJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("sink"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipeFirst = new Pipe("first");
        pipeFirst = new Each(pipeFirst, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeFirst = new GroupBy(pipeFirst, new Fields(new Comparable[]{"num"}));
        pipeFirst = new Every(pipeFirst, new Fields(new Comparable[]{"char"}), (Aggregator)new First(new Fields(new Comparable[]{"firstFirst"})), Fields.ALL);
        Pipe pipeSecond = new Pipe("second", pipeFirst);
        pipeSecond = new Each(pipeSecond, (Function)new Identity());
        pipeSecond = new GroupBy(pipeSecond, new Fields(new Comparable[]{"num"}));
        pipeSecond = new Every(pipeSecond, new Fields(new Comparable[]{"firstFirst"}), (Aggregator)new First(new Fields(new Comparable[]{"secondFirst"})), Fields.ALL);
        HashJoin splice = new HashJoin(pipeFirst, new Fields(new Comparable[]{"num"}), pipeSecond, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        splice = new HashJoin((Pipe)splice, new Fields(new Comparable[]{Integer.valueOf(0)}), pipeSecond, new Fields(new Comparable[]{"num"}), Fields.size((int)6));
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb\t2\tb"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"3\tc\t3\tc\t3\tc"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"4\td\t4\td\t4\td"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"5\te\t5\te\t5\te"})));
    }

    @Test
    public void testGroupBySplitAroundSplitGroupByJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("sink"), SinkMode.REPLACE);
        Tap sink2 = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("sink2"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipeInit = new Pipe("init");
        Pipe pipeFirst = new Pipe("first", pipeInit);
        pipeFirst = new Each(pipeFirst, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeFirst = new GroupBy(pipeFirst, new Fields(new Comparable[]{"num"}));
        pipeFirst = new Every(pipeFirst, new Fields(new Comparable[]{"char"}), (Aggregator)new First(new Fields(new Comparable[]{"firstFirst"})), Fields.ALL);
        Pipe sink2Pipe = new Pipe("sink2", pipeFirst);
        Pipe pipeSecond = new Pipe("second", pipeInit);
        pipeSecond = new Each(pipeSecond, new Fields(new Comparable[]{"line"}), (Function)splitter);
        pipeSecond = new GroupBy(pipeSecond, new Fields(new Comparable[]{"num"}));
        pipeSecond = new Every(pipeSecond, new Fields(new Comparable[]{"char"}), (Aggregator)new First(new Fields(new Comparable[]{"secondFirst"})), Fields.ALL);
        HashJoin splice = new HashJoin(pipeSecond, new Fields(new Comparable[]{"num"}), pipeFirst, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Pipe pipeThird = new Pipe("third", pipeSecond);
        pipeThird = new Each(pipeThird, (Function)new Identity());
        pipeThird = new GroupBy(pipeThird, new Fields(new Comparable[]{"num"}));
        pipeThird = new Every(pipeThird, new Fields(new Comparable[]{"secondFirst"}), (Aggregator)new First(new Fields(new Comparable[]{"thirdFirst"})), Fields.ALL);
        splice = new HashJoin((Pipe)splice, new Fields(new Comparable[]{Integer.valueOf(0)}), pipeThird, new Fields(new Comparable[]{"num"}), Fields.size((int)6));
        FlowDef flowDef = ((FlowDef)FlowDef.flowDef().setName(splice.getName())).addSource("init", source).addTailSink((Pipe)splice, sink).addTailSink(sink2Pipe, sink2);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5, null);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\ta\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tb\t2\tb"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"3\tc\t3\tc\t3\tc"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"4\td\t4\td\t4\td"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"5\te\t5\te\t5\te"})));
    }

    @Test
    public void testForkThenJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("join"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "text"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper", (Pipe)pipeLower), new Fields(new Comparable[]{"text"}), (Function)new ExpressionFunction(Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);
        HashJoin splice = new HashJoin((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), Fields.size((int)4));
        Map<Object, Object> properties = this.getProperties();
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testForkCoGroupThenHashJoin() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("sourceLower", sourceLower);
        sources.put("sourceUpper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("join"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "text"}), " ");
        Each leftPipeLower = new Each(new Pipe("sourceLower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each rightPipeUpper = new Each(new Pipe("sourceUpper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each leftPipeUpper = new Each(new Pipe("leftUpper", (Pipe)leftPipeLower), new Fields(new Comparable[]{"text"}), (Function)new ExpressionFunction(Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);
        Each rightPipeLower = new Each(new Pipe("rightLower", (Pipe)rightPipeUpper), new Fields(new Comparable[]{"text"}), (Function)new ExpressionFunction(Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);
        leftPipeUpper = new GroupBy((Pipe)leftPipeUpper, new Fields(new Comparable[]{"num"}));
        rightPipeLower = new GroupBy((Pipe)rightPipeLower, new Fields(new Comparable[]{"num"}));
        CoGroup middleSplice = new CoGroup("middleCoGroup", (Pipe)leftPipeUpper, new Fields(new Comparable[]{"num"}), (Pipe)rightPipeLower, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numM1", "charM1", "numM2", "charM2"}));
        HashJoin leftSplice = new HashJoin((Pipe)leftPipeLower, new Fields(new Comparable[]{"num"}), (Pipe)middleSplice, new Fields(new Comparable[]{"numM1"}));
        Map<Object, Object> properties = this.getProperties();
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)leftSplice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB\t2\tb"})));
    }

    @Test
    public void testForkCoGroupThenHashJoinCoGroupAgain() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("sourceLower", sourceLower);
        sources.put("sourceUpper", sourceUpper);
        Tap sink = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), this.getOutputPath("join"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "text"}), " ");
        Each leftPipeLower = new Each(new Pipe("sourceLower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each rightPipeUpper = new Each(new Pipe("sourceUpper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each leftPipeUpper = new Each(new Pipe("leftUpper", (Pipe)leftPipeLower), new Fields(new Comparable[]{"text"}), (Function)new ExpressionFunction(Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);
        Each rightPipeLower = new Each(new Pipe("rightLower", (Pipe)rightPipeUpper), new Fields(new Comparable[]{"text"}), (Function)new ExpressionFunction(Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);
        leftPipeUpper = new GroupBy((Pipe)leftPipeUpper, new Fields(new Comparable[]{"num"}));
        rightPipeLower = new GroupBy((Pipe)rightPipeLower, new Fields(new Comparable[]{"num"}));
        CoGroup middleSplice = new CoGroup("middleCoGroup", (Pipe)leftPipeUpper, new Fields(new Comparable[]{"num"}), (Pipe)rightPipeLower, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"numM1", "charM1", "numM2", "charM2"}));
        HashJoin leftSplice = new HashJoin((Pipe)leftPipeLower, new Fields(new Comparable[]{"num"}), (Pipe)middleSplice, new Fields(new Comparable[]{"numM1"}));
        HashJoin rightSplice = new HashJoin((Pipe)rightPipeUpper, new Fields(new Comparable[]{"num"}), (Pipe)middleSplice, new Fields(new Comparable[]{"numM2"}));
        leftSplice = new Rename((Pipe)leftSplice, new Fields(new Comparable[]{"num", "text", "numM1", "charM1", "numM2", "charM2"}), new Fields(new Comparable[]{"numL1", "charL1", "numM1L", "charM1L", "numM2L", "charM2L"}));
        rightSplice = new Rename((Pipe)rightSplice, new Fields(new Comparable[]{"num", "text", "numM1", "charM1", "numM2", "charM2"}), new Fields(new Comparable[]{"numR1", "charR1", "numM1R", "charM1R", "numM2R", "charM2R"}));
        leftSplice = new GroupBy((Pipe)leftSplice, new Fields(new Comparable[]{"numM1L"}));
        rightSplice = new GroupBy((Pipe)rightSplice, new Fields(new Comparable[]{"numM2R"}));
        CoGroup splice = new CoGroup("cogrouping", (Pipe)leftSplice, new Fields(new Comparable[]{"numM1L"}), (Pipe)rightSplice, new Fields(new Comparable[]{"numM2R"}));
        Map<Object, Object> properties = this.getProperties();
        Flow flow = this.getPlatform().getFlowConnector(properties).connect(sources, sink, (Pipe)splice);
        flow.complete();
        JoinFieldedPipesPlatformTest.validateLength((Flow)flow, (int)5);
        List values = JoinFieldedPipesPlatformTest.getSinkAsList((Flow)flow);
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\ta\t1\tA\t1\tA\t1\ta"})));
        JoinFieldedPipesPlatformTest.assertTrue((boolean)values.contains(new Tuple(new Object[]{"2\tb\t2\tB\t2\tb\t2\tB\t2\tB\t2\tb"})));
    }

    public static class AllComparator
    implements Comparator<Comparable>,
    Hasher<Comparable>,
    Serializable {
        @Override
        public int compare(Comparable lhs, Comparable rhs) {
            return lhs.toString().compareTo(rhs.toString());
        }

        public int hashCode(Comparable value) {
            if (value == null) {
                return 0;
            }
            return value.toString().hashCode();
        }
    }
}

