/*
 * Decompiled with CFR 0.152.
 */
package cascading.pipe.assembly;

import cascading.PlatformTestCase;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.Assertion;
import cascading.operation.AssertionLevel;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.Identity;
import cascading.operation.assertion.AssertExpression;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.AggregateBy;
import cascading.pipe.assembly.AggregateByLocally;
import cascading.pipe.assembly.AverageBy;
import cascading.pipe.assembly.AverageByLocally;
import cascading.pipe.assembly.Coerce;
import cascading.pipe.assembly.Copy;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.CountByLocally;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.FirstBy;
import cascading.pipe.assembly.MaxBy;
import cascading.pipe.assembly.MaxByLocally;
import cascading.pipe.assembly.MinBy;
import cascading.pipe.assembly.MinByLocally;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.SumBy;
import cascading.pipe.assembly.SumByLocally;
import cascading.pipe.assembly.Unique;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.junit.Test;

public class AssemblyHelpersPlatformTest
extends PlatformTestCase {
    /**
     * Splits every input line into {@code num} and {@code char}, coerces
     * {@code num} to {@link Integer} via {@link Coerce}, and checks the sink
     * output with {@code validateLength} (5, 1, and a "digits whitespace word"
     * line pattern).
     */
    @Test
    public void testCoerce() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("line"),
            new Fields("num", "char"),
            getOutputPath("coerce"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe assembly = new Pipe("coerce");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Coerce(assembly, new Fields("num"), Integer.class);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+\\s\\w+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Exercises the {@link Coerce} constructor that takes typed {@link Fields}:
     * {@code num} is split out as a String, coerced to Integer, asserted, coerced
     * back to String and asserted again, all at {@link AssertionLevel#STRICT}.
     */
    @Test
    public void testCoerceFields() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("line"),
            new Fields("num", "char"),
            getOutputPath("coercefields"),
            SinkMode.REPLACE);

        // Both split fields start out explicitly typed as String.
        Fields declaredSplit = new Fields("num", "char").applyTypes(String.class, String.class);
        Function lineSplitter = new RegexSplitter(declaredSplit, " ");

        Assertion numIsInteger = new AssertExpression("num instanceof Integer", Object.class);
        Assertion numIsString = new AssertExpression("num instanceof String", Object.class);

        Pipe assembly = new Pipe("coerce");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);

        // String -> Integer, then verify the runtime type.
        assembly = new Coerce(assembly, new Fields("num").applyTypes(Integer.class));
        assembly = new Each(assembly, new Fields("num"), AssertionLevel.STRICT, numIsInteger);

        // Integer -> String, then verify the runtime type again.
        assembly = new Coerce(assembly, new Fields("num").applyTypes(String.class));
        assembly = new Each(assembly, new Fields("num"), AssertionLevel.STRICT, numIsString);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+\\s\\w+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Keeps only the {@code num} field with {@link Retain} after splitting each
     * line, and checks that the sink lines consist solely of digits.
     */
    @Test
    public void testRetainNarrow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("num"),
            new Fields("num"),
            getOutputPath("retainnarrow"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe assembly = new Pipe("shape");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Retain(assembly, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Duplicates {@code num}/{@code char} into {@code item}/{@code element} with
     * {@link Copy} and checks that every sink line carries all four values.
     */
    @Test
    public void testCopyNamed() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("line"),
            new Fields("num", "char", "item", "element"),
            getOutputPath("copy"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");
        Fields copyFrom = new Fields("num", "char");
        Fields copyTo = new Fields("item", "element");

        Pipe assembly = new Pipe("shape");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Copy(assembly, copyFrom, copyTo);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+\\s\\w+\\s\\d+\\s\\w+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Drops the {@code char} field with {@link Discard} after splitting each
     * line, and checks that the sink lines consist solely of digits.
     */
    @Test
    public void testDiscardNarrow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("num"),
            new Fields("num"),
            getOutputPath("discardnarrow"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe assembly = new Pipe("shape");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Discard(assembly, new Fields("char"));

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Renames {@code num}/{@code char} to {@code item}/{@code element} by naming
     * the source fields explicitly in {@link Rename}, then validates the sink.
     */
    @Test
    public void testRenameNamed() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("line"),
            new Fields("item", "element"),
            getOutputPath("rename"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");
        Fields oldNames = new Fields("num", "char");
        Fields newNames = new Fields("item", "element");

        Pipe assembly = new Pipe("shape");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Rename(assembly, oldNames, newNames);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+\\s\\w+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Renames every incoming field to {@code item}/{@code element} by passing
     * {@link Fields#ALL} as the source selector to {@link Rename}, then
     * validates the sink.
     */
    @Test
    public void testRenameAll() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("line"),
            new Fields("item", "element"),
            getOutputPath("renameall"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");
        Fields newNames = new Fields("item", "element");

        Pipe assembly = new Pipe("shape");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Rename(assembly, Fields.ALL, newNames);

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+\\s\\w+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Renames only {@code num} to {@code item}. The sink is declared with
     * {@code char, item}, and the expected line pattern is "word whitespace
     * digits".
     */
    @Test
    public void testRenameNarrow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap input = getPlatform().getTextFile(InputData.inputFileLower);
        Tap output = getPlatform().getTextFile(
            new Fields("item"),
            new Fields("char", "item"),
            getOutputPath("renamenarrow"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe assembly = new Pipe("shape");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Rename(assembly, new Fields("num"), new Fields("item"));

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\w+\\s\\d+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Applies {@link Unique} on {@code num} to the split lhs input and validates
     * the sink with {@code validateLength} (5, 1, "digits whitespace word").
     */
    @Test
    public void testUnique() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap input = getPlatform().getTextFile(InputData.inputFileLhs);
        Tap output = getPlatform().getTextFile(
            new Fields("item"),
            new Fields("num", "char"),
            getOutputPath("unique"),
            SinkMode.REPLACE);

        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe assembly = new Pipe("shape");
        assembly = new Each(assembly, new Fields("line"), lineSplitter);
        assembly = new Unique(assembly, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector().connect(input, output, assembly);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+\\s\\w+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Feeds two split branches (lhs and rhs inputs) into a single {@link Unique}
     * on {@code num} and validates the merged sink output.
     */
    @Test
    public void testUniqueMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhsInput = getPlatform().getTextFile(InputData.inputFileLhs);
        Tap rhsInput = getPlatform().getTextFile(InputData.inputFileRhs);
        Tap output = getPlatform().getTextFile(
            new Fields("item"),
            new Fields("num", "char"),
            getOutputPath("uniquemerge-nondeterministic"),
            SinkMode.REPLACE);

        // One splitter instance is shared by both branches.
        Function lineSplitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe lhsBranch = new Pipe("lhs");
        lhsBranch = new Each(lhsBranch, new Fields("line"), lineSplitter);

        Pipe rhsBranch = new Pipe("rhs");
        rhsBranch = new Each(rhsBranch, new Fields("line"), lineSplitter);

        Pipe uniqueTail = new Unique(Pipe.pipes(lhsBranch, rhsBranch), new Fields("num"));

        Map<String, Tap> inputsByBranch = Cascades.tapsMap(
            Pipe.pipes(lhsBranch, rhsBranch),
            Tap.taps(lhsInput, rhsInput));

        Flow flow = getPlatform().getFlowConnector().connect(inputsByBranch, output, uniqueTail);
        flow.complete();

        Pattern expectedLine = Pattern.compile("^\\d+\\s\\w+$");
        validateLength(flow, 5, 1, expectedLine);
    }

    /**
     * Counts rows per {@code char} with {@link CountBy} (threshold 2) and
     * compares the sink, row by row, against the expected counts.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testCount() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "count"),
            "\t",
            new Class[]{String.class, Integer.TYPE},
            getOutputPath("count"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("count");
        pipe = new CountBy(pipe, new Fields("char"), new Fields("count"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 2),
            new Tuple("b", 4),
            new Tuple("c", 4),
            new Tuple("d", 2),
            new Tuple("e", 1)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Counts all rows by running a {@link CountBy} inside an {@link AggregateBy}
     * grouped on {@link Fields#NONE}, and expects a single result tuple of 13.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testCountAll() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("count"),
            "\t",
            new Class[]{Integer.TYPE},
            getOutputPath("countall"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("count");
        CountBy countBy = new CountBy(new Fields("char"), new Fields("count"));
        pipe = new AggregateBy(pipe, Fields.NONE, 2, countBy);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 1, 1, Pattern.compile("^\\d+$"));

        Tuple[] results = new Tuple[]{new Tuple(13)};

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Replaces every {@code "c"} in the {@code char} field with {@code null},
     * then counts non-null and null values side by side using
     * {@link CountBy.Include#NO_NULLS} and {@link CountBy.Include#ONLY_NULLS}.
     * Expects a single result tuple of (9, 4).
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testCountNullNotNull() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("notnull", "null"),
            "\t",
            new Class[]{Integer.TYPE, Integer.TYPE},
            getOutputPath("countnullnotnull"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("count");

        // Null out the "c" rows so both Include modes have something to count.
        Function function = new ExpressionFunction(Fields.ARGS, "\"c\".equals($0) ? null : $0", String.class);
        pipe = new Each(pipe, new Fields("char"), function, Fields.REPLACE);

        CountBy countNotNull = new CountBy(new Fields("char"), new Fields("notnull"), CountBy.Include.NO_NULLS);
        CountBy countNull = new CountBy(new Fields("char"), new Fields("null"), CountBy.Include.ONLY_NULLS);
        pipe = new AggregateBy(pipe, Fields.NONE, 2, countNotNull, countNull);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 1, 2, Pattern.compile("^\\d+\t\\d+$"));

        Tuple[] results = new Tuple[]{new Tuple(9, 4)};

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Counts rows per {@code char} across two merged inputs. The rhs branch
     * lower-cases {@code char} first so both branches share grouping keys.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testCountMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap rhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "count"),
            "\t",
            new Class[]{String.class, Integer.TYPE},
            getOutputPath("mergecount"),
            SinkMode.REPLACE);

        Pipe lhsPipe = new Pipe("count-lhs");
        Pipe rhsPipe = new Pipe("count-rhs");

        Function toLowerCase = new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class);
        rhsPipe = new Each(rhsPipe, new Fields("char"), toLowerCase, Fields.REPLACE);

        Pipe countPipe = new CountBy(Pipe.pipes(lhsPipe, rhsPipe), new Fields("char"), new Fields("count"), 2);

        Map<String, Tap> tapMap = Cascades.tapsMap(Pipe.pipes(lhsPipe, rhsPipe), Tap.taps(lhs, rhs));

        Flow flow = getPlatform().getFlowConnector().connect(tapMap, sink, countPipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 4),
            new Tuple("b", 8),
            new Tuple("c", 8),
            new Tuple("d", 4),
            new Tuple("e", 2)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Sums {@code num} per {@code char} with {@link SumBy} (declared as
     * {@code long}, threshold 2) and compares the sink, row by row, against the
     * expected sums.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testSumBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "sum"),
            "\t",
            new Class[]{String.class, Integer.TYPE},
            getOutputPath("sum"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("sum");
        pipe = new SumBy(pipe, new Fields("char"), new Fields("num"), new Fields("sum"), Long.TYPE, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 6),
            new Tuple("b", 12),
            new Tuple("c", 10),
            new Tuple("d", 6),
            new Tuple("e", 5)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Replaces every {@code num} equal to 5 with {@code null} and then sums
     * {@code num} per {@code char} as {@link Integer}. The expected result for
     * {@code "e"} is {@code null}.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testSumByNulls() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "sum"),
            "\t",
            new Class[]{String.class, Integer.class},
            getOutputPath("sumnulls"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("sum");

        // Null out the 5s so the aggregate has to deal with missing values.
        Function function = new ExpressionFunction(Fields.ARGS, "5 == $0 ? null : $0", Integer.class);
        pipe = new Each(pipe, new Fields("num"), function, Fields.REPLACE);
        pipe = new SumBy(pipe, new Fields("char"), new Fields("num"), new Fields("sum"), Integer.class, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s(\\d+|null)$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 1),
            new Tuple("b", 7),
            new Tuple("c", 10),
            new Tuple("d", 6),
            new Tuple("e", null)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Sums {@code num} per {@code char} across two merged inputs. The rhs branch
     * lower-cases {@code char} first so both branches share grouping keys.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testSumMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap rhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "sum"),
            "\t",
            new Class[]{String.class, Integer.TYPE},
            getOutputPath("mergesum"),
            SinkMode.REPLACE);

        Pipe lhsPipe = new Pipe("sum-lhs");
        Pipe rhsPipe = new Pipe("sum-rhs");

        Function toLowerCase = new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class);
        rhsPipe = new Each(rhsPipe, new Fields("char"), toLowerCase, Fields.REPLACE);

        Pipe sumPipe = new SumBy(
            Pipe.pipes(lhsPipe, rhsPipe),
            new Fields("char"),
            new Fields("num"),
            new Fields("sum"),
            Long.TYPE,
            2);

        Map<String, Tap> tapMap = Cascades.tapsMap(Pipe.pipes(lhsPipe, rhsPipe), Tap.taps(lhs, rhs));

        Flow flow = getPlatform().getFlowConnector().connect(tapMap, sink, sumPipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 12),
            new Tuple("b", 24),
            new Tuple("c", 20),
            new Tuple("d", 12),
            new Tuple("e", 10)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Averages {@code num} per {@code char} with {@link AverageBy} (threshold 2)
     * and compares the sink, row by row, against the expected averages.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testAverageBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "average"),
            "\t",
            new Class[]{String.class, Double.TYPE},
            getOutputPath("average"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("average");
        pipe = new AverageBy(pipe, new Fields("char"), new Fields("num"), new Fields("average"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 3.0),
            new Tuple("b", 3.0),
            new Tuple("c", 2.5),
            new Tuple("d", 3.0),
            new Tuple("e", 5.0)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Replaces every {@code num} equal to 3 with {@code null} and averages
     * {@code num} per {@code char} using {@link AverageBy.Include#NO_NULLS}.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testAverageByNull() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "average"),
            "\t",
            new Class[]{String.class, Double.TYPE},
            getOutputPath("averagenull"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("average");

        // Null out the 3s before averaging.
        Function function = new ExpressionFunction(Fields.ARGS, "3 == $0 ? null : $0", Integer.class);
        pipe = new Each(pipe, new Fields("num"), function, Fields.REPLACE);
        pipe = new AverageBy(
            pipe,
            new Fields("char"),
            new Fields("num"),
            new Fields("average"),
            AverageBy.Include.NO_NULLS,
            2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 3.0),
            new Tuple("b", 3.0),
            new Tuple("c", 2.3333333333333335),
            new Tuple("d", 3.0),
            new Tuple("e", 5.0)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Averages {@code num} per {@code char} across two merged inputs. The rhs
     * branch lower-cases {@code char} first so both branches share grouping keys.
     *
     * <p>The sink iterator is closed in a {@code finally} block so a failing
     * assertion does not leave it open.
     */
    @Test
    public void testAverageMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap rhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("char", "average"),
            "\t",
            new Class[]{String.class, Double.TYPE},
            getOutputPath("mergeaverage"),
            SinkMode.REPLACE);

        Pipe lhsPipe = new Pipe("average-lhs");
        Pipe rhsPipe = new Pipe("average-rhs");

        Function toLowerCase = new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class);
        rhsPipe = new Each(rhsPipe, new Fields("char"), toLowerCase, Fields.REPLACE);

        Pipe averagePipe = new AverageBy(
            Pipe.pipes(lhsPipe, rhsPipe),
            new Fields("char"),
            new Fields("num"),
            new Fields("average"),
            2);

        Map<String, Tap> tapMap = Cascades.tapsMap(Pipe.pipes(lhsPipe, rhsPipe), Tap.taps(lhs, rhs));

        Flow flow = getPlatform().getFlowConnector().connect(tapMap, sink, averagePipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 3.0),
            new Tuple("b", 3.0),
            new Tuple("c", 2.5),
            new Tuple("d", 3.0),
            new Tuple("e", 5.0)
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Picks the first {@code lower}/{@code upper} pair per {@code num} with
     * {@link FirstBy}, using a reverse-order comparator on {@code lower}.
     *
     * <p>After the row-by-row comparison the number of rows read is checked
     * against the number of expected tuples. The earlier
     * {@code assertTrue(!iterator.hasNext())} sat directly after a loop that
     * only exits once {@code hasNext()} is false, so it could never fail and a
     * short or empty sink would have passed. The sink iterator is closed in a
     * {@code finally} block so a failing assertion does not leave it open.
     */
    @Test
    public void testFirstBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileCross);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "lower", "upper"), " ", InputData.inputFileCross);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("num", "lower", "upper"),
            "\t",
            new Class[]{Integer.TYPE, String.class, String.class},
            getOutputPath("firstnfields"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("first");

        Fields charFields = new Fields("lower", "upper");
        charFields.setComparator("lower", Collections.reverseOrder());

        pipe = new FirstBy(pipe, new Fields("num"), charFields, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        Tuple[] results = new Tuple[]{
            new Tuple(1, "c", "A"),
            new Tuple(2, "d", "B"),
            new Tuple(3, "c", "C"),
            new Tuple(4, "d", "B"),
            new Tuple(5, "e", "A")
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
            // Guard against a sink that produced fewer rows than expected.
            assertEquals("unexpected number of result tuples", results.length, count);
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Picks the first {@code lower}/{@code upper} pair per {@code num} with
     * {@link FirstBy} when no comparator is set on the value fields.
     *
     * <p>After the row-by-row comparison the number of rows read is checked
     * against the number of expected tuples. The earlier
     * {@code assertTrue(!iterator.hasNext())} sat directly after a loop that
     * only exits once {@code hasNext()} is false, so it could never fail and a
     * short or empty sink would have passed. The sink iterator is closed in a
     * {@code finally} block so a failing assertion does not leave it open.
     */
    @Test
    public void testFirstByWithoutComparator() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileCrossRev);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "lower", "upper"), " ", InputData.inputFileCrossRev);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields("num", "lower", "upper"),
            "\t",
            new Class[]{Integer.TYPE, String.class, String.class},
            getOutputPath("firstnfieldswithoutcomparator"),
            SinkMode.REPLACE);

        Pipe pipe = new Pipe("first");

        Fields charFields = new Fields("lower", "upper");
        pipe = new FirstBy(pipe, new Fields("num"), charFields, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        Tuple[] results = new Tuple[]{
            new Tuple(1, "c", "C"),
            new Tuple(2, "d", "D"),
            new Tuple(3, "c", "C"),
            new Tuple(4, "d", "D"),
            new Tuple(5, "e", "E")
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int count = 0;
            while (iterator.hasNext()) {
                assertEquals(results[count++], ((TupleEntry) iterator.next()).getTuple());
            }
            // Guard against a sink that produced fewer rows than expected.
            assertEquals("unexpected number of result tuples", results.length, count);
        } finally {
            // Release the sink even when an assertion above fails.
            iterator.close();
        }
    }

    /**
     * Groups the lhs input on {@code char} and runs five aggregations over {@code num}
     * in a single {@link AggregateBy}: sum, count, two averages and first. A
     * reverse-order comparator is set on {@code num} before the aggregates are built.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testParallelAggregates() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Class[] sinkTypes = {String.class, int.class, int.class, double.class, double.class, int.class};
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "sum", "count", "average", "average2", "first"), "\t", sinkTypes, getOutputPath("multi"), SinkMode.REPLACE);

        Pipe head = new Pipe("multi");

        Fields numField = new Fields("num");
        numField.setComparator("num", Collections.reverseOrder());

        AggregateBy[] aggregates = {
            new SumBy(numField, new Fields("sum"), long.class),
            new CountBy(new Fields("count")),
            new AverageBy(numField, new Fields("average")),
            new AverageBy(numField, new Fields("average2")),
            new FirstBy(numField, new Fields("first"))
        };
        Pipe assembly = new AggregateBy("name", Pipe.pipes(head), new Fields("char"), 2, aggregates);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 6, Pattern.compile("^\\w+\\s\\d+\\s\\d+\\s[\\d.]+\\s[\\d.]+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 6, 2, 3.0, 3.0, 5),
            new Tuple("b", 12, 4, 3.0, 3.0, 5),
            new Tuple("c", 10, 4, 2.5, 2.5, 4),
            new Tuple("d", 6, 2, 3.0, 3.0, 4),
            new Tuple("e", 5, 1, 5.0, 5.0, 5)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Runs the merge scenario without a prior Merge, with the hasher-partitioner
     * {@code uselegacyhash} property set to {@code "true"}.
     *
     * @throws IOException propagated from the shared driver
     */
    @Test
    public void testParallelAggregatesMergeLegacyHash() throws IOException {
        final Map<Object, Object> legacyHashProps = getProperties();
        legacyHashProps.put("cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash", "true");
        performParallelAggregatesMerge(false, legacyHashProps);
    }

    /**
     * Runs the merge scenario with a prior Merge, with the hasher-partitioner
     * {@code uselegacyhash} property set to {@code "true"}.
     *
     * @throws IOException propagated from the shared driver
     */
    @Test
    public void testParallelAggregatesPriorMergeLegacyHash() throws IOException {
        final Map<Object, Object> legacyHashProps = getProperties();
        legacyHashProps.put("cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash", "true");
        performParallelAggregatesMerge(true, legacyHashProps);
    }

    /**
     * Runs the merge scenario without a prior Merge, using the unmodified
     * properties returned by {@code getProperties()}.
     *
     * @throws IOException propagated from the shared driver
     */
    @Test
    public void testParallelAggregatesMerge() throws IOException {
        final Map<Object, Object> defaultProps = getProperties();
        performParallelAggregatesMerge(false, defaultProps);
    }

    /**
     * Runs the merge scenario with a prior Merge, using the unmodified
     * properties returned by {@code getProperties()}.
     *
     * @throws IOException propagated from the shared driver
     */
    @Test
    public void testParallelAggregatesPriorMerge() throws IOException {
        final Map<Object, Object> defaultProps = getProperties();
        performParallelAggregatesMerge(true, defaultProps);
    }

    /**
     * Shared driver for the parallel-aggregates merge tests. Reads the lhs and rhs
     * inputs (the rhs {@code char} column is lower-cased first), aggregates sum,
     * count and average of {@code num} grouped on {@code char}, and checks the sink
     * rows against the expected tuples in order.
     *
     * <p>Every aggregate result field and the grouping field get a
     * {@code CustomHasher} installed as their comparator.
     *
     * @param priorMerge when {@code true} the two branches are joined by an explicit
     *                   {@link Merge} that is then handed to {@link AggregateBy};
     *                   when {@code false} both branches are passed to
     *                   {@link AggregateBy} directly
     * @param properties properties given to the platform's flow connector; their
     *                   size is also appended to the output path name
     * @throws IOException if staging the inputs or reading the sink fails
     */
    private void performParallelAggregatesMerge(boolean priorMerge, Map<Object, Object> properties) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap rhs = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "sum", "count", "average"), "\t", new Class[]{String.class, Integer.TYPE, Integer.TYPE, Double.TYPE}, getOutputPath("multimerge+" + priorMerge + properties.size()), SinkMode.REPLACE);

        Pipe lhsPipe = new Pipe("multi-lhs");
        Pipe rhsPipe = new Pipe("multi-rhs");
        rhsPipe = new Each(rhsPipe, new Fields("char"), new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class), Fields.REPLACE);

        /**
         * Comparator that also supplies its own hash: an xor-then-multiply loop over
         * the bytes of {@code value.toString()}, seeded with the 32-bit FNV offset
         * basis and prime, truncated to {@code int}. {@code null} hashes to 0.
         */
        class CustomHasher implements Hasher<Object>, Comparator<Comparable>, Serializable {
            @Override
            public int hashCode(Object value) {
                if (value == null) {
                    return 0;
                }
                long offset = 2166136261L;
                long prime = 16777619L;
                long hash = offset;
                // Pin the charset: the platform default may differ between JVMs,
                // which would make the same value hash differently on different hosts.
                for (byte b : value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8)) {
                    hash = (hash ^ (long) b) * prime;
                }
                return (int) hash;
            }

            @Override
            public int compare(Comparable o1, Comparable o2) {
                return o1.compareTo(o2);
            }
        }

        Fields sumFields = new Fields("sum");
        sumFields.setComparator("sum", new CustomHasher());
        SumBy sumPipe = new SumBy(new Fields("num"), sumFields, Long.TYPE);

        Fields countFields = new Fields("count");
        countFields.setComparator("count", new CustomHasher());
        CountBy countPipe = new CountBy(countFields);

        Fields averageFields = new Fields("average");
        averageFields.setComparator("average", new CustomHasher());
        AverageBy averagePipe = new AverageBy(new Fields("num"), averageFields);

        Fields charFields = new Fields("char");
        charFields.setComparator("char", new CustomHasher());

        AggregateBy pipe;
        if (priorMerge) {
            Merge merge = new Merge(Pipe.pipes(lhsPipe, rhsPipe));
            pipe = new AggregateBy("name", (Pipe) merge, charFields, 2, new AggregateBy[]{sumPipe, countPipe, averagePipe});
        } else {
            pipe = new AggregateBy("name", Pipe.pipes(lhsPipe, rhsPipe), charFields, 2, new AggregateBy[]{sumPipe, countPipe, averagePipe});
        }

        Map tapMap = Cascades.tapsMap(Pipe.pipes(lhsPipe, rhsPipe), Tap.taps(lhs, rhs));
        Flow flow = getPlatform().getFlowConnector(properties).connect(tapMap, sink, (Pipe) pipe);
        flow.complete();

        validateLength(flow, 5, 4, Pattern.compile("^\\w+\\s\\d+\\s\\d+\\s[\\d.]+$"));

        Tuple[] results = new Tuple[]{
            new Tuple("a", 12, 4, 3.0),
            new Tuple("b", 24, 8, 3.0),
            new Tuple("c", 20, 8, 2.5),
            new Tuple("d", 12, 4, 3.0),
            new Tuple("e", 10, 2, 5.0)
        };

        TupleEntryIterator iterator = flow.openSink();
        int count = 0;
        while (iterator.hasNext()) {
            assertEquals(results[count++], iterator.next().getTuple());
        }
        iterator.close();
    }

    /**
     * Chains two {@link CountBy} assemblies: the first counts rows per {@code char},
     * the second counts how many groups share each {@code count} value.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testCountCount() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("count", "count2"), "\t", new Class[]{int.class, int.class}, getOutputPath("countcount"), SinkMode.REPLACE);

        Pipe perChar = new CountBy(new Pipe("count"), new Fields("char"), new Fields("count"), 2);
        Pipe perCount = new CountBy(perChar, new Fields("count"), new Fields("count2"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, perCount);
        flow.complete();

        validateLength(flow, 3, 2, Pattern.compile("^\\d+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(1, 1),
            new Tuple(2, 2),
            new Tuple(4, 2)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Splits one source into two branches (one lower-casing {@code char}), merges
     * and counts per {@code char}, then splits, merges and counts a second time.
     * On MapReduce platforms the planned flow is expected to have two steps.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testSameSourceMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "count"), "\t", new Class[]{String.class, int.class}, getOutputPath("samesourcemergecount"), SinkMode.REPLACE);

        Pipe sourcePipe = new Pipe("source");

        // First round: two branches off the same source, merged and counted.
        Pipe firstLhs = new Pipe("count-lhs", sourcePipe);
        Pipe firstRhs = new Each(new Pipe("count-rhs", sourcePipe), new Fields("char"), new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class), Fields.REPLACE);
        Merge firstMerge = new Merge("first", firstLhs, firstRhs);
        CountBy firstCount = new CountBy(firstMerge, new Fields("char"), new Fields("count"), 2);

        // Second round: two identity branches off the first count, merged and counted again.
        Pipe secondLhs = new Each(new Pipe("lhs", firstCount), new Identity());
        Pipe secondRhs = new Each(new Pipe("rhs", firstCount), new Identity());
        Merge secondMerge = new Merge("second", secondLhs, secondRhs);
        CountBy tail = new CountBy(secondMerge, new Fields("char"), new Fields("count"), 2);

        Map<String, Tap> sources = Cascades.tapsMap(sourcePipe, source);
        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, tail);

        List steps = flow.getFlowSteps();
        if (getPlatform().isMapReduce()) {
            assertEquals("not equal: steps.size()", 2, steps.size());
        }

        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 2),
            new Tuple("b", 2),
            new Tuple("c", 2),
            new Tuple("d", 2),
            new Tuple("e", 2)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Counts the source per {@code char}, derives two further {@link CountBy}
     * branches from that result, and joins all three with two nested
     * {@link CoGroup}s on {@code char}.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testSameSourceMergeThreeWay() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "count"), "\t", new Class[]{String.class, int.class}, getOutputPath("samesourcemergethreeway"), SinkMode.REPLACE);

        Pipe counted = new CountBy(new Pipe("source"), new Fields("char"), new Fields("count"), 2);
        Pipe lhsCounted = new CountBy(new Pipe("count-lhs", counted), new Fields("char"), new Fields("count"), 2);
        Pipe rhsCounted = new CountBy(new Pipe("count-rhs", counted), new Fields("char"), new Fields("count"), 2);

        CoGroup inner = new CoGroup(counted, new Fields("char"), rhsCounted, new Fields("char"), new Fields("char", "num", "char2", "count"));
        CoGroup outer = new CoGroup(lhsCounted, new Fields("char"), inner, new Fields("char"), new Fields("char", "count", "char2", "num2", "char3", "count3"));

        // The tap is bound via the first CountBy, exactly as the pipe variable was reused originally.
        Map<String, Tap> sources = Cascades.tapsMap(counted, source);
        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, outer);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 1),
            new Tuple("b", 1),
            new Tuple("c", 1),
            new Tuple("d", 1),
            new Tuple("e", 1)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Checks that {@link MinBy} grouped on {@code char} writes the smallest
     * {@code num} of each group into {@code min}.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMinBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "min"), "\t", new Class[]{String.class, int.class}, getOutputPath("minby"), SinkMode.REPLACE);

        Pipe assembly = new MinBy(new Pipe("min"), new Fields("char"), new Fields("num"), new Fields("min"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 1),
            new Tuple("b", 1),
            new Tuple("c", 1),
            new Tuple("d", 2),
            new Tuple("e", 5)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Same as the plain MinBy test, but rows whose {@code num} is 3 first have that
     * value replaced by {@code null} via {@code NullInsert}; the expected minima
     * are unchanged.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMinByNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "min"), "\t", new Class[]{String.class, int.class}, getOutputPath("minbynullsafety"), SinkMode.REPLACE);

        Pipe withNulls = new Each(new Pipe("min"), new NullInsert(3, new Fields("num", "char")), Fields.RESULTS);
        Pipe assembly = new MinBy(withNulls, new Fields("char"), new Fields("num"), new Fields("min"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 1),
            new Tuple("b", 1),
            new Tuple("c", 1),
            new Tuple("d", 2),
            new Tuple("e", 5)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Checks {@link MinBy} on a string column: groups on {@code num} and writes the
     * smallest {@code char} of each group into {@code min}.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMinByString() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("num", "min"), "\t", new Class[]{int.class, String.class}, getOutputPath("minbystring"), SinkMode.REPLACE);

        // NOTE(review): the head pipe is named "max" although this is a min test; name kept as-is.
        Pipe assembly = new MinBy(new Pipe("max"), new Fields("num"), new Fields("char"), new Fields("min"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\d+\\s\\w+$"));

        Tuple[] expected = {
            new Tuple(1, "a"),
            new Tuple(2, "b"),
            new Tuple(3, "c"),
            new Tuple(4, "b"),
            new Tuple(5, "a")
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Checks that {@link MaxBy} grouped on {@code char} writes the largest
     * {@code num} of each group into {@code max}.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMaxBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "max"), "\t", new Class[]{String.class, int.class}, getOutputPath("maxby"), SinkMode.REPLACE);

        Pipe assembly = new MaxBy(new Pipe("max"), new Fields("char"), new Fields("num"), new Fields("max"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 5),
            new Tuple("b", 5),
            new Tuple("c", 4),
            new Tuple("d", 4),
            new Tuple("e", 5)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Same as the plain MaxBy test, but rows whose {@code num} is 1 first have that
     * value replaced by {@code null} via {@code NullInsert}; the expected maxima
     * are unchanged.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMaxByNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "max"), "\t", new Class[]{String.class, int.class}, getOutputPath("maxbynullsafety"), SinkMode.REPLACE);

        Pipe withNulls = new Each(new Pipe("max"), new NullInsert(1, new Fields("num", "char")), Fields.RESULTS);
        Pipe assembly = new MaxBy(withNulls, new Fields("char"), new Fields("num"), new Fields("max"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 5),
            new Tuple("b", 5),
            new Tuple("c", 4),
            new Tuple("d", 4),
            new Tuple("e", 5)
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Checks {@link MaxBy} on a string column: groups on {@code num} and writes the
     * largest {@code char} of each group into {@code max}.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMaxByString() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("num", "max"), "\t", new Class[]{int.class, String.class}, getOutputPath("maxbystring"), SinkMode.REPLACE);

        Pipe assembly = new MaxBy(new Pipe("max"), new Fields("num"), new Fields("char"), new Fields("max"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\d+\\s\\w+$"));

        Tuple[] expected = {
            new Tuple(1, "c"),
            new Tuple(2, "d"),
            new Tuple(3, "c"),
            new Tuple(4, "d"),
            new Tuple(5, "e")
        };

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Checks that {@link SumByLocally} grouped on {@code char} writes the sum of
     * {@code num} per group. Row order is not asserted; every expected tuple must
     * appear exactly once.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testSumByLocally() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "sum"), "\t", new Class[]{String.class, Integer.TYPE}, getOutputPath("sum"), SinkMode.REPLACE);

        Pipe pipe = new Pipe("sum");
        pipe = new SumByLocally(pipe, new Fields("char"), new Fields("num"), new Fields("sum"), Long.TYPE, 500);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> results = new HashSet<Tuple>();
        Collections.addAll(results, new Tuple("a", 6), new Tuple("b", 12), new Tuple("c", 10), new Tuple("d", 6), new Tuple("e", 5));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                Tuple actual = iterator.next().getTuple();
                // remove() rather than contains(): a repeated row must fail, not match twice.
                assertTrue("unexpected or duplicate tuple: " + actual, results.remove(actual));
            }
        } finally {
            iterator.close();
        }
        assertTrue("expected tuples not found in sink: " + results, results.isEmpty());
    }

    /**
     * Checks that {@link CountByLocally} grouped on {@code char} writes the row
     * count per group. Row order is not asserted; every expected tuple must appear
     * exactly once.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testCountByLocally() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "count"), "\t", new Class[]{String.class, Integer.TYPE}, getOutputPath("count"), SinkMode.REPLACE);

        Pipe pipe = new Pipe("count");
        pipe = new CountByLocally(pipe, new Fields("char"), new Fields("count"), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> results = new HashSet<Tuple>();
        Collections.addAll(results, new Tuple("a", 2), new Tuple("b", 4), new Tuple("c", 4), new Tuple("d", 2), new Tuple("e", 1));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                Tuple actual = iterator.next().getTuple();
                // remove() rather than contains(): a repeated row must fail, not match twice.
                assertTrue("unexpected or duplicate tuple: " + actual, results.remove(actual));
            }
        } finally {
            iterator.close();
        }
        assertTrue("expected tuples not found in sink: " + results, results.isEmpty());
    }

    /**
     * Counts all rows in one group by running a {@link CountByLocally} inside an
     * {@link AggregateByLocally} with {@code Fields.NONE} as grouping fields; a
     * single row holding 13 is expected.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testCountByLocallyAll() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("count"), "\t", new Class[]{int.class}, getOutputPath("countall"), SinkMode.REPLACE);

        Pipe head = new Pipe("count");
        CountByLocally countAll = new CountByLocally(new Fields("char"), new Fields("count"));
        Pipe assembly = new AggregateByLocally(head, Fields.NONE, 1000, countAll);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 1, 1, Pattern.compile("^\\d+$"));

        Tuple[] expected = {new Tuple(13)};

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Replaces every {@code "c"} in {@code char} with {@code null}, then counts
     * non-null and null values separately in one {@link AggregateByLocally} over
     * all rows; a single row {@code (9, 4)} is expected.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testCountByLocallyNullNotNull() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("notnull", "null"), "\t", new Class[]{int.class, int.class}, getOutputPath("countnullnotnull"), SinkMode.REPLACE);

        Pipe head = new Pipe("count");

        ExpressionFunction nullOutC = new ExpressionFunction(Fields.ARGS, "\"c\".equals($0) ? null : $0", String.class);
        Pipe withNulls = new Each(head, new Fields("char"), nullOutC, Fields.REPLACE);

        CountByLocally notNullCount = new CountByLocally(new Fields("char"), new Fields("notnull"), CountByLocally.Include.NO_NULLS);
        CountByLocally nullCount = new CountByLocally(new Fields("char"), new Fields("null"), CountByLocally.Include.ONLY_NULLS);
        Pipe assembly = new AggregateByLocally(withNulls, Fields.NONE, 1000, notNullCount, nullCount);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 1, 2, Pattern.compile("^\\d+\t\\d+$"));

        Tuple[] expected = {new Tuple(9, 4)};

        TupleEntryIterator rows = flow.openSink();
        for (int row = 0; rows.hasNext(); row++) {
            assertEquals(expected[row], rows.next().getTuple());
        }
        rows.close();
    }

    /**
     * Checks {@link MinByLocally} grouped on {@code char} when rows whose
     * {@code num} is 3 have had that value replaced by {@code null} via
     * {@code NullInsert}. Row order is not asserted; every expected tuple must
     * appear exactly once.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMinByLocallyNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        // Own output path: this test previously reused "minbynullsafety", the name
        // the non-local MinBy null-safety test also writes to.
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "min"), "\t", new Class[]{String.class, Integer.TYPE}, getOutputPath("minbylocallynullsafety"), SinkMode.REPLACE);

        Pipe pipe = new Pipe("min");
        pipe = new Each(pipe, new NullInsert(3, new Fields("num", "char")), Fields.RESULTS);
        pipe = new MinByLocally(pipe, new Fields("char"), new Fields("num"), new Fields("min"), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> results = new HashSet<Tuple>();
        Collections.addAll(results, new Tuple("a", 1), new Tuple("b", 1), new Tuple("c", 1), new Tuple("d", 2), new Tuple("e", 5));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                Tuple actual = iterator.next().getTuple();
                assertTrue("unexpected or duplicate tuple: " + actual, results.remove(actual));
            }
        } finally {
            iterator.close();
        }
        assertTrue("expected tuples not found in sink: " + results, results.isEmpty());
    }

    /**
     * Checks {@link MaxByLocally} grouped on {@code char} when rows whose
     * {@code num} is 1 have had that value replaced by {@code null} via
     * {@code NullInsert}. Each sink row must remove a distinct expected tuple;
     * row order is not asserted.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testMaxByLocallyNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "max"), "\t", new Class[]{String.class, int.class}, getOutputPath("maxbylocallynullsafety"), SinkMode.REPLACE);

        Pipe withNulls = new Each(new Pipe("max"), new NullInsert(1, new Fields("num", "char")), Fields.RESULTS);
        Pipe assembly = new MaxByLocally(withNulls, new Fields("char"), new Fields("num"), new Fields("max"), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> expected = new HashSet<Tuple>();
        Collections.addAll(expected,
            new Tuple("a", 5),
            new Tuple("b", 5),
            new Tuple("c", 4),
            new Tuple("d", 4),
            new Tuple("e", 5));

        for (TupleEntryIterator rows = flow.openSink(); ; ) {
            if (!rows.hasNext()) {
                rows.close();
                break;
            }
            assertTrue(expected.remove(rows.next().getTuple()));
        }
    }

    /**
     * Checks that {@link AverageByLocally} grouped on {@code char} writes the
     * average of {@code num} per group. Row order is not asserted; each sink row
     * must remove a distinct expected tuple and none may be left over.
     *
     * @throws IOException if staging the input or reading the sink fails
     */
    @Test
    public void testAverageByLocally() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "average"), "\t", new Class[]{String.class, double.class}, getOutputPath("average"), SinkMode.REPLACE);

        Pipe assembly = new AverageByLocally(new Pipe("average"), new Fields("char"), new Fields("num"), new Fields("average"), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        HashSet<Tuple> expected = new HashSet<Tuple>();
        Collections.addAll(expected,
            new Tuple("a", 3.0),
            new Tuple("b", 3.0),
            new Tuple("c", 2.5),
            new Tuple("d", 3.0),
            new Tuple("e", 5.0));

        TupleEntryIterator rows = flow.openSink();
        while (rows.hasNext()) {
            Tuple actual = rows.next().getTuple();
            boolean wasExpected = expected.remove(actual);
            assertTrue(wasExpected);
        }
        rows.close();

        assertTrue(expected.isEmpty());
    }

    class NullInsert
    extends BaseOperation
    implements Function {
        private final int number;

        public NullInsert(int numberToFilter, Fields fieldDeclaration) {
            super(2, fieldDeclaration);
            this.number = numberToFilter;
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry argument = functionCall.getArguments();
            int num = argument.getInteger((Comparable)Integer.valueOf(0));
            String chr = argument.getString((Comparable)Integer.valueOf(1));
            Tuple result = num == this.number ? new Tuple(new Object[]{null, chr}) : new Tuple(new Object[]{num, chr});
            functionCall.getOutputCollector().add(result);
        }
    }
}

