/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow;

import cascading.PlatformTestCase;
import cascading.flow.AssemblyPlanner;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.Test;

public class AssemblyPlannerPlatformTest
extends PlatformTestCase {
    public AssemblyPlannerPlatformTest() {
        super(false);
    }

    @Test
    public void testSimpleAssembly() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        AssemblyPlanner planner = new AssemblyPlanner(){

            public Map<String, String> getFlowDescriptor() {
                return Collections.emptyMap();
            }

            public List<Pipe> resolveTails(AssemblyPlanner.Context context) {
                Pipe pipe = new Pipe((String)context.getFlow().getSourceNames().get(0));
                pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
                pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
                pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
                return Arrays.asList(pipe);
            }
        };
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("simple"), SinkMode.REPLACE);
        FlowDef flowDef = FlowDef.flowDef().addSource("test", source).addSink("test", sink).addAssemblyPlanner(planner);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        AssemblyPlannerPlatformTest.validateLength((Flow)flow, (int)8);
    }

    @Test
    public void testCompositeAssembly() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        AssemblyPlanner lazyEach = new AssemblyPlanner(){

            public List<Pipe> resolveTails(AssemblyPlanner.Context context) {
                Each pipe = new Each((Pipe)context.getTails().get(0), new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
                return Arrays.asList(pipe);
            }

            public Map<String, String> getFlowDescriptor() {
                return Collections.EMPTY_MAP;
            }
        };
        AssemblyPlanner lazyCount = new AssemblyPlanner(){

            public List<Pipe> resolveTails(AssemblyPlanner.Context context) {
                GroupBy pipe = new GroupBy((Pipe)context.getTails().get(0), new Fields(new Comparable[]{"ip"}));
                pipe = new Every((Pipe)pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
                return Arrays.asList(pipe);
            }

            public Map<String, String> getFlowDescriptor() {
                return Collections.EMPTY_MAP;
            }
        };
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("composite"), SinkMode.REPLACE);
        FlowDef flowDef = FlowDef.flowDef().addSource("test", source).addSink("test", sink).addTail(pipe).addAssemblyPlanner(lazyEach).addAssemblyPlanner(lazyCount);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        AssemblyPlannerPlatformTest.validateLength((Flow)flow, (int)8);
    }
}

