package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.TestBuffer;
import cascading.TestFunction;
import cascading.flow.FlowConnector;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlannerException;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.operation.AssertionLevel;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.aggregator.MaxValue;
import cascading.operation.aggregator.Sum;
import cascading.operation.assertion.AssertNotNull;
import cascading.operation.assertion.AssertNull;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.scheme.hadoop.SequenceFile;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:cascading/flow/hadoop/BuildJobsHadoopPlatformTest.class */
public class BuildJobsHadoopPlatformTest extends PlatformTestCase {
    /**
     * Creates the test case, passing {@code false} to the {@link PlatformTestCase} constructor.
     * NOTE(review): the flag presumably controls whether a cluster is used -- confirm against
     * PlatformTestCase.
     */
    public BuildJobsHadoopPlatformTest() {
        super(false);
    }

    /**
     * Plans a flow consisting of a single bare {@link Pipe} between two text-line taps and
     * checks that it yields exactly one step with one source tap, no group and a sink.
     */
    @Test
    public void testIdentity() throws Exception {
        Hfs source = new Hfs(new TextLine(), "input/path");
        Hfs sink = new Hfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe tail = new Pipe("test");

        List steps = getPlatform().getFlowConnector().connect(source, sink, tail).getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        HadoopFlowStep onlyStep = (HadoopFlowStep) steps.get(0);
        assertEquals("not equal: step.sources.size()", 1, onlyStep.getSourceTaps().size());
        assertNull("not null: step.groupBy", onlyStep.getGroup());
        assertNotNull("null: step.sink", onlyStep.getSink());
    }

    /**
     * Checks that operators stacked on a head pipe named "count" all report that same name.
     */
    @Test
    public void testName() {
        Pipe head = new Pipe("count");
        GroupBy grouped = new GroupBy(head, new Fields(new Comparable[]{1}));
        Every counted = new Every(grouped, new Fields(new Comparable[]{1}), new Count(), new Fields(new Comparable[]{0, 1}));

        assertEquals("not equal: count.getName()", "count", head.getName());
        assertEquals("not equal: pipe.getName()", "count", counted.getName());

        // a second branch hanging directly off the head pipe must carry the same name
        Each split = new Each(head, new Fields(new Comparable[]{1}), new RegexSplitter(Fields.size(2)));
        assertEquals("not equal: pipe.getName()", "count", split.getName());
    }

    /**
     * Plans a GroupBy followed by a Count and checks the single resulting step: one source,
     * a group, a sink, a source-to-group distance of 1 and a group-to-sink distance of 2.
     */
    @Test
    public void testOneJob() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("count", new Hfs(new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs(new TextLine(new Fields(new Comparable[]{0, 1})), "output/path"));

        GroupBy grouped = new GroupBy(new Pipe("count"), new Fields(new Comparable[]{1}));
        Every counted = new Every(grouped, new Fields(new Comparable[]{1}), new Count(), new Fields(new Comparable[]{0, 1}));

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{counted}).getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        BaseFlowStep step = (BaseFlowStep) steps.get(0);
        assertEquals("not equal: step.sources.size()", 1, step.getSourceTaps().size());
        assertNotNull("null: step.groupBy", step.getGroup());
        assertNotNull("null: step.sink", step.getSink());
        assertEquals("not equal: mapDist", 1,
            ElementGraphs.shortestDistance(step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup()));
        assertEquals("not equal: reduceDist", 2,
            ElementGraphs.shortestDistance(step.getElementGraph(), step.getGroup(), step.getSink()));
    }

    /**
     * Same shape as a single GroupBy/Count job, but with two Identity {@link Each} pipes ahead
     * of the group; expects one step with a source-to-group distance of 3 and a
     * group-to-sink distance of 2.
     */
    @Test
    public void testOneJob2() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("count", new Hfs(new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs(new TextLine(new Fields(new Comparable[]{0, 1})), "output/path"));

        Each firstIdentity = new Each(new Pipe("count"), new Fields(new Comparable[]{1}), new Identity(), new Fields(new Comparable[]{2}));
        Each secondIdentity = new Each(firstIdentity, new Fields(new Comparable[]{0}), new Identity(new Fields(new Comparable[]{"_all"})), new Fields(new Comparable[]{1}));
        GroupBy grouped = new GroupBy(secondIdentity, new Fields(new Comparable[]{0}));
        Every counted = new Every(grouped, new Fields(new Comparable[]{0}), new Count(), new Fields(new Comparable[]{0, 1}));

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{counted}).getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        BaseFlowStep step = (BaseFlowStep) steps.get(0);
        assertEquals("not equal: step.sources.size()", 1, step.getSourceTaps().size());
        assertNotNull("null: step.groupBy", step.getGroup());
        assertNotNull("null: step.sink", step.getSink());
        assertEquals("not equal: mapDist", 3,
            ElementGraphs.shortestDistance(step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup()));
        assertEquals("not equal: reduceDist", 2,
            ElementGraphs.shortestDistance(step.getElementGraph(), step.getGroup(), step.getSink()));
    }

    /**
     * Plans a bare CoGroup of two sources and checks the single resulting step: two sources,
     * each at distance 1 from the group, and the group at distance 1 from the sink.
     */
    @Test
    public void testOneJob3() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("a", new Hfs(new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a"));
        sources.put("b", new Hfs(new TextLine(new Fields(new Comparable[]{"third", "fourth"})), "input/path/b"));

        Pipe joined = new CoGroup(new Pipe("a"), new Fields(new Comparable[]{1}), new Pipe("b"), new Fields(new Comparable[]{1}));
        sinks.put(joined.getName(), new Hfs(new TextLine(new Fields(new Comparable[]{0, 1})), "output/path"));

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{joined}).getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        BaseFlowStep step = (BaseFlowStep) steps.get(0);
        assertEquals("not equal: step.sources.size()", 2, step.getSourceTaps().size());
        assertNotNull("null: step.groupBy", step.getGroup());
        assertNotNull("null: step.sink", step.getSink());

        // both source taps are checked, one iterator element per assertion
        Iterator taps = step.getSourceTaps().iterator();
        assertEquals("not equal: mapDist", 1,
            ElementGraphs.shortestDistance(step.getElementGraph(), (FlowElement) taps.next(), step.getGroup()));
        assertEquals("not equal: mapDist", 1,
            ElementGraphs.shortestDistance(step.getElementGraph(), (FlowElement) taps.next(), step.getGroup()));
        assertEquals("not equal: reduceDist", 1,
            ElementGraphs.shortestDistance(step.getElementGraph(), step.getGroup(), step.getSink()));
    }

    /**
     * Plans a CoGroup of two sources followed by an Identity {@link Each}; expects one step
     * with two sources, a source-to-group distance of 1 and a group-to-sink distance of 2.
     */
    @Test
    public void testOneJob4() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("a", new Hfs(new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a"));
        sources.put("b", new Hfs(new TextLine(new Fields(new Comparable[]{"third", "fourth"})), "input/path/b"));

        CoGroup joined = new CoGroup(new Pipe("a"), new Fields(new Comparable[]{1}), new Pipe("b"), new Fields(new Comparable[]{1}));
        Pipe tail = new Each(joined, new Identity());
        sinks.put(tail.getName(), new Hfs(new TextLine(new Fields(new Comparable[]{0, 1})), "output/path"));

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail}).getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        BaseFlowStep step = (BaseFlowStep) steps.get(0);
        assertEquals("not equal: step.sources.size()", 2, step.getSourceTaps().size());
        assertNotNull("null: step.groupBy", step.getGroup());
        assertNotNull("null: step.sink", step.getSink());
        assertEquals("not equal: mapDist", 1,
            ElementGraphs.shortestDistance(step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup()));
        assertEquals("not equal: reduceDist", 2,
            ElementGraphs.shortestDistance(step.getElementGraph(), step.getGroup(), step.getSink()));
    }

    /**
     * Plans a CoGroup built from a pipe array (no explicit grouping fields) followed by an
     * Identity {@link Each}; expects one step with two sources, a source-to-group distance
     * of 1 and a group-to-sink distance of 2.
     */
    @Test
    public void testOneJob5() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("a", new Hfs(new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a"));
        sources.put("b", new Hfs(new TextLine(new Fields(new Comparable[]{"third", "fourth"})), "input/path/b"));

        Pipe[] heads = new Pipe[]{new Pipe("a"), new Pipe("b")};
        Pipe tail = new Each(new CoGroup(heads), new Identity());
        sinks.put(tail.getName(), new Hfs(new TextLine(), "output/path"));

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail}).getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        BaseFlowStep step = (BaseFlowStep) steps.get(0);
        assertEquals("not equal: step.sources.size()", 2, step.getSourceTaps().size());
        assertNotNull("null: step.groupBy", step.getGroup());
        assertNotNull("null: step.sink", step.getSink());
        assertEquals("not equal: mapDist", 1,
            ElementGraphs.shortestDistance(step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup()));
        assertEquals("not equal: reduceDist", 2,
            ElementGraphs.shortestDistance(step.getElementGraph(), step.getGroup(), step.getSink()));
    }

    /**
     * Expects building or connecting an assembly to throw when an {@link Every} is placed
     * directly after an {@link Each}, with no grouping pipe in between.
     */
    @Test
    public void testNoGroup() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("count", new Hfs(new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs(new TextLine(new Fields(new Comparable[]{0, 1})), "output/path"));

        try {
            // the pipes are built inside the try so a failure at construction time also counts
            Each passThrough = new Each(new Pipe("count"), new Identity());
            Every counted = new Every(passThrough, new Fields(new Comparable[]{1}), new Count(), new Fields(new Comparable[]{0, 1}));
            getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{counted});
            fail("did not throw flow exception");
        } catch (Exception expected) {
            // intentionally ignored: an exception here is the outcome under test
        }
    }

    /**
     * Forks a filtered "split" stream into "left" and "right" branches, each with its own
     * filter and sink, and expects the plan to contain two steps.
     */
    @Test
    public void testSplit() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs leftSink = new Hfs(new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs rightSink = new Hfs(new TextLine(), "foo/split2", SinkMode.REPLACE);

        Each shared = new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Pipe left = new Each(new Pipe("left", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Pipe right = new Each(new Pipe("right", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));

        HashMap sources = new HashMap();
        sources.put("split", source);
        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right}).getFlowSteps();
        assertEquals("not equal: steps.size()", 2, steps.size());
    }

    /**
     * Passes only the shared "split" pipe as the tail while binding sinks named "left" and
     * "right"; expects connect to throw with a message that mentions {@code 'left', 'right'}.
     */
    @Test
    public void testSplitHangingTails() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs leftSink = new Hfs(new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs rightSink = new Hfs(new TextLine(), "foo/split2", SinkMode.REPLACE);

        Pipe shared = new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));

        // the two branches are constructed but deliberately never handed to the connector
        new Each(new Pipe("left", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        new Each(new Pipe("right", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));

        HashMap sources = new HashMap();
        sources.put("split", source);
        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);

        try {
            getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{shared});
            fail("did not catch missing tails");
        } catch (Exception exception) {
            String message = exception.getMessage();
            System.out.println("exception.getMessage() = " + message);
            assertTrue(message.contains("'left', 'right'"));
        }
    }

    /**
     * Forks into "left" and "right" branches after a {@link TestFunction} (constructed with a
     * {@code false} flag) and a filter; expects three steps, the first holding two operations.
     */
    @Test
    public void testSplitOnNonSafeOperations() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs leftSink = new Hfs(new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs rightSink = new Hfs(new TextLine(), "foo/split2", SinkMode.REPLACE);

        Each applied = new Each(
            new Pipe("split"),
            new Fields(new Comparable[]{"line"}),
            new TestFunction(new Fields(new Comparable[]{"ignore"}), new Tuple(new Object[]{1}), false),
            new Fields(new Comparable[]{"line"}));
        Each shared = new Each(applied, new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Pipe left = new Each(new Pipe("left", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Pipe right = new Each(new Pipe("right", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));

        HashMap sources = new HashMap();
        sources.put("split", source);
        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right}).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());

        FlowStep firstStep = (FlowStep) steps.get(0);
        assertEquals("wrong number of operations", 2, firstStep.getAllOperations().size());
    }

    /**
     * Forks into "left" and "right" branches directly after a {@link TestFunction}
     * (constructed with a {@code false} flag); expects three steps, the first holding one
     * operation.
     */
    @Test
    public void testSplitOnNonSafeOperationsSimple() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs leftSink = new Hfs(new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs rightSink = new Hfs(new TextLine(), "foo/split2", SinkMode.REPLACE);

        Each shared = new Each(
            new Pipe("split"),
            new Fields(new Comparable[]{"line"}),
            new TestFunction(new Fields(new Comparable[]{"ignore"}), new Tuple(new Object[]{1}), false),
            new Fields(new Comparable[]{"line"}));
        Pipe left = new Each(new Pipe("left", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Pipe right = new Each(new Pipe("right", shared), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));

        HashMap sources = new HashMap();
        sources.put("split", source);
        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right}).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());

        FlowStep firstStep = (FlowStep) steps.get(0);
        assertEquals("wrong number of operations", 1, firstStep.getAllOperations().size());
    }

    /**
     * Like the two-branch non-safe split, but the shared section is wrapped in a pipe named
     * "middle" that also has a sink bound to it; expects four steps, the first holding two
     * operations.
     */
    @Test
    public void testSplitOnNonSafeOperations2() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs leftSink = new Hfs(new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs rightSink = new Hfs(new TextLine(), "foo/split2", SinkMode.REPLACE);
        Hfs middleSink = new Hfs(new TextLine(), "foo/split3", SinkMode.REPLACE);

        Each applied = new Each(
            new Pipe("split"),
            new Fields(new Comparable[]{"line"}),
            new TestFunction(new Fields(new Comparable[]{"ignore"}), new Tuple(new Object[]{1}), false),
            new Fields(new Comparable[]{"line"}));
        Each filtered = new Each(applied, new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Pipe middle = new Pipe("middle", filtered);
        Pipe left = new Each(new Pipe("left", middle), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Pipe right = new Each(new Pipe("right", middle), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));

        HashMap sources = new HashMap();
        sources.put("split", source);
        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);
        sinks.put("middle", middleSink);

        // only the two branch tails are passed; "middle" is bound purely through the sink map
        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right}).getFlowSteps();
        assertEquals("not equal: steps.size()", 4, steps.size());

        FlowStep firstStep = (FlowStep) steps.get(0);
        assertEquals("wrong number of operations", 2, firstStep.getAllOperations().size());
    }

    /**
     * Forks into "left" and "right" branches after a parse / GroupBy / Count / filter chain.
     * Expects three steps, and that in the first step the group is followed by an Every, then
     * an Each, then a {@link TempHfs}.
     */
    @Test
    public void testSplitComplex() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs leftSink = new Hfs(new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs rightSink = new Hfs(new TextLine(), "foo/split2", SinkMode.REPLACE);

        Each parsed = new Each(
            new Pipe("split"),
            new Fields(new Comparable[]{"line"}),
            new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"),
            new Fields(new Comparable[]{"ip"}));
        GroupBy grouped = new GroupBy(parsed, new Fields(new Comparable[]{"ip"}));
        Every counted = new Every(grouped, new Fields(new Comparable[]{"ip"}), new Count(), new Fields(new Comparable[]{"ip", "count"}));
        Each shared = new Each(counted, new Fields(new Comparable[]{"ip"}), new RegexFilter("^68.*"));

        Pipe left = new Each(new Pipe("left", shared), new Fields(new Comparable[]{"ip"}), new RegexFilter(".*46.*"));
        Pipe right = new Each(new Pipe("right", shared), new Fields(new Comparable[]{"ip"}), new RegexFilter(".*192.*"));

        HashMap sources = new HashMap();
        sources.put("split", source);
        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right}).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());

        // walk the first step downstream from its group, one element at a time
        BaseFlowStep firstStep = (BaseFlowStep) steps.get(0);
        FlowElement afterGroup = firstStep.getNextFlowElement(firstStep.getNextScope(firstStep.getGroup()));
        assertTrue("not an Every", afterGroup instanceof Every);

        FlowElement afterEvery = firstStep.getNextFlowElement(firstStep.getNextScope(afterGroup));
        assertTrue("not a Each", afterEvery instanceof Each);

        FlowElement afterEach = firstStep.getNextFlowElement(firstStep.getNextScope(afterEvery));
        assertTrue("not a TempHfs", afterEach instanceof TempHfs);
    }

    /**
     * Variant of the complex split in which the branch names "left" and "right" are applied
     * after each branch filter rather than before it. Expects three steps, and that in the
     * first step the group is followed by an Every, then an Each, then a {@link TempHfs}.
     */
    @Test
    public void testSplitComplex2() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs leftSink = new Hfs(new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs rightSink = new Hfs(new TextLine(), "foo/split2", SinkMode.REPLACE);

        Each parsed = new Each(
            new Pipe("split"),
            new Fields(new Comparable[]{"line"}),
            new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"),
            new Fields(new Comparable[]{"ip"}));
        GroupBy grouped = new GroupBy(parsed, new Fields(new Comparable[]{"ip"}));
        Every counted = new Every(grouped, new Fields(new Comparable[]{"ip"}), new Count(), new Fields(new Comparable[]{"ip", "count"}));
        Each shared = new Each(counted, new Fields(new Comparable[]{"ip"}), new RegexFilter("^68.*"));

        Pipe left = new Pipe("left", new Each(shared, new Fields(new Comparable[]{"ip"}), new RegexFilter(".*46.*")));
        Pipe right = new Pipe("right", new Each(shared, new Fields(new Comparable[]{"ip"}), new RegexFilter(".*192.*")));

        HashMap sources = new HashMap();
        sources.put("split", source);
        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right}).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());

        // walk the first step downstream from its group, one element at a time
        BaseFlowStep firstStep = (BaseFlowStep) steps.get(0);
        FlowElement afterGroup = firstStep.getNextFlowElement(firstStep.getNextScope(firstStep.getGroup()));
        assertTrue("not an Every", afterGroup instanceof Every);

        FlowElement afterEvery = firstStep.getNextFlowElement(firstStep.getNextScope(afterGroup));
        assertTrue("not a Each", afterEvery instanceof Each);

        FlowElement afterEach = firstStep.getNextFlowElement(firstStep.getNextScope(afterEvery));
        assertTrue("not a TempHfs", afterEach instanceof TempHfs);
    }

    /**
     * Merges two filtered text-line sources through a multi-pipe GroupBy named "merge" and
     * expects the plan to contain one step.
     */
    @Test
    public void testMerge() {
        Hfs leftSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge1");
        Hfs rightSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge2");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Each left = new Each(new Pipe("left"), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right"), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));
        Pipe merged = new GroupBy("merge", Pipe.pipes(new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));

        HashMap sources = new HashMap();
        sources.put("left", leftSource);
        sources.put("right", rightSource);
        HashMap sinks = new HashMap();
        sinks.put("merge", sink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merged}).getFlowSteps();
        assertEquals("not equal: steps.size()", 1, steps.size());
    }

    /**
     * Merges two branches whose source taps are distinct instances pointing at the same path
     * ("foo/merge"); the right branch applies the same filter four times in a row. Expects
     * the plan to contain one step.
     */
    @Test
    public void testDupeSource() {
        Hfs leftSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs rightSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Each left = new Each(new Pipe("left"), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));

        // stack four identical filters on the right head, innermost first
        Pipe right = new Pipe("right");
        for (int depth = 0; depth < 4; depth++) {
            right = new Each(right, new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));
        }

        Pipe merged = new GroupBy("merge", Pipe.pipes(new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));

        HashMap sources = new HashMap();
        sources.put("left", leftSource);
        sources.put("right", rightSource);
        HashMap sinks = new HashMap();
        sinks.put("merge", sink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merged}).getFlowSteps();
        assertEquals("not equal: steps.size()", 1, steps.size());
    }

    /**
     * Plans a single-pipe CoGroup constructed with the integer argument {@code 1} and four
     * declared fields, and expects the plan to contain one step.
     */
    @Test
    public void testDupeSourceRepeat() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Pipe joined = new CoGroup(
            "cogroup",
            new Pipe("pipe"),
            new Fields(new Comparable[]{"offset"}),
            1,
            Fields.size(4));

        HashMap sources = new HashMap();
        sources.put("pipe", source);
        HashMap sinks = new HashMap();
        sinks.put("cogroup", sink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{joined}).getFlowSteps();
        assertEquals("not equal: steps.size()", 1, steps.size());
    }

    /**
     * Binds the same tap instance to both heads of a two-way CoGroup and connects the flow.
     * There are no assertions: the test passes if connect returns without throwing.
     */
    @Test
    public void testDupeSource2() {
        Hfs sharedSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Pipe joined = new CoGroup(
            "cogroup",
            new Pipe("left"),
            new Fields(new Comparable[]{"offset"}),
            new Pipe("right"),
            new Fields(new Comparable[]{"offset"}),
            Fields.size(4));

        HashMap sources = new HashMap();
        sources.put("left", sharedSource);
        sources.put("right", sharedSource);
        HashMap sinks = new HashMap();
        sinks.put("cogroup", sink);

        getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{joined});
    }

    /**
     * Connects a three-way CoGroup whose "left" and "right" heads share one tap instance
     * while "middle" reads from a different path. There are no assertions: the test passes
     * if connect returns without throwing.
     */
    @Test
    public void testDupeSource3() {
        Hfs sharedSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs middleSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "bar/merge");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Pipe joined = new CoGroup(
            "cogroup",
            Pipe.pipes(new Pipe[]{new Pipe("left"), new Pipe("middle"), new Pipe("right")}),
            Fields.fields(new Fields[]{
                new Fields(new Comparable[]{"offset"}),
                new Fields(new Comparable[]{"offset"}),
                new Fields(new Comparable[]{"offset"})}),
            Fields.size(6));

        HashMap sources = new HashMap();
        sources.put("left", sharedSource);
        sources.put("middle", middleSource);
        sources.put("right", sharedSource);
        HashMap sinks = new HashMap();
        sinks.put("cogroup", sink);

        getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{joined});
    }

    /**
     * Merges a text-line source and a {@link SequenceFile} source through a multi-pipe
     * GroupBy named "merge" and expects the plan to contain one step.
     */
    @Test
    public void testMerge2() {
        Hfs textSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge1");
        Hfs sequenceSource = new Hfs(new SequenceFile(new Fields(new Comparable[]{"offset", "line"})), "foo/merge2");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Each left = new Each(new Pipe("left"), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right"), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));
        Pipe merged = new GroupBy("merge", Pipe.pipes(new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));

        HashMap sources = new HashMap();
        sources.put("left", textSource);
        sources.put("right", sequenceSource);
        HashMap sinks = new HashMap();
        sinks.put("merge", sink);

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merged}).getFlowSteps();
        assertEquals("not equal: steps.size()", 1, steps.size());
    }

    /**
     * Splits one filtered source into "left" and "right" branches and merges them again with
     * a GroupBy named "merge"; expects the plan to contain one step.
     */
    @Test
    public void testMergeSameSourceSplit() {
        Hfs source = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge1");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Each nonEmpty = new Each(new Pipe("source"), new Fields(new Comparable[]{"line"}), new ExpressionFilter("line.length() != 0", String.class));
        Each left = new Each(new Pipe("left", nonEmpty), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", nonEmpty), new Fields(new Comparable[]{"line"}), new RegexFilter(".*192.*"));
        GroupBy merged = new GroupBy("merge", Pipe.pipes(new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));

        List steps = getPlatform().getFlowConnector().connect(source, sink, merged).getFlowSteps();
        assertEquals("not equal: steps.size()", 1, steps.size());
    }

    /**
     * Joins the result of one CoGroup with a third head in a second CoGroup, where two of
     * the three heads are bound to the same tap instance; expects the plan to contain two
     * steps.
     */
    @Test
    public void testCoGroupAroundCoGroup() throws Exception {
        Hfs fooSource = new Hfs(new TextLine(new Fields(new Comparable[]{"num"})), "foo");
        Hfs barSource = new Hfs(new TextLine(new Fields(new Comparable[]{"num"})), "bar");

        HashMap sources = new HashMap();
        sources.put("source20", barSource);
        sources.put("source101", fooSource);
        sources.put("source102", fooSource);

        Hfs sink = new Hfs(new TextLine(), "baz", SinkMode.REPLACE);

        Pipe head20 = new Pipe("source20");
        Pipe head101 = new Pipe("source101");
        CoGroup inner = new CoGroup(
            head20, new Fields(new Comparable[]{"num"}),
            head101, new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"num1", "num2"}));
        CoGroup outer = new CoGroup(
            inner, new Fields(new Comparable[]{"num1"}),
            new Pipe("source102"), new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"num1", "num2", "num3"}));

        List steps = getPlatform().getFlowConnector().connect(sources, sink, outer).getFlowSteps();
        assertEquals("not equal: steps.size()", 2, steps.size());
    }

    /**
     * Same nested CoGroup assembly as the non-optimized variant, but planned with a connector
     * whose properties set {@link TextLine} as the intermediate scheme class; expects the
     * plan to contain two steps.
     */
    @Test
    public void testCoGroupAroundCoGroupOptimized() throws Exception {
        Hfs fooSource = new Hfs(new TextLine(new Fields(new Comparable[]{"num"})), "foo");
        Hfs barSource = new Hfs(new TextLine(new Fields(new Comparable[]{"num"})), "bar");

        HashMap sources = new HashMap();
        sources.put("source20", barSource);
        sources.put("source101", fooSource);
        sources.put("source102", fooSource);

        Hfs sink = new Hfs(new TextLine(), "baz", SinkMode.REPLACE);

        Pipe head20 = new Pipe("source20");
        Pipe head101 = new Pipe("source101");
        CoGroup inner = new CoGroup(
            head20, new Fields(new Comparable[]{"num"}),
            head101, new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"num1", "num2"}));
        CoGroup outer = new CoGroup(
            inner, new Fields(new Comparable[]{"num1"}),
            new Pipe("source102"), new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"num1", "num2", "num3"}));

        Properties connectorProps = new Properties();
        FlowConnectorProps.setIntermediateSchemeClass(connectorProps, TextLine.class);

        List steps = getPlatform().getFlowConnector(connectorProps).connect(sources, sink, outer).getFlowSteps();
        assertEquals("not equal: steps.size()", 2, steps.size());
    }

    /**
     * Builds three nested CoGroups, each but the last followed by an Identity {@link Each}
     * and a GroupBy, with the first grouped result feeding both the second and the third
     * join. Expects the plan to contain five steps.
     *
     * <p>Any exception raised by the planner propagates directly to the test runner.
     */
    @Test
    public void testCoGroupAroundCoGroupAroundCoGroup() throws Exception {
        Hfs lowerSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs upperSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "bar");

        HashMap sources = new HashMap();
        sources.put("lower", lowerSource);
        sources.put("upper1", upperSource);
        sources.put("upper2", upperSource);

        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs sink = new Hfs(new TextLine(), "output", SinkMode.REPLACE);

        Each lower = new Each("lower", new Fields(new Comparable[]{"line"}), splitter);
        Each upper1 = new Each("upper1", new Fields(new Comparable[]{"line"}), splitter);
        Each upper2 = new Each("upper2", new Fields(new Comparable[]{"line"}), splitter);

        // first join: lower x upper1, then regrouped on position 0
        CoGroup firstJoin = new CoGroup(
            lower, new Fields(new Comparable[]{"num"}),
            upper1, new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        GroupBy firstGroup = new GroupBy(new Each(firstJoin, new Identity()), new Fields(new Comparable[]{0}));

        // second join: first grouped result x upper2, then regrouped on position 0
        CoGroup secondJoin = new CoGroup(
            firstGroup, new Fields(new Comparable[]{"num1"}),
            upper2, new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        GroupBy secondGroup = new GroupBy(new Each(secondJoin, new Identity()), new Fields(new Comparable[]{0}));

        // third join: second grouped result x first grouped result (reused as the right side)
        CoGroup thirdJoin = new CoGroup(
            secondGroup, new Fields(new Comparable[]{"num1"}),
            firstGroup, new Fields(new Comparable[]{"num1"}),
            new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5"}));

        List steps = getPlatform().getFlowConnector().connect(sources, sink, thirdJoin).getFlowSteps();
        assertEquals("not equal: steps.size()", 5, steps.size());
    }

    /**
     * Connects a CoGroup with declared fields (and no explicit result group fields) followed
     * by a {@link First} aggregator whose output selector names "num1" and "value". There
     * are no assertions: the test passes if connect returns without throwing.
     */
    @Test
    public void testCoGroupWithResultGroupFieldsDefault() throws Exception {
        Hfs lowerSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs upperSource = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), "bar");

        HashMap sources = new HashMap();
        sources.put("lower", lowerSource);
        sources.put("upper", upperSource);

        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs sink = new Hfs(new TextLine(), "/complex/cogroup/", SinkMode.REPLACE);

        Each lower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), splitter);
        Each upper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), splitter);
        CoGroup joined = new CoGroup(
            lower, new Fields(new Comparable[]{"num"}),
            upper, new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"num1", "lhs", "num2", "rhs"}));
        Every firstValue = new Every(joined, new First(new Fields(new Comparable[]{"value"})), new Fields(new Comparable[]{"num1", "value"}));

        getPlatform().getFlowConnector().connect(sources, sink, firstValue);
    }

    /**
     * Plans a two-input CoGroup that is given an explicit extra field list
     * ({@code somenum}, {@code somenum2}) and is followed by a {@code First} aggregator whose
     * output selector refers to {@code somenum}.
     *
     * <p>There are no assertions; the test passes as long as {@code connect} does not throw.
     */
    @Test
    public void testCoGroupWithResultGroupFields() throws Exception {
        HashMap sources = new HashMap();
        sources.put("lower", new Hfs(new TextLine(new Fields("offset", "line")), "foo"));
        sources.put("upper", new Hfs(new TextLine(new Fields("offset", "line")), "bar"));

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        FlowConnector connector = getPlatform().getFlowConnector();
        Hfs sink = new Hfs(new TextLine(), "/complex/cogroup/", SinkMode.REPLACE);

        Each lower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Each upper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        CoGroup joined = new CoGroup(
                lower, new Fields("num"),
                upper, new Fields("num"),
                new Fields("num1", "lhs", "num2", "rhs"),
                new Fields("somenum", "somenum2"));
        Every firstValue = new Every(joined, new First(new Fields("value")), new Fields("somenum", "value"));

        connector.connect(sources, sink, firstValue);
    }

    /**
     * Chains four CoGroups directly (no intermediate operations) over four named heads that are
     * bound to two taps, and expects the planned flow to contain five steps.
     */
    @Test
    public void testDirectCoGroup() throws Exception {
        Hfs lowerTap = new Hfs(new TextLine(new Fields("num", "char")), "foo");
        Hfs upperTap = new Hfs(new TextLine(new Fields("num", "char")), "bar");

        HashMap sources = new HashMap();
        sources.put("lower1", lowerTap);
        sources.put("lower2", lowerTap);
        sources.put("upper1", upperTap);
        sources.put("upper2", upperTap);

        HashMap sinks = new HashMap();
        sinks.put("output1", new Hfs(new TextLine(), "output1", SinkMode.REPLACE));
        sinks.put("output2", new Hfs(new TextLine(), "output2", SinkMode.REPLACE));

        Pipe lower1 = new Pipe("lower1");
        Pipe lower2 = new Pipe("lower2");
        Pipe upper1 = new Pipe("upper1");
        Pipe upper2 = new Pipe("upper2");

        CoGroup base = new CoGroup(
                lower1, new Fields("num"),
                upper1, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));

        FlowConnector connector = getPlatform().getFlowConnector();

        // The base join is consumed twice: once as the left side of the next join and once as
        // the right side of the "output1" join.
        CoGroup withUpper2 = new CoGroup(
                base, new Fields("num1"),
                upper2, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3"));
        CoGroup output1 = new CoGroup(
                "output1",
                withUpper2, new Fields("num1"),
                base, new Fields("num1"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5"));
        CoGroup output2 = new CoGroup(
                "output2",
                lower2, new Fields("num"),
                output1, new Fields("num1"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5", "num6", "char6"));

        List steps = connector.connect(sources, sinks, new Pipe[]{output2}).getFlowSteps();
        assertEquals("not equal: steps.size()", 5, steps.size());
    }

    /**
     * Chains four CoGroups in which the {@code upper1} head is reused as an input of three of
     * them, and expects the planned flow to contain five steps.
     */
    @Test
    public void testMultipleCoGroupSimilarSources() throws Exception {
        HashMap sources = new HashMap();
        sources.put("lower1", new Hfs(new TextLine(new Fields("num", "char")), "foo"));
        sources.put("upper1", new Hfs(new TextLine(new Fields("num", "char")), "bar"));

        HashMap sinks = new HashMap();
        sinks.put("output1", new Hfs(new TextLine(), "output1", SinkMode.REPLACE));
        sinks.put("output2", new Hfs(new TextLine(), "output2", SinkMode.REPLACE));

        Pipe lower = new Pipe("lower1");
        Pipe upper = new Pipe("upper1");

        CoGroup base = new CoGroup(
                lower, new Fields("num"),
                upper, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));

        FlowConnector connector = getPlatform().getFlowConnector();

        CoGroup withUpperAgain = new CoGroup(
                base, new Fields("num1"),
                upper, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3"));
        CoGroup output1 = new CoGroup(
                "output1",
                withUpperAgain, new Fields("num1"),
                base, new Fields("num1"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5"));
        CoGroup output2 = new CoGroup(
                "output2",
                upper, new Fields("num"),
                output1, new Fields("num1"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5", "num6", "char6"));

        List steps = connector.connect(sources, sinks, new Pipe[]{output2}).getFlowSteps();
        assertEquals("not equal: steps.size()", 5, steps.size());
    }

    /**
     * Builds a three-input CoGroup in which two inputs branch from the same {@code lower1} head
     * (one through identity operations only, one through a GroupBy/Count), then splits the join
     * result into two named tails. Expects the planned flow to contain four steps.
     */
    @Test
    public void testMultipleCoGroupSplitSources() throws Exception {
        HashMap sources = new HashMap();
        sources.put("lower1", new Hfs(new TextLine(new Fields("num", "char")), "foo"));
        sources.put("upper1", new Hfs(new TextLine(new Fields("num", "char")), "bar"));

        HashMap sinks = new HashMap();
        sinks.put("output1", new Hfs(new TextLine(), "output1", SinkMode.REPLACE));
        sinks.put("output2", new Hfs(new TextLine(), "output2", SinkMode.REPLACE));

        Pipe lower = new Pipe("lower1");
        Pipe upper = new Pipe("upper1");

        // First input: lower1 passed through three identity operations and renamed "lower2".
        Each lowerIdentity = new Each(new Each(new Each(lower, new Identity()), new Identity()), new Identity());
        Pipe lowerRenamed = new Pipe("lower2", lowerIdentity);

        // Second input: lower1 again, this time grouped and counted.
        GroupBy lowerGrouped = new GroupBy(new Each(lower, new Identity()), new Fields("num", "char"));
        Every lowerCounted = new Every(lowerGrouped, new Fields("num", "char"), new Count(), new Fields("num", "char"));

        // Third input: upper1 passed through three identity operations.
        Each upperIdentity = new Each(new Each(new Each(upper, new Identity()), new Identity()), new Identity());

        CoGroup group = new CoGroup(
                "group",
                Pipe.pipes(lowerRenamed, lowerCounted, upperIdentity),
                Fields.fields(new Fields("num"), new Fields("num"), new Fields("num")),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3"),
                new InnerJoin());

        FlowConnector connector = getPlatform().getFlowConnector();

        Each notNullChecked = new Each(group, AssertionLevel.VALID, new AssertNotNull());
        Pipe output1 = new Pipe("output1", new Each(notNullChecked, new Identity()));
        Each nullChecked = new Each(group, AssertionLevel.VALID, new AssertNull());
        Pipe output2 = new Pipe("output2", new Each(nullChecked, new Identity()));

        List steps = connector.connect(sources, sinks, new Pipe[]{output1, output2}).getFlowSteps();
        assertEquals("not equal: steps.size()", 4, steps.size());
    }

    /**
     * Splits directly after a GroupBy into two named branches that each apply an {@code Each},
     * and expects the planned flow to contain three steps.
     */
    @Test
    public void testSplitEachOnGroup() throws Exception {
        HashMap sources = new HashMap();
        sources.put("lower1", new Hfs(new TextLine(new Fields("num", "char")), "foo"));

        HashMap sinks = new HashMap();
        sinks.put("output1", new Hfs(new TextLine(), "output1", SinkMode.REPLACE));
        sinks.put("output2", new Hfs(new TextLine(), "output2", SinkMode.REPLACE));

        GroupBy grouped = new GroupBy(new Pipe("lower1"), new Fields(0));

        FlowConnector connector = getPlatform().getFlowConnector();
        Each firstTail = new Each(new Pipe("output1", grouped), new Identity());
        Each secondTail = new Each(new Pipe("output2", grouped), new Identity());

        List steps = connector.connect(sources, sinks, Pipe.pipes(firstTail, secondTail)).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());
    }

    /**
     * Splits directly after a GroupBy into two named branches that each apply an {@code Every}
     * with a {@code TestBuffer}, and expects planning to fail with a {@code PlannerException}.
     */
    @Test
    public void testSplitEveryOnGroup() throws Exception {
        HashMap sources = new HashMap();
        sources.put("lower1", new Hfs(new TextLine(new Fields("num", "char")), "foo"));

        HashMap sinks = new HashMap();
        sinks.put("output1", new Hfs(new TextLine(), "output1", SinkMode.REPLACE));
        sinks.put("output2", new Hfs(new TextLine(), "output2", SinkMode.REPLACE));

        GroupBy grouped = new GroupBy(new Pipe("lower1"), new Fields(0));

        try {
            FlowConnector connector = getPlatform().getFlowConnector();
            Every leftTail = new Every(new Pipe("output1", grouped), new TestBuffer(new Fields("left"), true));
            Every rightTail = new Every(new Pipe("output2", grouped), new TestBuffer(new Fields("right"), true));

            connector.connect(sources, sinks, Pipe.pipes(leftTail, rightTail));
            fail("did not throw planner exception");
        } catch (PlannerException expected) {
            // Expected outcome: nothing further to verify.
        }
    }

    /**
     * Uses a named GroupBy both as a tail and as the input of a second named GroupBy, and
     * expects the planned flow to contain three steps.
     */
    @Test
    public void testSplitOutput() throws Exception {
        HashMap sources = new HashMap();
        sources.put("lower1", new Hfs(new TextLine(new Fields("num", "char")), "foo"));

        HashMap sinks = new HashMap();
        sinks.put("output1", new Hfs(new TextLine(), "output1", SinkMode.REPLACE));
        sinks.put("output2", new Hfs(new TextLine(), "output2", SinkMode.REPLACE));

        Pipe firstGroup = new GroupBy("output1", new Pipe("lower1"), new Fields(0));

        FlowConnector connector = getPlatform().getFlowConnector();
        GroupBy secondGroup = new GroupBy("output2", firstGroup, new Fields(0));

        List steps = connector.connect(sources, sinks, Pipe.pipes(firstGroup, secondGroup)).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());
    }

    /**
     * Branches a single head {@code a} into two GroupBys that are merged by a third GroupBy
     * named {@code tail}, and expects the planned flow to contain three steps.
     */
    @Test
    public void testSameSourceForBranch() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("a", new Hfs(new TextLine(new Fields("first", "second")), "input/path/a"));

        Pipe head = new Pipe("a");
        GroupBy branchOne = new GroupBy("a1", head, Fields.FIRST);
        GroupBy branchTwo = new GroupBy("a2", head, Fields.FIRST);
        Pipe tail = new GroupBy("tail", Pipe.pipes(branchOne, branchTwo), new Fields("first", "second"));

        sinks.put(tail.getName(), new Hfs(new TextLine(), "output/path"));

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail}).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());
    }

    /**
     * Binds two differently named heads to the same tap instance, groups each head and merges
     * them with a third GroupBy, and expects the planned flow to contain three steps.
     */
    @Test
    public void testSameTaps() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();

        Hfs sharedTap = new Hfs(new TextLine(new Fields("first", "second")), "input/path/a");
        sources.put("a", sharedTap);
        sources.put("b", sharedTap);

        GroupBy groupA = new GroupBy(new Pipe("a"));
        GroupBy groupB = new GroupBy(new Pipe("b"));
        Pipe tail = new GroupBy("tail", Pipe.pipes(groupA, groupB), new Fields("first", "second"));

        sinks.put(tail.getName(), new Hfs(new TextLine(), "output/path"));

        List steps = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail}).getFlowSteps();
        assertEquals("not equal: steps.size()", 3, steps.size());
    }

    /**
     * Verifies that planning fails with a {@code PlannerException} when the assembly has a head
     * ({@code b}) for which no source tap was supplied.
     *
     * <p>The {@code PlannerException} handler must come before the general {@code Exception}
     * handler: with the handlers in the opposite order the subclass handler is unreachable and
     * the expected exception would be reported as the wrong one.
     */
    @Test
    public void testDanglingHead() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();

        // Only head "a" gets a source; head "b" is left dangling on purpose.
        sources.put("a", new Hfs(new TextLine(new Fields("first", "second")), "input/path/a"));

        Pipe tail = new GroupBy(
                "tail",
                Pipe.pipes(new GroupBy(new Pipe("a")), new GroupBy(new Pipe("b"))),
                new Fields("first", "second"));
        sinks.put(tail.getName(), new Hfs(new TextLine(), "output/path"));

        try {
            getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail});
            fail("did not catch missing source tap");
        } catch (PlannerException expected) {
            // Expected outcome: nothing further to verify.
        } catch (Exception unexpected) {
            fail("threw wrong exception");
        }
    }

    /**
     * Verifies that planning fails with a {@code PlannerException} when the assembly has a tail
     * ({@code tail}) but the sink map is empty.
     *
     * <p>The {@code PlannerException} handler must come before the general {@code Exception}
     * handler: with the handlers in the opposite order the subclass handler is unreachable and
     * the expected exception would be reported as the wrong one.
     */
    @Test
    public void testDanglingTail() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap(); // intentionally left empty

        Hfs sharedTap = new Hfs(new TextLine(new Fields("first", "second")), "input/path/a");
        sources.put("a", sharedTap);
        sources.put("b", sharedTap);

        try {
            FlowConnector connector = getPlatform().getFlowConnector();
            Pipe tail = new GroupBy(
                    "tail",
                    Pipe.pipes(new GroupBy(new Pipe("a")), new GroupBy(new Pipe("b"))),
                    new Fields("first", "second"));

            connector.connect(sources, sinks, new Pipe[]{tail});
            fail("did not catch missing sink tap");
        } catch (PlannerException expected) {
            // Expected outcome: nothing further to verify.
        } catch (Exception unexpected) {
            fail("threw wrong exception");
        }
    }

    /**
     * Verifies that planning fails with a {@code PlannerException} whose message contains
     * {@code ['c']} when a source tap is supplied under a name ({@code c}) that no head uses.
     *
     * <p>The {@code PlannerException} handler must come before the general {@code Exception}
     * handler: with the handlers in the opposite order the subclass handler, and with it the
     * message assertion, is unreachable.
     */
    @Test
    public void testExtraSource() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();

        Hfs sharedTap = new Hfs(new TextLine(new Fields("first", "second")), "input/path/a");
        sources.put("a", sharedTap);
        sources.put("b", sharedTap);
        sources.put("c", sharedTap); // no pipe named "c" exists in the assembly

        Pipe tail = new GroupBy(
                "tail",
                Pipe.pipes(new GroupBy(new Pipe("a")), new GroupBy(new Pipe("b"))),
                new Fields("first", "second"));
        sinks.put(tail.getName(), new Hfs(new TextLine(), "output/path"));

        try {
            getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail});
            fail("did not catch extra source tap");
        } catch (PlannerException expected) {
            assertTrue(expected.getMessage().contains("['c']"));
        } catch (Exception unexpected) {
            fail("threw wrong exception");
        }
    }

    /**
     * Verifies that planning fails with a {@code PlannerException} whose message contains
     * {@code ['c']} when a sink tap is supplied under a name ({@code c}) that no tail uses.
     *
     * <p>The {@code PlannerException} handler must come before the general {@code Exception}
     * handler: with the handlers in the opposite order the subclass handler, and with it the
     * message assertion, is unreachable.
     */
    @Test
    public void testExtraSink() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();

        Hfs sharedTap = new Hfs(new TextLine(new Fields("first", "second")), "input/path/a");
        sources.put("a", sharedTap);
        sources.put("b", sharedTap);

        Pipe tail = new GroupBy(
                "tail",
                Pipe.pipes(new GroupBy(new Pipe("a")), new GroupBy(new Pipe("b"))),
                new Fields("first", "second"));
        sinks.put(tail.getName(), new Hfs(new TextLine(), "output/path"));
        sinks.put("c", new Hfs(new TextLine(), "output/path")); // no tail named "c" exists

        try {
            getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail});
            fail("did not catch extra sink tap");
        } catch (PlannerException expected) {
            assertTrue(expected.getMessage().contains("['c']"));
        } catch (Exception unexpected) {
            fail("threw wrong exception");
        }
    }

    /**
     * Plans a GroupBy followed by a single {@code Every} that applies a {@code TestBuffer}, and
     * checks that the result is one step with one source, a group and a sink, at the expected
     * graph distances from each other.
     */
    @Test
    public void testBuffer() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("count", new Hfs(new TextLine(new Fields("first", "second")), "input/path"));
        sinks.put("count", new Hfs(new TextLine(new Fields(0, 1)), "output/path"));

        FlowConnector connector = getPlatform().getFlowConnector();
        GroupBy grouped = new GroupBy(new Pipe("count"), new Fields(1));
        Every buffered = new Every(
                grouped,
                new Fields(1),
                new TestBuffer(new Fields("fourth"), "value"),
                new Fields(0, 1));

        List steps = connector.connect(sources, sinks, new Pipe[]{buffered}).getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        BaseFlowStep step = (BaseFlowStep) steps.get(0);
        assertEquals("not equal: step.sources.size()", 1, step.getSourceTaps().size());
        assertNotNull("null: step.groupBy", step.getGroup());
        assertNotNull("null: step.sink", step.getSink());

        FlowElement onlySource = (FlowElement) step.getSourceTaps().iterator().next();
        assertEquals(
                "not equal: mapDist",
                1,
                ElementGraphs.shortestDistance(step.getElementGraph(), onlySource, step.getGroup()));
        assertEquals(
                "not equal: reduceDist",
                2,
                ElementGraphs.shortestDistance(step.getElementGraph(), step.getGroup(), step.getSink()));
    }

    /**
     * Expects {@code connect} to throw when an {@code Every} applying a {@code TestBuffer} is
     * followed by a second {@code Every} applying a {@code Count} on the same grouping.
     */
    @Test
    public void testBufferFail() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("count", new Hfs(new TextLine(new Fields("first", "second")), "input/path"));
        sinks.put("count", new Hfs(new TextLine(new Fields(0, 1)), "output/path"));

        try {
            FlowConnector connector = getPlatform().getFlowConnector();
            GroupBy grouped = new GroupBy(new Pipe("count"), new Fields(1));
            Every bufferFirst = new Every(
                    grouped,
                    new Fields(1),
                    new TestBuffer(new Fields("fourth"), "value"),
                    new Fields(0, 1));
            Every thenCount = new Every(bufferFirst, new Fields(1), new Count(), new Fields(0, 1));

            connector.connect(sources, sinks, new Pipe[]{thenCount});
            fail("did not throw planner exception");
        } catch (Exception expected) {
            // Any exception counts as the expected failure; nothing further to verify.
        }
    }

    /**
     * Expects {@code connect} to throw when an {@code Every} applying a {@code Count} is
     * followed by a second {@code Every} applying a {@code TestBuffer} on the same grouping.
     */
    @Test
    public void testBufferFail2() throws IOException {
        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        sources.put("count", new Hfs(new TextLine(new Fields("first", "second")), "input/path"));
        sinks.put("count", new Hfs(new TextLine(new Fields(0, 1)), "output/path"));

        try {
            FlowConnector connector = getPlatform().getFlowConnector();
            GroupBy grouped = new GroupBy(new Pipe("count"), new Fields(1));
            Every countFirst = new Every(grouped, new Fields(1), new Count(), new Fields(0, 1));
            Every thenBuffer = new Every(
                    countFirst,
                    new Fields(1),
                    new TestBuffer(new Fields("fourth"), "value"),
                    new Fields(0, 1));

            connector.connect(sources, sinks, new Pipe[]{thenBuffer});
            fail("did not throw planner exception");
        } catch (Exception expected) {
            // Any exception counts as the expected failure; nothing further to verify.
        }
    }

    /**
     * Joins on a field ({@code num9}) that the upstream CoGroup does not declare and checks that
     * the resulting exception message mentions this test method by name.
     *
     * <p>All pipes are deliberately constructed inside this method: the assertion looks for
     * {@code BuildJobsHadoopPlatformTest.testErrorMessages} in the message.
     */
    @Test
    public void testErrorMessages() throws Exception {
        Hfs fooTap = new Hfs(new TextLine(new Fields("num")), "foo");
        Hfs barTap = new Hfs(new TextLine(new Fields("num")), "bar");

        HashMap sources = new HashMap();
        sources.put("source20", barTap);
        sources.put("source101", fooTap);
        sources.put("source102", fooTap);

        Hfs sink = new Hfs(new TextLine(), "baz", SinkMode.REPLACE);
        Pipe source20 = new Pipe("source20");
        Pipe source101 = new Pipe("source101");

        try {
            FlowConnector connector = getPlatform().getFlowConnector();
            CoGroup firstJoin = new CoGroup(
                    source20, new Fields("num"),
                    source101, new Fields("num"),
                    new Fields("num1", "num2"));
            CoGroup badJoin = new CoGroup(
                    firstJoin, new Fields("num9"),
                    new Pipe("source102"), new Fields("num"),
                    new Fields("num1", "num2", "num3"));

            connector.connect(sources, sink, badJoin);
            fail("did not fail on bad field");
        } catch (Exception failure) {
            String message = failure.getMessage();
            assertTrue("missing message", message.contains("BuildJobsHadoopPlatformTest.testErrorMessages"));
        }
    }

    /** Runs {@code splitMiddle} with both flags set to {@code true}. */
    @Test
    public void testSplitInMiddleBeforePipeOptimized() throws Exception {
        final boolean firstFlag = true;
        final boolean secondFlag = true;
        splitMiddle(firstFlag, secondFlag);
    }

    /** Runs {@code splitMiddle} with the first flag {@code true} and the second {@code false}. */
    @Test
    public void testSplitInMiddleBeforePipe() throws Exception {
        final boolean firstFlag = true;
        final boolean secondFlag = false;
        splitMiddle(firstFlag, secondFlag);
    }

    /** Runs {@code splitMiddle} with both flags set to {@code false}. */
    @Test
    public void testSplitInMiddleAfterPipe() throws Exception {
        final boolean firstFlag = false;
        final boolean secondFlag = false;
        splitMiddle(firstFlag, secondFlag);
    }

    /**
     * Shared body of the "split in middle" tests: joins {@code lower} and {@code upper}, applies
     * a chain of three filters and exposes the chain as two tails named {@code left} and
     * {@code right}, the {@code right} tail continuing from the {@code left} one.
     *
     * @param z  when {@code true} the naming {@code Pipe} is inserted before the second and third
     *           filters; when {@code false} it wraps them afterwards
     * @param z2 when {@code true} the sinks use {@code SequenceFile} schemes and, provided a rule
     *           registry containing {@code CombineAdjacentTapTransformer} is found, two steps are
     *           expected with the first step writing to the left sink; otherwise three steps are
     *           expected with the first step writing to a {@code TempHfs}
     */
    private void splitMiddle(boolean z, boolean z2) {
        final boolean nameBeforeFilter = z;
        boolean expectDirectSink = z2;

        HashMap sources = new HashMap();
        sources.put("lower", new Hfs(new TextLine(new Fields("offset", "line")), "lower"));
        sources.put("upper", new Hfs(new TextLine(new Fields("offset", "line")), "upper"));

        Hfs leftSink;
        Hfs rightSink;
        if (expectDirectSink) {
            leftSink = new Hfs(
                    new SequenceFile(new Fields("num", "lower", "num2", "upper")),
                    "/splitmiddle/left",
                    SinkMode.REPLACE);
            rightSink = new Hfs(new SequenceFile(new Fields("lower")), "/splitmiddle/right", SinkMode.REPLACE);
        } else {
            leftSink = new Hfs(
                    new TextLine(new Fields("offset", "line"), new Fields("lower")),
                    "/splitmiddle/left",
                    SinkMode.REPLACE);
            rightSink = new Hfs(
                    new TextLine(new Fields("offset", "line"), new Fields("lower")),
                    "/splitmiddle/right",
                    SinkMode.REPLACE);
        }

        HashMap sinks = new HashMap();
        sinks.put("left", leftSink);
        sinks.put("right", rightSink);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Each lower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Each upper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        CoGroup both = new CoGroup(
                "both",
                lower, new Fields("num"),
                upper, new Fields("num"),
                new Fields("num", "lower", "num2", "upper"));
        Pipe filtered = new Each(both, new Fields("num"), new RegexFilter(".*"));

        Pipe left;
        Pipe right;
        if (nameBeforeFilter) {
            // The branch name is introduced first and the filter is applied to the named pipe.
            left = new Each(new Pipe("left", filtered), new Fields("num"), new RegexFilter(".*"));
            right = new Each(new Pipe("right", left), new Fields("num"), new RegexFilter(".*"));
        } else {
            // The filter is applied first and the result is then given the branch name.
            left = new Pipe("left", new Each(filtered, new Fields("num"), new RegexFilter(".*")));
            right = new Pipe("right", new Each(left, new Fields("num"), new RegexFilter(".*")));
        }

        FlowConnector connector = getPlatform().getFlowConnector();
        List steps = connector.connect("splitmiddle", sources, sinks, new Pipe[]{left, right}).getFlowSteps();

        // Without the tap-combining rule the direct-sink expectation does not apply.
        expectDirectSink = expectDirectSink
                && connector.getRuleRegistrySet().findRegistryWith("CombineAdjacentTapTransformer") != null;

        int expectedSteps = expectDirectSink ? 2 : 3;
        assertEquals("not equal: steps.size()", expectedSteps, steps.size());

        // Walk the first step downstream from its group: Each, Each, then the step's sink.
        BaseFlowStep firstStep = (BaseFlowStep) steps.get(0);
        FlowElement afterGroup = firstStep.getNextFlowElement(firstStep.getNextScope(firstStep.getGroup()));
        assertTrue("not an Each", afterGroup instanceof Each);

        FlowElement afterFirstEach = firstStep.getNextFlowElement(firstStep.getNextScope(afterGroup));
        assertTrue("not a Each", afterFirstEach instanceof Each);

        FlowElement stepSink = firstStep.getNextFlowElement(firstStep.getNextScope(afterFirstEach));
        if (!expectDirectSink) {
            assertTrue("not a TempHfs", stepSink instanceof TempHfs);
        } else {
            assertEquals("not proper sink", leftSink, stepSink);
        }
    }

    /** Expects {@code connect} to throw when the same tap instance is used as source and sink. */
    @Test
    public void testSourceIsSink() {
        Hfs sharedTap = new Hfs(new TextLine(new Fields("offset", "line")), "foo/merge");

        try {
            FlowConnector connector = getPlatform().getFlowConnector();
            Pipe passThrough = new Pipe("left");

            connector.connect(sharedTap, sharedTap, passThrough);
            fail("did not throw planner exception");
        } catch (Exception expected) {
            // Any exception counts as the expected failure; nothing further to verify.
        }
    }

    /**
     * Expects {@code connect} to throw for a chain of three {@code Fields.REPLACE} operations on
     * {@code line}, the last of which is an {@code Identity} declaring {@code line2}.
     */
    @Test
    public void testReplaceFail() throws Exception {
        try {
            FlowConnector connector = getPlatform().getFlowConnector();

            Hfs source = new Hfs(new TextLine(new Fields("offset", "line")), "foo");
            Hfs sink = new Hfs(
                    new TextLine(new Fields("offset", "line"), new Fields("offset", "line2")),
                    "bar",
                    SinkMode.REPLACE);

            Each parsed = new Each(
                    new Pipe("test"),
                    new Fields("line"),
                    new RegexParser(new Fields(0), "^[^ ]*"),
                    Fields.REPLACE);
            Each copied = new Each(parsed, new Fields("line"), new Identity(Fields.ARGS), Fields.REPLACE);
            Each renamed = new Each(
                    copied,
                    new Fields("line"),
                    new Identity(new Fields("line2")),
                    Fields.REPLACE);

            connector.connect(source, sink, renamed);
            fail("did not fail");
        } catch (Exception expected) {
            // Any exception counts as the expected failure; nothing further to verify.
        }
    }

    /**
     * Checks that a property held only in the defaults of a nested {@code Properties} object is
     * visible both on the flow and in the initialized config of its first step.
     */
    @Test
    public void testNestedProperties() throws IOException {
        Hfs source = new Hfs(new TextLine(new Fields("line")), "/input");
        Each splitLine = new Each(
                new Pipe("test"),
                new RegexSplitter(new Fields("first", "second", "third"), "\\s"),
                Fields.ALL);
        Hfs sink = new Hfs(new TextLine(), "output", SinkMode.REPLACE);

        // The key lives in the defaults only; the outer Properties object has no entries of its own.
        Properties defaults = new Properties();
        defaults.setProperty("test.key", "test.value");
        Properties nested = new Properties(defaults);

        HadoopFlow flow = getPlatform().getFlowConnector(nested).connect(source, sink, splitLine);

        assertEquals("test flow", "test.value", flow.getProperty("test.key"));

        HadoopFlowStep firstStep = (HadoopFlowStep) flow.getFlowSteps().get(0);
        assertEquals(
                "test step",
                "test.value",
                firstStep.createInitializedConfig(flow.getFlowProcess(), flow.getConfig()).get("test.key"));
    }

    /** Expects {@code connect} to throw when an {@code Every} directly follows a {@code HashJoin}. */
    @Test
    public void testEveryAfterJoin() {
        Hfs leftTap = new Hfs(new TextLine(new Fields("offset", "line")), "foo/merge1");
        Hfs rightTap = new Hfs(new TextLine(new Fields("offset", "line")), "foo/merge2");
        Hfs sink = new Hfs(new TextLine(), "foo");

        Each leftFiltered = new Each(new Pipe("left"), new Fields("line"), new RegexFilter(".*46.*"));
        Each rightFiltered = new Each(new Pipe("right"), new Fields("line"), new RegexFilter(".*192.*"));
        HashJoin join = new HashJoin(
                "join",
                Pipe.pipes(leftFiltered, rightFiltered),
                Fields.fields(new Fields("offset"), new Fields("offset")),
                Fields.size(4),
                new InnerJoin());
        Pipe tail = new Every(join, new MaxValue());

        HashMap sources = new HashMap();
        sources.put("left", leftTap);
        sources.put("right", rightTap);

        HashMap sinks = new HashMap();
        sinks.put("join", sink);

        try {
            getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail});
            fail();
        } catch (Exception expected) {
            // Any exception counts as the expected failure; nothing further to verify.
        }
    }

    /**
     * Timing aid: builds 50 tails, each (after the first) CoGrouped with the previous tail and
     * summed, plans them as one flow and prints the elapsed planning time together with a
     * running element count.
     */
    @Ignore
    public void testManyJoins() {
        final int joinCount = 50;

        HashMap sources = new HashMap();
        HashMap sinks = new HashMap();
        Pipe[] tails = new Pipe[joinCount];
        int elements = 0;

        for (int index = 0; index < joinCount; index++) {
            String sourceName = "in" + index;
            String sinkName = "out" + index;

            Pipe current = new Pipe(sourceName);
            sources.put(sourceName, new Hfs(new TextLine(new Fields("key" + index)), "foo/in" + index));
            sinks.put(sinkName, new Hfs(new TextLine(), "foo/out" + index));
            elements += 2;

            if (index > 0) {
                CoGroup joined = new CoGroup(
                        tails[index - 1], new Fields("key" + (index - 1)),
                        current, new Fields("key" + index));
                current = new Every(joined, new Fields("key" + (index - 1)), new Sum());
                elements += 2;
            }

            tails[index] = new Pipe(sinkName, current);
            elements += 1;
        }

        long startedAt = System.currentTimeMillis();
        getPlatform().getFlowConnector().connect(sources, sinks, tails);
        double elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000.0d;

        System.out.printf("n = %d: elements: %d: %.03f seconds\n", joinCount, elements, elapsedSeconds);
    }
}
