/*
 * Decompiled with CFR 0.152.
 */
package cascading;

import cascading.PlatformTestCase;
import cascading.assembly.EuclideanDistance;
import cascading.assembly.PearsonDistance;
import cascading.assembly.SortElements;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.Identity;
import cascading.operation.aggregator.First;
import cascading.operation.aggregator.Sum;
import cascading.operation.function.UnGroup;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import data.InputData;
import java.io.Serializable;
import java.util.List;
import org.junit.Test;

/**
 * Platform tests that compute pairwise similarity scores between critics from
 * the {@code InputData.inputFileCritics} file.
 *
 * <p>Three tests compute the same Euclidean-based score in different ways (a
 * verbose hand-built assembly, a terser hand-built assembly, and the
 * {@link EuclideanDistance} sub-assembly); a fourth uses
 * {@link PearsonDistance}. Each test checks the sink's line count and the
 * presence of one known result line.
 *
 * <p>The class implements {@link Serializable} because the anonymous
 * operations created by its instance methods hold a reference to the enclosing
 * test instance.
 */
public class DistanceUseCasePlatformTest
extends PlatformTestCase
implements Serializable {
    /** Number of lines every test in this class expects to find in its sink. */
    private static final int EXPECTED_RESULT_COUNT = 21;

    /** Result line expected from each of the Euclidean variants. */
    private static final String EXPECTED_EUCLIDEAN_LINE = "GeneSeymour\tLisaRose\t0.14814814814814814";

    /** Result line expected from the Pearson variant. */
    private static final String EXPECTED_PEARSON_LINE = "GeneSeymour\tLisaRose\t0.39605901719066977";

    public DistanceUseCasePlatformTest() {
        super(false);
    }

    /**
     * Builds the Euclidean score assembly step by step, starting from an
     * explicitly named head {@link Pipe} and selecting the first field by
     * position.
     */
    @Test
    public void testEuclideanDistance() throws Exception {
        Tap source = this.openCriticsSource();
        Tap sink = this.openLineSink("euclidean/long");

        Pipe pipe = new Pipe("euclidean");
        // Split every input line on tabs.
        pipe = new Each(pipe, new Fields("line"), new RegexSplitter("\t"));
        // Emit (name, movie, rate) tuples, grouping on the first field and taking values two at a time.
        pipe = new Each(pipe, new UnGroup(new Fields("name", "movie", "rate"), new Fields(0), 2));
        // Self-join on movie so each row carries a pair of (name, rate) values.
        pipe = new CoGroup(pipe, new Fields("movie"), 1, new Fields("name1", "movie", "rate1", "name2", "movie2", "rate2"));
        // Keep only the fields needed downstream (drops movie2).
        pipe = new Each(pipe, new Fields("movie", "name1", "rate1", "name2", "rate2"), new Identity());
        // The pattern matches rows whose 2nd and 4th tab-separated values are equal (name1 == name2).
        pipe = new Each(pipe, new RegexFilter("^[^\\t]*\\t([^\\t]*)\\t[^\\t]*\\t\\1\\t.*", true));
        pipe = new Each(pipe, new SortElements(new Fields("name1", "rate1"), new Fields("name2", "rate2")));
        // Group on all fields and keep the first tuple of each group.
        pipe = new GroupBy(pipe, Fields.ALL);
        pipe = new Every(pipe, Fields.ALL, new First(), Fields.RESULTS);
        // Append the squared rating difference as "score".
        pipe = new Each(pipe, new Fields("rate1", "rate2"), this.squaredDifference(), Fields.ALL);
        pipe = new GroupBy(pipe, new Fields("name1", "name2"));
        // Sum the scores per pair and convert the sum into the final "distance" value.
        pipe = new Every(pipe, new Fields("score"), this.inverseOfSumPlusOne(), new Fields("name1", "name2", "distance"));

        this.runAndVerify(source, sink, pipe, EXPECTED_EUCLIDEAN_LINE);
    }

    /**
     * Builds the same assembly as {@link #testEuclideanDistance()} but starts
     * from a named {@link Each} and uses {@code Fields.FIRST} as the group
     * selector.
     */
    @Test
    public void testEuclideanDistanceShort() throws Exception {
        Tap source = this.openCriticsSource();
        Tap sink = this.openLineSink("euclidean/short");

        // Declared as Pipe: the variable is reassigned to CoGroup, GroupBy and Every below.
        Pipe pipe = new Each("euclidean", new Fields("line"), new RegexSplitter("\t"));
        pipe = new Each(pipe, new UnGroup(new Fields("name", "movie", "rate"), Fields.FIRST, 2));
        pipe = new CoGroup(pipe, new Fields("movie"), 1, new Fields("name1", "movie", "rate1", "name2", "movie2", "rate2"));
        pipe = new Each(pipe, new Fields("movie", "name1", "rate1", "name2", "rate2"), new Identity());
        pipe = new Each(pipe, new RegexFilter("^[^\\t]*\\t([^\\t]*)\\t[^\\t]*\\t\\1\\t.*", true));
        pipe = new Each(pipe, new SortElements(new Fields("name1", "rate1"), new Fields("name2", "rate2")));
        pipe = new GroupBy(pipe, Fields.ALL);
        pipe = new Every(pipe, Fields.ALL, new First(), Fields.RESULTS);
        pipe = new Each(pipe, new Fields("rate1", "rate2"), this.squaredDifference(), Fields.ALL);
        pipe = new GroupBy(pipe, new Fields("name1", "name2"));
        pipe = new Every(pipe, new Fields("score"), this.inverseOfSumPlusOne(), new Fields("name1", "name2", "distance"));

        this.runAndVerify(source, sink, pipe, EXPECTED_EUCLIDEAN_LINE);
    }

    /**
     * Computes the score with the {@link EuclideanDistance} sub-assembly and
     * expects the same result line as the hand-built variants.
     */
    @Test
    public void testEuclideanDistanceComposite() throws Exception {
        Tap source = this.openCriticsSource();
        Tap sink = this.openLineSink("euclidean/composite");

        // Declared as Pipe: the variable ends up holding an EuclideanDistance.
        Pipe pipe = new Each("euclidean", new Fields("line"), new RegexSplitter("\t"));
        pipe = new Each(pipe, new UnGroup(new Fields("name", "movie", "rate"), Fields.FIRST, 2));
        pipe = new EuclideanDistance(pipe, new Fields("name", "movie", "rate"), new Fields("name1", "name2", "distance"));

        this.runAndVerify(source, sink, pipe, EXPECTED_EUCLIDEAN_LINE);
    }

    /**
     * Computes the score with the {@link PearsonDistance} sub-assembly.
     */
    @Test
    public void testPearsonDistanceComposite() throws Exception {
        Tap source = this.openCriticsSource();
        Tap sink = this.openLineSink("pearson/composite");

        // Declared as Pipe: the variable ends up holding a PearsonDistance.
        Pipe pipe = new Each("pearson", new Fields("line"), new RegexSplitter("\t"));
        pipe = new Each(pipe, new UnGroup(new Fields("name", "movie", "rate"), Fields.FIRST, 2));
        pipe = new PearsonDistance(pipe, new Fields("name", "movie", "rate"), new Fields("name1", "name2", "distance"));

        this.runAndVerify(source, sink, pipe, EXPECTED_PEARSON_LINE);
    }

    /**
     * Copies the critics input file to the platform and opens it as a text
     * source.
     *
     * @return a tap reading {@code InputData.inputFileCritics}
     */
    private Tap openCriticsSource() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileCritics);
        return this.getPlatform().getTextFile(InputData.inputFileCritics);
    }

    /**
     * Opens a text sink with a single {@code "line"} field in
     * {@code SinkMode.REPLACE} mode.
     *
     * @param relativePath path passed to {@code getOutputPath}
     * @return the sink tap
     */
    private Tap openLineSink(String relativePath) throws Exception {
        return this.getPlatform().getTextFile(new Fields("line"), this.getOutputPath(relativePath), SinkMode.REPLACE);
    }

    /**
     * Connects and runs the flow, then asserts that the sink holds
     * {@link #EXPECTED_RESULT_COUNT} lines and contains {@code expectedLine}.
     *
     * @param source       tap to read from
     * @param sink         tap to write to
     * @param tail         tail of the assembly under test
     * @param expectedLine exact sink line that must be present
     */
    private void runAndVerify(Tap source, Tap sink, Pipe tail, String expectedLine) throws Exception {
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, tail);
        flow.complete();
        validateLength(flow, EXPECTED_RESULT_COUNT);
        List results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple(expectedLine)));
    }

    /**
     * Creates a function declaring a single {@code "score"} field whose value
     * is the squared difference of its first two arguments.
     */
    private Function squaredDifference() {
        return new Identity(new Fields("score")){

            @Override
            public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
                TupleEntry input = functionCall.getArguments();
                double difference = input.getTuple().getDouble(0) - input.getTuple().getDouble(1);
                functionCall.getOutputCollector().add(new Tuple(Math.pow(difference, 2.0)));
            }
        };
    }

    /**
     * Creates an aggregator declaring a single {@code "distance"} field that
     * emits {@code 1 / (1 + s)}, where {@code s} is the first value of the
     * result produced by {@link Sum}.
     */
    private Aggregator inverseOfSumPlusOne() {
        return new Sum(new Fields("distance")){

            @Override
            public void complete(FlowProcess flowProcess, AggregatorCall aggregatorCall) {
                Tuple sum = super.getResult(aggregatorCall);
                aggregatorCall.getOutputCollector().add(new Tuple(1.0 / (1.0 + sum.getDouble(0))));
            }
        };
    }
}

