/*
 * Decompiled with CFR 0.152.
 */
package cascading.cascade;

import cascading.PlatformTestCase;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.CascadeListener;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.text.FieldJoiner;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.IOException;
import org.junit.Test;

/**
 * Platform test that assembles several {@link Flow} instances into a {@link Cascade}
 * and runs them.
 *
 * <p>{@link #testCascade()} connects five flows (two two-stage chains feeding a CoGroup)
 * and validates the length of the last flow's output. {@link #testCascadeRaceCondition()}
 * repeatedly starts a single-flow cascade and stops it after an increasing delay,
 * rethrowing any throwable that was reported to the registered listener.
 */
public class ParallelCascadePlatformTest
extends PlatformTestCase {
    /** Passes {@code true} to the {@link PlatformTestCase} constructor. */
    public ParallelCascadePlatformTest() {
        super(true);
    }

    /**
     * Builds a flow that reads {@code InputData.inputFileIps} as a text file, applies an
     * {@link Identity} function (declared with field {@code "ip"}) to the {@code "line"}
     * field, keeps the {@code "ip"} field, and writes it to a tab-delimited sink.
     *
     * @param name used as the pipe name and as the argument to {@code getOutputPath}
     * @return the connected flow
     */
    private Flow firstFlow(String name) {
        Tap input = getPlatform().getTextFile(InputData.inputFileIps);

        Pipe head = new Pipe(name);
        Function identity = new Identity(new Fields("ip"));
        Pipe assembly = new Each(head, new Fields("line"), identity, new Fields("ip"));

        Tap output = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(name), SinkMode.REPLACE);
        return getPlatform().getFlowConnector().connect(input, output, assembly);
    }

    /**
     * Builds a flow that applies a {@link RegexSplitter} on {@code "\\."} (declaring four
     * fields) followed by a {@link FieldJoiner} using {@code "-"} (declaring
     * {@code "mangled"}), and writes the result to a tab-delimited sink.
     *
     * @param name   used as the pipe name and as the argument to {@code getOutputPath}
     * @param source the tap this flow reads from
     * @return the connected flow
     */
    private Flow secondFlow(String name, Tap source) {
        Pipe assembly = new Pipe(name);

        Function splitter = new RegexSplitter(new Fields("first", "second", "third", "fourth"), "\\.");
        assembly = new Each(assembly, splitter);

        Function joiner = new FieldJoiner(new Fields("mangled"), "-");
        assembly = new Each(assembly, joiner);

        Tap output = getPlatform().getTabDelimitedFile(new Fields("mangled"), getOutputPath(name), SinkMode.REPLACE);
        return getPlatform().getFlowConnector().connect(source, output, assembly);
    }

    /**
     * Builds a flow that CoGroups the two given taps on field position 0 of each side,
     * declaring a result of size 2, and writes to a text file under the "third" output path.
     *
     * @param lhs tap bound to the pipe named {@code "lhs"}
     * @param rhs tap bound to the pipe named {@code "rhs"}
     * @return the connected flow
     */
    private Flow thirdFlow(Tap lhs, Tap rhs) {
        Pipe left = new Pipe("lhs");
        Pipe right = new Pipe("rhs");

        // Declared as Pipe so the connect(...) call below resolves exactly as before.
        Pipe join = new CoGroup(
            left, new Fields(Integer.valueOf(0)),
            right, new Fields(Integer.valueOf(0)),
            Fields.size(2));

        Tap output = getPlatform().getTextFile(getOutputPath("third"), SinkMode.REPLACE);

        return getPlatform().getFlowConnector().connect(
            Cascades.tapsMap(Pipe.pipes(left, right), Tap.taps(lhs, rhs)),
            output,
            join);
    }

    /**
     * Creates a listener that ignores start/stop/complete events and stores any reported
     * throwable in {@code slot[0]}, returning {@code false} from {@code onThrowable}.
     *
     * @param slot single-element holder that receives the reported throwable
     * @return the listener
     */
    private static CascadeListener recordingListener(final Throwable[] slot) {
        return new CascadeListener(){

            public void onStarting(Cascade cascade) {
            }

            public void onStopping(Cascade cascade) {
            }

            public void onCompleted(Cascade cascade) {
            }

            public boolean onThrowable(Cascade cascade, Throwable throwable) {
                slot[0] = throwable;
                return false;
            }
        };
    }

    /**
     * Runs a cascade of five flows: first1 -> second1 and first2 -> second2, with both
     * second-stage sinks feeding the third flow, then validates the third flow with an
     * expected length of 28.
     */
    @Test
    public void testCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow firstA = firstFlow("first1");
        Flow secondA = secondFlow("second1", firstA.getSink());
        Flow firstB = firstFlow("first2");
        Flow secondB = secondFlow("second2", firstB.getSink());
        Flow joined = thirdFlow(secondA.getSink(), secondB.getSink());

        CascadeConnector connector = new CascadeConnector(getProperties());
        Cascade cascade = connector.connect(firstA, secondA, firstB, secondB, joined);

        cascade.start();
        cascade.complete();

        validateLength(joined, 28);
    }

    /**
     * Starts a single-flow cascade, sleeps for 0..500 ms (in 50 ms steps), then stops and
     * completes it. If the listener recorded a throwable during any round, it is rethrown.
     */
    @Test
    public void testCascadeRaceCondition() throws Throwable {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        final Throwable[] captured = new Throwable[1];
        CascadeListener listener = recordingListener(captured);

        for (int delayMillis = 0; delayMillis <= 500; delayMillis += 50) {
            Flow flow = firstFlow(String.format("race-%d/first-nondeterministic", delayMillis));

            Cascade cascade = new CascadeConnector(getProperties()).connect(flow);
            cascade.addListener(listener);

            cascade.start();
            Thread.sleep(delayMillis);
            cascade.stop();
            cascade.complete();

            if (captured[0] != null) {
                throw captured[0];
            }
        }
    }
}

