/*
 * Decompiled with CFR 0.152.
 */
package cascading;

import cascading.PlatformTestCase;
import cascading.TestFunction;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnectorProps;
import cascading.operation.Aggregator;
import cascading.operation.Assertion;
import cascading.operation.AssertionLevel;
import cascading.operation.Debug;
import cascading.operation.DebugLevel;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.Insert;
import cascading.operation.aggregator.Sum;
import cascading.operation.assertion.AssertSizeMoreThan;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.filter.And;
import cascading.operation.filter.Not;
import cascading.operation.filter.Or;
import cascading.operation.filter.Xor;
import cascading.operation.function.UnGroup;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import data.InputData;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.junit.Test;

public class RegressionPipesPlatformTest
extends PlatformTestCase {
    /**
     * Creates the test case, passing {@code false} to the {@link PlatformTestCase} constructor.
     * NOTE(review): the flag presumably disables cluster mode; confirm against PlatformTestCase.
     */
    public RegressionPipesPlatformTest() {
        super(false);
    }

    /**
     * Splits each "line" into unknown fields, copies position 2 into a "label" field,
     * applies a regex filter on "label", and expects five tuples in the sink.
     * The flow is connected with the debug level set to {@code DebugLevel.NONE}.
     */
    @Test
    public void testUnknown() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap input = getPlatform().getTextFile(InputData.inputFileJoined);
        Tap output = getPlatform().getTextFile(getOutputPath("unknown"), SinkMode.REPLACE);

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields("line"), new RegexSplitter(Fields.UNKNOWN));
        assembly = new Each(assembly, new Debug());
        assembly = new Each(assembly, new Fields(2), new Identity(new Fields("label")));
        assembly = new Each(assembly, new Debug());
        assembly = new Each(assembly, new Fields("label"), new RegexFilter("[A-Z]*"));
        assembly = new Each(assembly, new Debug());

        Map<Object, Object> connectorProps = getPlatform().getProperties();
        FlowConnectorProps.setDebugLevel(connectorProps, DebugLevel.NONE);

        Flow flow = getPlatform().getFlowConnector(connectorProps).connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 5, null);
    }

    /**
     * Connects a bare pipe between a text source and a text sink and expects
     * five tuples in the sink.
     */
    @Test
    public void testCopy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap input = getPlatform().getTextFile(InputData.inputFileJoined);
        Tap output = getPlatform().getTextFile(getOutputPath("copy"), SinkMode.REPLACE);
        Pipe passThrough = new Pipe("test");

        Flow flow = getPlatform().getFlowConnector().connect(input, output, passThrough);
        flow.complete();

        validateLength(flow, 5, null);
    }

    /**
     * Splits each "line" into unknown fields, strictly asserts every tuple has more than
     * three values, then copies positions 0, 1 and -1 into "name", "second" and "last".
     * Expects seven tuples in the sink.
     */
    @Test
    public void testVarWidth() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileCritics);

        Tap input = getPlatform().getTextFile(InputData.inputFileCritics);
        Tap output = getPlatform().getTextFile(getOutputPath("varwidth"), SinkMode.REPLACE);

        Fields selectedPositions = new Fields(0, 1, -1);
        Fields renamed = new Fields("name", "second", "last");

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields("line"), new RegexSplitter(Fields.UNKNOWN));
        assembly = new Each(assembly, AssertionLevel.STRICT, new AssertSizeMoreThan(3));
        assembly = new Each(assembly, selectedPositions, new Identity(renamed));
        assembly = new Each(assembly, new Debug());

        Map<Object, Object> connectorProps = getPlatform().getProperties();
        FlowConnectorProps.setDebugLevel(connectorProps, DebugLevel.NONE);

        Flow flow = getPlatform().getFlowConnector(connectorProps).connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 7);
    }

    /**
     * Tab-splits field position 1 (keeping all fields), then un-groups the result on
     * position 0 with value fields at positions 1 and 2. Expects ten tuples in the sink.
     */
    @Test
    public void testUnGroupUnknown() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap input = getPlatform().getTextFile(InputData.inputFileJoined);
        Tap output = getPlatform().getTextFile(getOutputPath("ungrouped-unknown-nondeterministic"), SinkMode.REPLACE);

        Fields[] valueFields = Fields.fields(new Fields(1), new Fields(2));
        UnGroup unGroup = new UnGroup(Fields.size(2), new Fields(0), valueFields);

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields(1), new RegexSplitter("\t"), Fields.ALL);
        assembly = new Each(assembly, unGroup);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 10);
    }

    /**
     * Builds two branches whose head pipes are both named "test", merges them with a
     * GroupBy, and expects connecting the flow to throw.
     */
    @Test
    public void testDupeHeadNames() throws Exception {
        Tap input = getPlatform().getTextFile(InputData.inputFileJoined);
        Tap output = getPlatform().getTextFile(getOutputPath("unknown"), SinkMode.REPLACE);

        Pipe left = new Each(new Pipe("test"), new Fields("line"), new RegexSplitter(" "));
        Pipe right = new Each(new Pipe("test"), new Fields("line"), new RegexSplitter(" "));

        Pipe merged = new GroupBy(Pipe.pipes(left, right), Fields.size(3));

        try {
            getPlatform().getFlowConnector().connect(input, output, merged);
            fail("did not fail on dupe head names");
        } catch (Exception ignored) {
            // expected: connecting the flow is supposed to be rejected
        }
    }

    /**
     * Builds two branches off a common GroupBy whose tail pipes are both named "tail",
     * binds both to the same sink, and expects connecting the flow to throw.
     */
    @Test
    public void testDupeTailNames() throws Exception {
        Tap source = getPlatform().getTextFile(InputData.inputFileJoined);
        Tap sink = getPlatform().getTextFile(getOutputPath("unknown"), SinkMode.REPLACE);

        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields("line"), new RegexSplitter(" "));
        Pipe group = new GroupBy(pipe, Fields.size(3));

        // Chain each Each onto its named "tail" pipe. Previously the named pipes were
        // created and then discarded because the Each was built directly on the group.
        Pipe lhs = new Pipe("tail", group);
        lhs = new Each(lhs, new Fields("line"), new RegexSplitter(" "));

        Pipe rhs = new Pipe("tail", group);
        rhs = new Each(rhs, new Fields("line"), new RegexSplitter(" "));

        Map sinks = Cascades.tapsMap(Pipe.pipes(lhs, rhs), Tap.taps(sink, sink));

        try {
            getPlatform().getFlowConnector().connect(source, sinks, Pipe.pipes(lhs, rhs));
            fail("did not fail on dupe tail names");
        } catch (Exception ignored) {
            // expected: connecting the flow is supposed to be rejected
        }
    }

    /**
     * Runs a flow whose head pipe name is full of punctuation, whitespace and path-like
     * characters, followed by two consecutive GroupBy pipes on position 0.
     * Expects five tuples in the sink.
     */
    @Test
    public void testIllegalCharsInTempFiles() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap input = getPlatform().getTextFile(InputData.inputFileJoined);
        Tap output = getPlatform().getTextFile(getOutputPath("illegalchars"), SinkMode.REPLACE);

        String awkwardName = "**&%&%bar:bar@foo://blah/\t(*(**^**&%&%^@#@&&() :::: ///\\\\ \t illegal chars in it";

        Pipe assembly = new Pipe(awkwardName);
        assembly = new Each(assembly, new Fields("line"), new RegexSplitter(" "));
        assembly = new GroupBy(assembly, new Fields(0));
        assembly = new GroupBy(assembly, new Fields(0));

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 5);
    }

    /**
     * First flow: splits each line on a space into "num" and "char" and writes a
     * tab-delimited intermediate file. Second flow: reads that file, splits the "lower"
     * pipe into "lhs" and "rhs" branches, copies "num" to "num2" on the rhs, and
     * co-groups the branches on "num" / "num2". Expects five tuples, including the
     * joined rows for 1/a and 2/b.
     */
    @Test
    public void testCoGroupSplitPipe() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Fields numAndChar = new Fields("num", "char");

        // Stage one: produce the intermediate tab-delimited file.
        Tap input = getPlatform().getTextFile(new Fields("line"), InputData.inputFileLower);
        Tap intermediate = getPlatform().getTabDelimitedFile(numAndChar, getOutputPath("intermediate"), SinkMode.REPLACE);

        Pipe splitStage = new Each("split", new RegexSplitter(new Fields("num", "char"), " "));
        Flow splitFlow = getPlatform().getFlowConnector().connect(input, intermediate, splitStage);
        splitFlow.complete();

        // Stage two: self-join the intermediate file through two branches of one head.
        Tap output = getPlatform().getTextFile(new Fields("line"), getOutputPath("cogroupsplit"), SinkMode.REPLACE);

        Pipe head = new Pipe("lower");
        Pipe left = new Pipe("lhs", head);

        Pipe right = new Pipe("rhs", head);
        right = new Each(right, new Debug("rhs-pre", true));
        right = new Each(right, new Fields("num"), new Identity(new Fields("num2")));
        right = new Each(right, new Debug("rhs-post", true));

        Pipe joined = new CoGroup(left, new Fields("num"), right, new Fields("num2"));

        Flow flow = getPlatform().getFlowConnector().connect(intermediate, output, joined);
        flow.complete();

        validateLength(flow, 5, null);

        List sinkTuples = getSinkAsList(flow);
        assertTrue(sinkTuples.contains(new Tuple("1\ta\t1")));
        assertTrue(sinkTuples.contains(new Tuple("2\tb\t2")));
    }

    /**
     * First flow: splits each line on a space into "num" and "char" and writes a
     * tab-delimited intermediate file. Second flow: reads that file, splits the "lower"
     * pipe into "lhs" and "rhs" branches, and merges them with a GroupBy on "num".
     * Expects ten tuples, with the rows for 1/a and 2/b each appearing twice.
     */
    @Test
    public void testGroupBySplitPipe() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Fields numAndChar = new Fields("num", "char");

        // Stage one: produce the intermediate tab-delimited file.
        Tap input = getPlatform().getTextFile(new Fields("line"), InputData.inputFileLower);
        Tap intermediate = getPlatform().getTabDelimitedFile(numAndChar, getOutputPath("splitintermediate"), SinkMode.REPLACE);

        Pipe splitStage = new Each("split", new RegexSplitter(new Fields("num", "char"), " "));
        Flow splitFlow = getPlatform().getFlowConnector().connect(input, intermediate, splitStage);
        splitFlow.complete();

        // Stage two: merge two branches of the same head.
        Tap output = getPlatform().getTextFile(new Fields("line"), getOutputPath("groupbysplit"), SinkMode.REPLACE);

        Pipe head = new Pipe("lower");
        Pipe left = new Pipe("lhs", head);

        Pipe right = new Pipe("rhs", head);
        right = new Each(right, new Fields("num"), new Identity(new Fields("num2")), new Fields("num", "char"));

        Pipe merged = new GroupBy(Pipe.pipes(left, right), new Fields("num"));

        Flow flow = getPlatform().getFlowConnector().connect(intermediate, output, merged);
        flow.complete();

        validateLength(flow, 10, null);

        List sinkTuples = getSinkAsList(flow);
        assertEquals(2, Collections.frequency(sinkTuples, new Tuple("1\ta")));
        assertEquals(2, Collections.frequency(sinkTuples, new Tuple("2\tb")));
    }

    /**
     * Applies a TestFunction that declares an "insert" field with the value "inserted",
     * groups on "insert", and expects ten tuples in the sink.
     */
    @Test
    public void testLastEachNotModified() throws Exception {
        copyFromLocal(InputData.inputFileApache);

        Tap input = getPlatform().getTextFile(InputData.inputFileApache);

        TestFunction inserter = new TestFunction(new Fields("insert"), new Tuple("inserted"));

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields("line"), inserter);
        assembly = new GroupBy(assembly, new Fields("insert"));

        Tap output = getPlatform().getTextFile(getOutputPath("lasteachmodified"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 10, null);
    }

    /**
     * Applies {@code Not(And(num ~ "1", char ~ "a"))} twice, once with all fields and once
     * with "num" and "char" selected explicitly, then validates the sink against
     * length 1, size 2 and the pattern {@code 1\ta}.
     */
    @Test
    public void testComplexLogicAnd() throws Exception {
        copyFromLocal(InputData.inputFileLhs);

        Tap input = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);

        RegexFilter numFilter = new RegexFilter("1", true, true);
        RegexFilter charFilter = new RegexFilter("a", true, true);
        Filter negated = new Not(new And(new Fields("num"), numFilter, new Fields("char"), charFilter));

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, negated);
        assembly = new Each(assembly, new Fields("num", "char"), negated);

        Tap output = getPlatform().getDelimitedFile(Fields.ALL, " ", getOutputPath("/regression/complexlogicand"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 1, 2, Pattern.compile("1\ta"));
    }

    /**
     * Applies {@code Not(Or(num ~ "1", char ~ "a"))} twice, once with all fields and once
     * with "num" and "char" selected explicitly, then validates the sink against
     * length 4, size 2 and the pattern {@code (1\t.)|(.\ta)}.
     */
    @Test
    public void testComplexLogicOr() throws Exception {
        copyFromLocal(InputData.inputFileLhs);

        Tap input = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);

        RegexFilter numFilter = new RegexFilter("1", true, true);
        RegexFilter charFilter = new RegexFilter("a", true, true);
        Filter negated = new Not(new Or(new Fields("num"), numFilter, new Fields("char"), charFilter));

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, negated);
        assembly = new Each(assembly, new Fields("num", "char"), negated);

        Tap output = getPlatform().getDelimitedFile(Fields.ALL, " ", getOutputPath("/regression/complexlogicor"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 4, 2, Pattern.compile("(1\t.)|(.\ta)"));
    }

    /**
     * Applies {@code Not(Xor(num ~ "1", char ~ "a"))} twice, once with all fields and once
     * with "num" and "char" selected explicitly, then validates the sink against
     * length 3, size 2 and the pattern {@code (1\t.)|(.\ta)}.
     */
    @Test
    public void testComplexLogicXor() throws Exception {
        copyFromLocal(InputData.inputFileLhs);

        Tap input = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);

        RegexFilter numFilter = new RegexFilter("1", true, true);
        RegexFilter charFilter = new RegexFilter("a", true, true);
        Filter negated = new Not(new Xor(new Fields("num"), numFilter, new Fields("char"), charFilter));

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, negated);
        assembly = new Each(assembly, new Fields("num", "char"), negated);

        Tap output = getPlatform().getDelimitedFile(Fields.ALL, " ", getOutputPath("/regression/complexlogicxor"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 3, 2, Pattern.compile("(1\t.)|(.\ta)"));
    }

    /**
     * Tab-splits field position 1, then groups on {@code Fields.NONE} with
     * {@code Fields.FIRST} as the sort fields. Expects five tuples in the sink.
     */
    @Test
    public void testGroupNoneSortUnknown() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap input = getPlatform().getTextFile(InputData.inputFileJoined);
        Tap output = getPlatform().getTextFile(getOutputPath("none-unknown"), SinkMode.REPLACE);

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields(1), new RegexSplitter("\t"));
        assembly = new GroupBy(assembly, Fields.NONE, Fields.FIRST);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 5);
    }

    /**
     * Builds a very long assembly: a "count" expression over each line, fifty rounds of
     * seven Identity-based Each pipes, a "hash" expression and GroupBy, fifty Sum
     * aggregators ("sum1" .. "sum50"), and fifty rounds of three post-group Each pipes.
     * Expects 200 tuples in the sink.
     */
    @Test
    public void testDeepPipeline() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache200);

        Tap input = getPlatform().getTextFile(new Fields("line"), InputData.inputFileApache200);
        Tap output = getPlatform().getTextFile(getOutputPath("deeppipline"), SinkMode.REPLACE);

        final int depth = 50;
        final int modulo = 1000000;

        Pipe assembly = new Pipe("pipeline");

        Function countTokens = new ExpressionFunction(new Fields("count"), "line.split( \"\\\\s\").length", String.class);
        assembly = new Each(assembly, new Fields("line"), countTokens, Fields.ALL);

        // Pre-group chain: each round appends, replaces, narrows and renames fields.
        for (int round = 0; round < depth; round++) {
            assembly = new Each(assembly, new Fields("line"), new Identity(new Fields(0)), Fields.ALL);
            assembly = new Each(assembly, new Fields("count"), new Identity(new Fields(0)), Fields.ALL);
            assembly = new Each(assembly, new Fields("line"), new Identity(), Fields.REPLACE);
            assembly = new Each(assembly, new Fields("count"), new Identity(), Fields.REPLACE);
            assembly = new Each(assembly, new Fields("line", "count"), new Identity());
            assembly = new Each(assembly, new Fields("line", "count"), new Identity(new Fields("line2", "count2")), new Fields("line", "count2"));
            assembly = new Each(assembly, new Fields("count2"), new Identity(new Fields("count")), new Fields("line", "count"));
        }

        Function hashLine = new ExpressionFunction(new Fields("hash"), "line.hashCode() % " + modulo, String.class);
        assembly = new Each(assembly, new Fields("line"), hashLine, Fields.ALL);
        assembly = new GroupBy(assembly, new Fields("hash"));

        // One Sum aggregator per level, declaring "sum1" through "sum<depth>".
        for (int level = 1; level <= depth; level++) {
            assembly = new Every(assembly, new Fields("count"), new Sum(new Fields("sum" + level)));
        }

        // Post-group chain.
        for (int round = 0; round < depth; round++) {
            assembly = new Each(assembly, new Fields("hash"), new Identity(new Fields(0)), Fields.ALL);
            assembly = new Each(assembly, new Fields("sum1"), new Identity(new Fields(0)), Fields.ALL);
            assembly = new Each(assembly, new Fields("hash", "sum1"), new Identity(), Fields.SWAP);
        }

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        validateLength(flow, 200);
    }

    /**
     * Reads a "-"-delimited file with unknown fields, replaces position 0 with the
     * constant "value", and writes a tab-delimited sink. Expects five tuples in the
     * sink, all equal to the single-value tuple {@code "value"}.
     */
    @Test
    public void testUnknownReplace() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap input = getPlatform().getDelimitedFile(Fields.UNKNOWN, "-", InputData.inputFileJoined);
        Tap output = getPlatform().getTabDelimitedFile(Fields.UNKNOWN, getOutputPath("unknown-replace"), SinkMode.REPLACE);

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields(0), new Insert(Fields.ARGS, "value"), Fields.REPLACE);
        assembly = new Each(assembly, new Debug());

        Map<Object, Object> connectorProps = getPlatform().getProperties();
        FlowConnectorProps.setDebugLevel(connectorProps, DebugLevel.NONE);

        Flow flow = getPlatform().getFlowConnector(connectorProps).connect(input, output, assembly);
        flow.complete();

        // Five tuples in total...
        assertEquals(5, asList(flow, output).size());

        // ...that collapse to exactly one distinct tuple.
        Set distinct = asSet(flow, output);
        assertEquals(1, distinct.size());
        assertEquals(new Tuple("value"), distinct.iterator().next());
    }

    /**
     * Runs a flow whose function throws a fake {@link OutOfMemoryError} and verifies
     * the flow fails.
     *
     * <p>On MapReduce and DAG platforms only the fact that something was thrown is
     * checked; on other platforms the throwable must itself be an
     * {@code OutOfMemoryError}.
     */
    @Test
    public void testOOMEPreGroup() throws Exception {
        copyFromLocal(InputData.inputFileApache);

        Tap source = getPlatform().getTextFile(InputData.inputFileApache);

        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields("line"), new TestFunction(new Fields("insert"), null) {
            @Override
            protected void throwIntentionalException() {
                throw new OutOfMemoryError("fake error");
            }
        });

        Tap sink = getPlatform().getTextFile(getOutputPath("oomepre"), SinkMode.REPLACE);
        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);

        // Capture the failure instead of calling fail() inside the try: the catch must be
        // Throwable to see the Error, and would otherwise also swallow fail()'s own
        // AssertionError, letting the test pass on MapReduce/DAG when nothing was thrown.
        Throwable failure = null;
        try {
            flow.complete();
        } catch (Throwable throwable) {
            failure = throwable;
        }

        if (failure == null) {
            fail("no failure thrown");
        }

        System.out.println("exception = " + failure);
        failure.printStackTrace();

        if (getPlatform().isMapReduce() || getPlatform().isDAG()) {
            return;
        }

        assertTrue(failure instanceof OutOfMemoryError);
    }

    /**
     * Co-groups the "lower" and "upper" inputs on "num", then applies a function that
     * throws a fake {@link OutOfMemoryError} after the grouping, and verifies the flow
     * fails.
     *
     * <p>On MapReduce and DAG platforms only the fact that something was thrown is
     * checked; on other platforms the throwable must itself be an
     * {@code OutOfMemoryError}.
     */
    @Test
    public void testOOMEPostGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("oomepost"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);

        // Declared as Pipe: the variable is reassigned from a CoGroup to an Each below.
        Pipe splice = new CoGroup(pipeLower, new Fields("num"), pipeUpper, new Fields("num"), Fields.size(4));
        splice = new Each(splice, Fields.NONE, new TestFunction(new Fields("insert"), null) {
            @Override
            protected void throwIntentionalException() {
                throw new OutOfMemoryError("fake error");
            }
        });

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, splice);

        // Capture the failure instead of calling fail() inside the try: the catch must be
        // Throwable to see the Error, and would otherwise also swallow fail()'s own
        // AssertionError, letting the test pass on MapReduce/DAG when nothing was thrown.
        Throwable failure = null;
        try {
            flow.complete();
        } catch (Throwable throwable) {
            failure = throwable;
        }

        if (failure == null) {
            fail("no failure thrown");
        }

        System.out.println("exception = " + failure);
        failure.printStackTrace();

        if (getPlatform().isMapReduce() || getPlatform().isDAG()) {
            return;
        }

        assertTrue(failure instanceof OutOfMemoryError);
    }
}

