/*
 * Decompiled with CFR 0.152.
 */
package cascading;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.operation.Aggregator;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.First;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import data.InputData;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import org.junit.Test;

public class MergePipesPlatformTest
extends PlatformTestCase {
    /**
     * Creates the test case, passing {@code true} to the {@link PlatformTestCase} constructor.
     * NOTE(review): the flag presumably enables cluster mode - confirm against PlatformTestCase.
     */
    public MergePipesPlatformTest() {
        super(true);
    }

    /**
     * Splits the lower and upper text files into {@code num}/{@code char} fields, merges the
     * two branches, then checks the sink with {@code validateLength(flow, 10)} and looks for
     * one sample line from each input.
     */
    @Test
    public void testSimpleMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("simplemerge"), SinkMode.REPLACE);

        // Both branches share the same splitter, so they declare identical fields.
        Pipe lowerBranch = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe upperBranch = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe merged = new Merge("merge", lowerBranch, upperBranch);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, merged);
        flow.complete();

        validateLength(flow, 10);
        List results = getSinkAsList(flow);
        assertTrue("missing value", results.contains(new Tuple("1\ta")));
        assertTrue("missing value", results.contains(new Tuple("1\tA")));
    }

    /**
     * Merges the lower and upper delimited files and groups the merged stream on all fields.
     * The sink is checked with {@code validateLength(flow, 10)} and for one sample line from
     * each input.
     */
    @Test
    public void testMergeGroupBy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath(), SinkMode.REPLACE);

        Pipe pipeLower = new Pipe("lower");
        Pipe pipeUpper = new Pipe("upper");

        // Declared as Pipe: the tail starts as a Merge and is then wrapped in a GroupBy,
        // and a GroupBy cannot be assigned to a Merge-typed variable.
        Pipe splice = new Merge("merge", pipeLower, pipeUpper);
        splice = new GroupBy(splice, Fields.ALL);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 10);
        List results = getSinkAsList(flow);
        assertTrue("missing value", results.contains(new Tuple("1\ta")));
        assertTrue("missing value", results.contains(new Tuple("1\tA")));
    }

    /**
     * Merges three split branches (lower, upper, offset) in a single {@link Merge} and passes
     * the result through an {@link Identity} function. The sink is checked with
     * {@code validateLength(flow, 14)}.
     */
    @Test
    public void testSimpleMergeThree() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap sourceLowerOffset = getPlatform().getTextFile(InputData.inputFileLowerOffset);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("offset", sourceLowerOffset);

        Tap sink = getPlatform().getTextFile(getOutputPath("simplemergethree"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe pipeOffset = new Each(new Pipe("offset"), new Fields("line"), splitter);

        // Declared as Pipe: the tail is reassigned from a Merge to an Each.
        Pipe splice = new Merge("merge", pipeLower, pipeUpper, pipeOffset);
        splice = new Each(splice, new Fields("num", "char"), new Identity());

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 14);
    }

    /**
     * Merges lower and upper, merges that result with the offset branch in a second
     * {@link Merge}, and passes the tail through an {@link Identity} function. The sink is
     * checked with {@code validateLength(flow, 14)}.
     */
    @Test
    public void testSimpleMergeThreeChain() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap sourceLowerOffset = getPlatform().getTextFile(InputData.inputFileLowerOffset);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("offset", sourceLowerOffset);

        Tap sink = getPlatform().getTextFile(getOutputPath("simplemergethreechain"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe pipeOffset = new Each(new Pipe("offset"), new Fields("line"), splitter);

        // Declared as Pipe: the tail goes Merge -> Merge -> Each.
        Pipe splice = new Merge("merge", pipeLower, pipeUpper);
        splice = new Merge(splice, pipeOffset);
        splice = new Each(splice, new Fields("num", "char"), new Identity());

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 14);
    }

    /**
     * Chains two merges (lower+upper, then +offset) and groups the result on {@code num}.
     * When the platform reports {@code isMapReduce()}, the flow must have exactly one step.
     * The sink is checked with {@code validateLength(flow, 14)}.
     */
    @Test
    public void testSimpleMergeThreeChainGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap sourceLowerOffset = getPlatform().getTextFile(InputData.inputFileLowerOffset);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("offset", sourceLowerOffset);

        Tap sink = getPlatform().getTextFile(getOutputPath("simplemergethreechaingroup"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe pipeOffset = new Each(new Pipe("offset"), new Fields("line"), splitter);

        // Declared as Pipe: the tail goes Merge -> Merge -> GroupBy.
        Pipe splice = new Merge("merge", pipeLower, pipeUpper);
        splice = new Merge(splice, pipeOffset);
        splice = new GroupBy(splice, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, 14);
    }

    /**
     * Merges the lower and upper branches (fields {@code num1}/{@code char1}) and co-groups
     * the merge with the offset branch (fields {@code num2}/{@code char2}) on
     * {@code num1 = num2}. When the platform reports {@code isMapReduce()}, the flow must
     * have exactly one step. The sink is checked with {@code validateLength(flow, 6)}.
     */
    @Test
    public void testSimpleMergeThreeChainCoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap sourceLowerOffset = getPlatform().getTextFile(InputData.inputFileLowerOffset);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("offset", sourceLowerOffset);

        Tap sink = getPlatform().getTextFile(getOutputPath("simplemergethreechaincogroup"), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeOffset = new Each(new Pipe("offset"), new Fields("line"), new RegexSplitter(new Fields("num2", "char2"), " "));

        // Declared as Pipe: the tail is reassigned from a Merge to a CoGroup.
        Pipe splice = new Merge("merge", pipeLower, pipeUpper);
        splice = new CoGroup(splice, new Fields("num1"), pipeOffset, new Fields("num2"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, 6);
    }

    /**
     * Splits one source ("split") into three branches, then builds
     * Merge(lower, upper) -> GroupBy(num) -> Merge(with offset) -> GroupBy(num).
     * When the platform reports {@code isMapReduce()}, the flow must have two steps.
     * The sink is checked with {@code validateLength(flow, 15)}.
     */
    @Test
    public void testSameSourceMergeThreeChainGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("split", sourceLower);

        Tap sink = getPlatform().getTextFile(getOutputPath("samemergethreechaingroup"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        // All three branches hang off the same head pipe, so they read the same source.
        Pipe pipe = new Pipe("split");
        Pipe pipeLower = new Each(new Pipe("lower", pipe), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper", pipe), new Fields("line"), splitter);
        Pipe pipeOffset = new Each(new Pipe("offset", pipe), new Fields("line"), splitter);

        // Declared as Pipe: the tail alternates between Merge and GroupBy.
        Pipe splice = new Merge("merge", pipeLower, pipeUpper);
        splice = new GroupBy(splice, new Fields("num"));
        splice = new Merge(splice, pipeOffset);
        splice = new GroupBy(splice, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 2, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, 15);
    }

    /**
     * Filters the Apache log with {@code ^68.*}, splits it into a "left" branch
     * ({@code .*46.*}) and a "right" branch ({@code .*102.*}), merges the two and applies an
     * {@link Identity} function. When the platform reports {@code isMapReduce()}, the flow
     * must have one step. The sink is checked with {@code validateLength(flow, 3)}.
     */
    @Test
    public void testSplitSameSourceMerged() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);
        Tap sink = getPlatform().getTextFile(getOutputPath("splitsourcemerged"), SinkMode.REPLACE);

        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields("line"), new RegexFilter("^68.*"));

        Pipe left = new Each(new Pipe("left", pipe), new Fields("line"), new RegexFilter(".*46.*"));
        Pipe right = new Each(new Pipe("right", pipe), new Fields("line"), new RegexFilter(".*102.*"));

        // Declared as Pipe: the tail is reassigned from a Merge to an Each.
        Pipe merged = new Merge("merged", left, right);
        merged = new Each(merged, new Fields("line"), new Identity());

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, merged);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, 3);
    }

    /**
     * Runs the split/filter/merge pattern twice in a row. The output of the first merge
     * ("merged-first") is split again into "left" and "right" branches with the same filters
     * and merged a second time ("merged-second"). When the platform reports
     * {@code isMapReduce()}, the flow must have one step. The sink is checked with
     * {@code validateLength(flow, 3)}.
     */
    @Test
    public void testSplitSameSourceMergedComplex() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);
        Tap sink = getPlatform().getTextFile(getOutputPath("splitsourcemergedcomplex"), SinkMode.REPLACE);

        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields("line"), new RegexFilter("^68.*"));

        // First split and merge.
        Pipe left = new Each(new Pipe("left", pipe), new Fields("line"), new RegexFilter(".*46.*"));
        Pipe right = new Each(new Pipe("right", pipe), new Fields("line"), new RegexFilter(".*102.*"));

        // Declared as Pipe: the tail alternates between Merge and Each.
        Pipe merged = new Merge("merged-first", left, right);
        merged = new Each(merged, new Fields("line"), new Identity());

        // Second split and merge, fed by the first merge.
        left = new Each(new Pipe("left", merged), new Fields("line"), new RegexFilter(".*46.*"));
        right = new Each(new Pipe("right", merged), new Fields("line"), new RegexFilter(".*102.*"));

        merged = new Merge("merged-second", left, right);
        merged = new Each(merged, new Fields("line"), new Identity());

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, merged);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, 3);
    }

    /**
     * Renames {@code num} to {@code num2} on the lower branch only, so the two inputs of the
     * {@link Merge} no longer declare the same fields. Connecting the flow is expected to
     * throw; the test fails if {@code connect(...)} returns normally.
     */
    @Test
    public void testSimpleMergeFail() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("simplemergefail"), SinkMode.REPLACE);

        // Declared as Pipe: the lower branch is reassigned from an Each to a Rename.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new Rename(pipeLower, new Fields("num"), new Fields("num2"));

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);

        Pipe splice = new Merge("merge", pipeLower, pipeUpper);

        try {
            getPlatform().getFlowConnector().connect(sources, sink, splice);
            fail("connect should reject a merge of branches with different fields");
        }
        catch (Exception ignored) {
            // Expected: the connect call must fail. fail() raises an Error, not an Exception,
            // so it is not swallowed by this handler.
        }
    }

    /** Runs {@code runMergeIntoHashJoin} with the merged pipe as the first (left) HashJoin input. */
    @Test
    public void testMergeIntoHashJoinStreamed() throws Exception {
        runMergeIntoHashJoin(true);
    }

    /** Runs {@code runMergeIntoHashJoin} with the merged pipe as the second (right) HashJoin input. */
    @Test
    public void testMergeIntoHashJoinAccumulated() throws Exception {
        runMergeIntoHashJoin(false);
    }

    /**
     * Merges the lower and upper branches and hash-joins the merge with the offset branch on
     * {@code num1 = num2}. When the platform reports {@code isMapReduce()}, the flow must
     * have one step. The sink is checked with {@code validateLength(flow, 6)}.
     *
     * @param streamed when {@code true} the merge is the first (left) HashJoin input and the
     *                 offset branch the second; when {@code false} the two are swapped
     * @throws IOException declared for the platform file operations used by the test
     */
    private void runMergeIntoHashJoin(boolean streamed) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap sourceLowerOffset = getPlatform().getTextFile(InputData.inputFileLowerOffset);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("offset", sourceLowerOffset);

        String name = streamed ? "streamed" : "accumulated";
        String path = "mergeintohashjoin" + name;
        Tap sink = getPlatform().getTextFile(getOutputPath(path), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeOffset = new Each(new Pipe("offset"), new Fields("line"), new RegexSplitter(new Fields("num2", "char2"), " "));

        // Declared as Pipe: the tail is reassigned from a Merge to a HashJoin.
        Pipe splice = new Merge("merge", pipeLower, pipeUpper);

        if (streamed) {
            splice = new HashJoin(splice, new Fields("num1"), pipeOffset, new Fields("num2"));
        } else {
            splice = new HashJoin(pipeOffset, new Fields("num2"), splice, new Fields("num1"));
        }

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, 6);
    }

    /** Runs {@code runHashJoinIntoMergeIntoHashJoin} with the merge as the left input of the final HashJoin. */
    @Test
    public void testHashJoinMergeIntoHashJoinStreamed() throws Exception {
        runHashJoinIntoMergeIntoHashJoin(true);
    }

    /** Runs {@code runHashJoinIntoMergeIntoHashJoin} with the merge as the right input of the final HashJoin. */
    @Test
    public void testHashJoinMergeIntoHashJoinAccumulated() throws Exception {
        runHashJoinIntoMergeIntoHashJoin(false);
    }

    /**
     * Builds HashJoin(lower, offset) -> Retain(num1, char1) -> Merge(with upper) ->
     * HashJoin(with offset). When the platform reports {@code isMapReduce()}, the flow must
     * have one step if {@code streamed}, otherwise two. The sink is checked with
     * {@code validateLength(flow, 8)}.
     *
     * @param streamed when {@code true} the merge is the first (left) input of the final
     *                 HashJoin; when {@code false} it is the second (right) input
     * @throws IOException declared for the platform file operations used by the test
     */
    private void runHashJoinIntoMergeIntoHashJoin(boolean streamed) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap sourceLowerOffset = getPlatform().getTextFile(InputData.inputFileLowerOffset);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("offset", sourceLowerOffset);

        String name = streamed ? "streamed" : "accumulated";
        String path = "hashjoinintomergeintohashjoin" + name;
        Tap sink = getPlatform().getTextFile(getOutputPath(path), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeOffset = new Each(new Pipe("offset"), new Fields("line"), new RegexSplitter(new Fields("num2", "char2"), " "));

        // Declared as Pipe: the tail goes HashJoin -> Retain -> Merge -> HashJoin.
        Pipe splice = new HashJoin(pipeLower, new Fields("num1"), pipeOffset, new Fields("num2"));

        // Drop the joined-in num2/char2 so the fields match pipeUpper before merging.
        splice = new Retain(splice, new Fields("num1", "char1"));

        splice = new Merge("merge", splice, pipeUpper);

        if (streamed) {
            splice = new HashJoin(splice, new Fields("num1"), pipeOffset, new Fields("num2"));
        } else {
            splice = new HashJoin(pipeOffset, new Fields("num2"), splice, new Fields("num1"));
        }

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", streamed ? 1 : 2, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, 8);
    }

    /** First join streamed, second join streamed, with the intermediate merge; expects 1 step. */
    @Test
    public void testHashJoinMergeIntoHashJoinStreamedStreamedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, true, true, 1);
    }

    /** First join accumulated, second join accumulated, with the intermediate merge; expects 3 steps. */
    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedAccumulatedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, false, true, 3);
    }

    /** First join streamed, second join accumulated, with the intermediate merge; expects 2 steps. */
    @Test
    public void testHashJoinMergeIntoHashJoinStreamedAccumulatedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, false, true, 2);
    }

    /** First join accumulated, second join streamed, with the intermediate merge; expects 3 steps. */
    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedStreamedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, true, true, 3);
    }

    /** First join streamed, second join streamed, without the intermediate merge; expects 1 step. */
    @Test
    public void testHashJoinMergeIntoHashJoinStreamedStreamed() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, true, false, 1);
    }

    /** First join accumulated, second join accumulated, without the intermediate merge; expects 3 steps. */
    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedAccumulated() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, false, false, 3);
    }

    /** First join streamed, second join accumulated, without the intermediate merge; expects 2 steps. */
    @Test
    public void testHashJoinMergeIntoHashJoinStreamedAccumulated() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, false, false, 2);
    }

    /** First join accumulated, second join streamed, without the intermediate merge; expects 3 steps. */
    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedStreamed() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, true, false, 3);
    }

    /**
     * Builds HashJoin(lower, offset) -> Retain -> Merge("merge1", with upper) ->
     * HashJoin(with offset) -> Retain -> optional Merge("merge2", with upper) ->
     * HashJoin(with offset). The sink is checked with {@code validateLength}, expecting 17
     * when {@code interMerge} is set and 14 otherwise.
     *
     * @param firstStreamed  when {@code true} the running tail is the left input of the
     *                       second HashJoin, otherwise the right input
     * @param secondStreamed same choice for the final HashJoin
     * @param interMerge     whether to merge with the upper branch again before the final
     *                       HashJoin
     * @param expectedSteps  number of flow steps asserted when the platform reports
     *                       {@code isMapReduce()}
     * @throws IOException declared for the platform file operations used by the test
     */
    private void runMultiHashJoinIntoMergeIntoHashJoin(boolean firstStreamed, boolean secondStreamed, boolean interMerge, int expectedSteps) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap sourceLowerOffset = getPlatform().getTextFile(InputData.inputFileLowerOffset);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("offset", sourceLowerOffset);

        // The output path encodes all three switches so each variant writes to its own location.
        String name = firstStreamed ? "firstStreamed" : "firstAccumulated";
        name += secondStreamed ? "secondStreamed" : "secondAccumulated";
        name += interMerge ? "interMerge" : "noInterMerge";
        String path = "multihashjoinintomergeintohashjoin" + name;
        Tap sink = getPlatform().getTextFile(getOutputPath(path), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), new RegexSplitter(new Fields("num1", "char1"), " "));
        Pipe pipeOffset = new Each(new Pipe("offset"), new Fields("line"), new RegexSplitter(new Fields("num2", "char2"), " "));

        // Declared as Pipe: the tail alternates between HashJoin, Retain and Merge.
        Pipe splice = new HashJoin(pipeLower, new Fields("num1"), pipeOffset, new Fields("num2"));
        splice = new Retain(splice, new Fields("num1", "char1"));
        splice = new Merge("merge1", splice, pipeUpper);

        if (firstStreamed) {
            splice = new HashJoin(splice, new Fields("num1"), pipeOffset, new Fields("num2"));
        } else {
            splice = new HashJoin(pipeOffset, new Fields("num2"), splice, new Fields("num1"));
        }

        splice = new Retain(splice, new Fields("num1", "char1"));

        if (interMerge) {
            splice = new Merge("merge2", splice, pipeUpper);
        }

        if (secondStreamed) {
            splice = new HashJoin(splice, new Fields("num1"), pipeOffset, new Fields("num2"));
        } else {
            splice = new HashJoin(pipeOffset, new Fields("num2"), splice, new Fields("num1"));
        }

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", expectedSteps, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow, interMerge ? 17 : 14);
    }

    /**
     * Groups each of the lower and upper branches on {@code num}, keeps the first
     * {@code char} per group with {@link First}, and merges the two aggregated branches.
     * The sink is checked with {@code validateLength(flow, 10)} and for one sample line from
     * each input.
     */
    @Test
    public void testGroupByAggregationMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath(), SinkMode.REPLACE);

        // Declared as Pipe: each branch goes Each -> GroupBy -> Every.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new GroupBy(pipeLower, new Fields("num"));
        pipeLower = new Every(pipeLower, new Fields("char"), new First(Fields.ARGS));

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new GroupBy(pipeUpper, new Fields("num"));
        pipeUpper = new Every(pipeUpper, new Fields("char"), new First(Fields.ARGS));

        Pipe splice = new Merge("merge", pipeLower, pipeUpper);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 10);
        List results = getSinkAsList(flow);
        assertTrue("missing value", results.contains(new Tuple("1\ta")));
        assertTrue("missing value", results.contains(new Tuple("1\tA")));
    }

    /**
     * Binds the same delimited source tap to both inputs of a {@link Merge} through a
     * {@link FlowDef} and checks the sink with {@code validateLength(flow, 10)}.
     */
    @Test
    public void testSameSourceMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);

        // The left branch is a "lhs" pipe chained onto another pipe with the same name.
        Pipe left = new Pipe("lhs", new Pipe("lhs"));
        Pipe right = new Pipe("rhs");
        Pipe merged = new Merge("merge", left, right);

        FlowDef definition = FlowDef.flowDef()
            .addSource(left, input)
            .addSource(right, input)
            .addTailSink(merged, output);

        Flow flow = getPlatform().getFlowConnector().connect(definition);
        flow.complete();

        validateLength(flow, 10);
    }

    /**
     * Uses one source tap for both inputs of a {@link Merge} and for the right side of a
     * {@link HashJoin} on the merged stream. Fields are renamed before the join
     * ({@code merged.*} and {@code rhs.*}) so both sides stay distinct, then retained and
     * renamed to {@code num}, {@code merged}, {@code char}. The sink is checked with
     * {@code validateLength(flow, 10)}.
     */
    @Test
    public void testSameSourceMergeHashJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);

        Pipe mergeLhs = new Pipe("lhs");
        Pipe mergeRhs = new Pipe("rhs");

        // Declared as Pipe: the variable is reassigned from a Merge to a Rename.
        Pipe mergePipe = new Merge("merge", mergeLhs, mergeRhs);
        mergePipe = new Rename(mergePipe, new Fields("num", "char"), new Fields("merged.num", "merged.char"));

        Pipe joinRhs = new Pipe("join");
        joinRhs = new Rename(joinRhs, new Fields("num", "char"), new Fields("rhs.num", "rhs.char"));

        Pipe lookupJoin = new HashJoin(mergePipe, new Fields("merged.num"), joinRhs, new Fields("rhs.num"));

        Pipe retain = new Retain(lookupJoin, new Fields("merged.num", "merged.char", "rhs.char"));

        Pipe out = new Rename(retain, new Fields("merged.num", "merged.char", "rhs.char"), new Fields("num", "merged", "char"));

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(mergeLhs, source)
            .addSource(mergeRhs, source)
            .addSource(joinRhs, source)
            .addTailSink(out, sink);

        FlowConnector flowConnector = getPlatform().getFlowConnector();
        Flow flow = flowConnector.connect(flowDef);
        flow.complete();

        validateLength(flow, 10);
    }

    /**
     * Hash-joins the {@code lhs} pipe with the {@code mid} pipe and merges the
     * join result with a third, independent pipe.
     *
     * <p>The {@code mid} fields are renamed to {@code num2}/{@code char2} so the
     * join sides do not collide, and only {@code num}/{@code char} are retained
     * from the join so that its output lines up with the {@code rhs} branch of the
     * merge. The flow output is checked with {@code validateLength} against an
     * expected length of 9.
     *
     * @throws Exception if copying the inputs, planning, or running the flow fails
     */
    @Test
    public void testHashJoinMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Fields numChar = new Fields("num", "char");
        Tap lower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap upper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap offset = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLowerOffset);
        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);

        Pipe lhs = new Pipe("lhs");
        Pipe mid = new Pipe("mid");
        Pipe rhs = new Pipe("rhs");

        mid = new Rename(mid, Fields.ALL, new Fields("num2", "char2"));

        // Typed as Pipe because the variable is rebound from a HashJoin to a Retain,
        // which are unrelated Pipe subclasses.
        Pipe join = new HashJoin(lhs, new Fields("num"), mid, new Fields("num2"));
        join = new Retain(join, new Fields("num", "char"));

        Pipe merge = new Merge("merge", join, rhs);

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(lhs, lower)
            .addSource(mid, upper)
            .addSource(rhs, offset)
            .addTailSink(merge, sink);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 9);
    }

    /**
     * Hash-joins {@code lhs} with {@code accumulated} and merges the join result
     * with {@code rhs}, where {@code lhs} and {@code rhs} are bound to the same
     * {@code lower} tap.
     *
     * <p>In other words, the tap shared with the merge branch sits on the
     * left-hand (first) side of the join; the {@code upper} tap feeds the pipe
     * named {@code accumulated}. The join declares its output fields explicitly as
     * {@code num, char, num2, char2} and only {@code num}/{@code char} are retained
     * before the merge. The flow is named {@code "streamed"} and its output is
     * checked with {@code validateLength} against an expected length of 10.
     *
     * @throws Exception if copying the inputs, planning, or running the flow fails
     */
    @Test
    public void testSameSourceHashJoinMergeOnStreamed() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Fields numChar = new Fields("num", "char");
        Tap lower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap upper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);

        Pipe lhs = new Pipe("lhs");
        Pipe accumulated = new Pipe("accumulated");
        Pipe rhs = new Pipe("rhs");

        // Typed as Pipe because the variable is rebound from a HashJoin to a Retain,
        // which are unrelated Pipe subclasses.
        Pipe join = new HashJoin(lhs, new Fields("num"), accumulated, new Fields("num"),
            new Fields("num", "char", "num2", "char2"));
        join = new Retain(join, new Fields("num", "char"));

        Pipe merge = new Merge("merge", join, rhs);

        // "lower" is bound twice: to the join's left-hand side and to the merge branch.
        FlowDef flowDef = FlowDef.flowDef()
            .setName("streamed")
            .addSource(lhs, lower)
            .addSource(accumulated, upper)
            .addSource(rhs, lower)
            .addTailSink(merge, sink);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 10);
    }

    /**
     * Hash-joins {@code lhs} with {@code accumulated} and merges the join result
     * with {@code rhs}, where {@code accumulated} and {@code rhs} are bound to the
     * same {@code lower} tap.
     *
     * <p>This mirrors {@code testSameSourceHashJoinMergeOnStreamed} with the tap
     * bindings of the two join sides swapped: here the {@code upper} tap feeds
     * {@code lhs} and the tap shared with the merge branch feeds the pipe named
     * {@code accumulated}. The join declares its output fields explicitly as
     * {@code num, char, num2, char2} and only {@code num}/{@code char} are retained
     * before the merge. The flow is named {@code "accumulated"} and its output is
     * checked with {@code validateLength} against an expected length of 10.
     *
     * @throws Exception if copying the inputs, planning, or running the flow fails
     */
    @Test
    public void testSameSourceHashJoinMergeOnAccumulated() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Fields numChar = new Fields("num", "char");
        Tap lower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap upper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);

        Pipe lhs = new Pipe("lhs");
        Pipe accumulated = new Pipe("accumulated");
        Pipe rhs = new Pipe("rhs");

        // Typed as Pipe because the variable is rebound from a HashJoin to a Retain,
        // which are unrelated Pipe subclasses.
        Pipe join = new HashJoin(lhs, new Fields("num"), accumulated, new Fields("num"),
            new Fields("num", "char", "num2", "char2"));
        join = new Retain(join, new Fields("num", "char"));

        Pipe merge = new Merge("merge", join, rhs);

        // "lower" is bound twice: to the join's right-hand side and to the merge branch.
        FlowDef flowDef = FlowDef.flowDef()
            .setName("accumulated")
            .addSource(lhs, upper)
            .addSource(accumulated, lower)
            .addSource(rhs, lower)
            .addTailSink(merge, sink);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 10);
    }

    /**
     * Builds two independent hash-joins (each a lower-case input joined with a
     * renamed upper-case input) and merges their results.
     *
     * <p>Each join's right-hand side is renamed to {@code num2}/{@code char2} and
     * only {@code num}/{@code char} are retained afterwards, so both merge
     * branches carry the same fields. The flow output is checked with
     * {@code validateLength} against an expected length of 10.
     *
     * <p>Returns immediately, without planning or running anything, when the
     * platform reports {@code isDAG()}.
     *
     * @throws Exception if copying the inputs, planning, or running the flow fails
     */
    @Test
    public void testHashJoinHashJoinMerge() throws Exception {
        if (getPlatform().isDAG()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        // Separate tap instances per head, even where they point at the same file.
        Fields numChar = new Fields("num", "char");
        Tap lhsLower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap lhsUpper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap rhsLower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap rhsUpper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);

        Pipe lhsLowerPipe = new Pipe("lhsLower");
        Pipe lhsUpperPipe = new Pipe("lhsUpper");
        Pipe rhsLowerPipe = new Pipe("rhsLower");
        Pipe rhsUpperPipe = new Pipe("rhsUpper");

        // The join variables are typed as Pipe because each is rebound from a
        // HashJoin to a Retain, which are unrelated Pipe subclasses.
        lhsUpperPipe = new Rename(lhsUpperPipe, Fields.ALL, new Fields("num2", "char2"));
        Pipe lhs = new HashJoin(lhsLowerPipe, new Fields("num"), lhsUpperPipe, new Fields("num2"));
        lhs = new Retain(lhs, new Fields("num", "char"));

        rhsUpperPipe = new Rename(rhsUpperPipe, Fields.ALL, new Fields("num2", "char2"));
        Pipe rhs = new HashJoin(rhsLowerPipe, new Fields("num"), rhsUpperPipe, new Fields("num2"));
        rhs = new Retain(rhs, new Fields("num", "char"));

        Pipe merge = new Merge("merge", lhs, rhs);

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(lhsLowerPipe, lhsLower)
            .addSource(lhsUpperPipe, lhsUpper)
            .addSource(rhsLowerPipe, rhsLower)
            .addSource(rhsUpperPipe, rhsUpper)
            .addTailSink(merge, sink);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 10);
    }

    /**
     * Builds three independent hash-joins, merges their results, and then merges
     * that stream once more with a fourth, un-joined pipe.
     *
     * <p>Each join pairs a lower-case input with an upper-case input whose fields
     * are renamed to {@code num2}/{@code char2}; only {@code num}/{@code char}
     * are retained from each join so that every merge branch carries the same
     * fields. The first merge is named {@code "merge"}, the second
     * {@code "next merge"}. The flow output is checked with
     * {@code validateLength} against an expected length of 19.
     *
     * <p>Returns immediately, without planning or running anything, when the
     * platform reports {@code isDAG()}.
     *
     * @throws Exception if copying the inputs, planning, or running the flow fails
     */
    @Test
    public void testHashJoinHashJoinHashJoinMergeMerge() throws Exception {
        if (getPlatform().isDAG()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        // Separate tap instances per head, even where they point at the same file.
        Fields numChar = new Fields("num", "char");
        Tap lhsLower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap lhsUpper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap midLower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap midUpper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap rhsLower = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLower);
        Tap rhsUpper = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileUpper);
        Tap far = getPlatform().getDelimitedFile(numChar, " ", InputData.inputFileLowerOffset);
        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);

        Pipe lhsLowerPipe = new Pipe("lhsLower");
        Pipe lhsUpperPipe = new Pipe("lhsUpper");
        Pipe midLowerPipe = new Pipe("midLower");
        Pipe midUpperPipe = new Pipe("midUpper");
        Pipe rhsLowerPipe = new Pipe("rhsLower");
        Pipe rhsUpperPipe = new Pipe("rhsUpper");
        Pipe farPipe = new Pipe("far");

        // The join variables are typed as Pipe because each is rebound from a
        // HashJoin to a Retain, which are unrelated Pipe subclasses.
        lhsUpperPipe = new Rename(lhsUpperPipe, Fields.ALL, new Fields("num2", "char2"));
        Pipe lhs = new HashJoin(lhsLowerPipe, new Fields("num"), lhsUpperPipe, new Fields("num2"));
        lhs = new Retain(lhs, new Fields("num", "char"));

        midUpperPipe = new Rename(midUpperPipe, Fields.ALL, new Fields("num2", "char2"));
        Pipe mid = new HashJoin(midLowerPipe, new Fields("num"), midUpperPipe, new Fields("num2"));
        mid = new Retain(mid, new Fields("num", "char"));

        rhsUpperPipe = new Rename(rhsUpperPipe, Fields.ALL, new Fields("num2", "char2"));
        Pipe rhs = new HashJoin(rhsLowerPipe, new Fields("num"), rhsUpperPipe, new Fields("num2"));
        rhs = new Retain(rhs, new Fields("num", "char"));

        // First merge the three join results, then merge that with the un-joined pipe.
        Pipe merge = new Merge("merge", lhs, mid, rhs);
        merge = new Merge("next merge", merge, farPipe);

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(lhsLowerPipe, lhsLower)
            .addSource(lhsUpperPipe, lhsUpper)
            .addSource(midLowerPipe, midLower)
            .addSource(midUpperPipe, midUpper)
            .addSource(rhsLowerPipe, rhsLower)
            .addSource(rhsUpperPipe, rhsUpper)
            .addSource(farPipe, far)
            .addTailSink(merge, sink);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 19);
    }
}

