/*
 * Decompiled with CFR 0.152.
 */
package cascading.cascade;

import cascading.CascadingException;
import cascading.PlatformTestCase;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.CascadeException;
import cascading.cascade.CascadeListener;
import cascading.cascade.LockingCascadeListener;
import cascading.flow.CountingFlowListener;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowListener;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSkipStrategy;
import cascading.flow.Flows;
import cascading.flow.LockingFlowListener;
import cascading.flow.planner.FlowStepJob;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.Identity;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.text.FieldJoiner;
import cascading.pipe.Checkpoint;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class CascadePlatformTest
extends PlatformTestCase {
    /**
     * Creates the test case, passing {@code true} to the {@code PlatformTestCase} constructor.
     */
    public CascadePlatformTest() {
        // NOTE(review): the flag presumably asks the base class to run against a cluster --
        // confirm against PlatformTestCase.
        super(true);
    }

    /**
     * Builds the flow named "first". It reads {@code InputData.inputFileIps} as a text file,
     * applies {@link Identity} to the "line" field declaring it as "ip", and writes "ip" to a
     * tab-delimited sink opened in {@code SinkMode.REPLACE}.
     *
     * @param path   sink location, resolved through {@code getOutputPath}
     * @param doFail when {@code true}, a {@link FailFunction} is appended so the flow throws
     *               as soon as the function is invoked
     * @return the connected flow
     */
    private Flow firstFlow(String path, boolean doFail) {
        Tap source = getPlatform().getTextFile(InputData.inputFileIps);

        // "line" goes in, "ip" is declared by the operation and selected as the output.
        Function lineToIp = new Identity(new Fields("ip"));
        Pipe pipe = new Each(new Pipe("first"), new Fields("line"), lineToIp, new Fields("ip"));

        if (doFail) {
            Function alwaysThrows = new FailFunction(Fields.ARGS);
            pipe = new Each(pipe, new Fields("ip"), alwaysThrows, Fields.REPLACE);
        }

        Tap sink = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(path), SinkMode.REPLACE);
        return getPlatform().getFlowConnector().connect("first", source, sink, pipe);
    }

    /**
     * Builds the flow named "second". It runs a {@link RegexSplitter} with the pattern
     * {@code "\\."} over the source and writes the declared fields "first", "second", "third"
     * and "fourth" to a tab-delimited sink opened in {@code SinkMode.REPLACE}.
     *
     * @param source tap to read from
     * @param path   sink location, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow secondFlow(Tap source, String path) {
        Fields parts = new Fields("first", "second", "third", "fourth");
        Function splitOnDot = new RegexSplitter(parts, "\\.");
        Pipe pipe = new Each(new Pipe("second"), splitOnDot);

        Tap sink = getPlatform().getTabDelimitedFile(parts, getOutputPath(path), SinkMode.REPLACE);
        return getPlatform().getFlowConnector().connect("second", source, sink, pipe);
    }

    /**
     * Builds the flow named "third". It applies a {@link FieldJoiner} using "-" as the
     * delimiter, declares the result as "mangled", and writes it to a tab-delimited sink
     * opened in {@code SinkMode.REPLACE}.
     *
     * @param source tap to read from
     * @param path   sink location, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow thirdFlow(Tap source, String path) {
        Fields mangled = new Fields("mangled");
        Function joinWithDash = new FieldJoiner(mangled, "-");
        Pipe pipe = new Each(new Pipe("third"), joinWithDash);

        Tap sink = getPlatform().getTabDelimitedFile(mangled, getOutputPath(path), SinkMode.REPLACE);
        return getPlatform().getFlowConnector().connect("third", source, sink, pipe);
    }

    /**
     * Variant of the "third" flow that places a {@link Checkpoint} named "checkpoint" after
     * the {@link FieldJoiner} and follows it with an {@link Identity}.
     * <p>
     * The {@code path} argument is used for the checkpoint tap; the tail sink is always
     * written under the fixed output path "unusedpath".
     *
     * @param source tap to read from
     * @param path   checkpoint tap location, resolved through {@code getOutputPath}
     * @return the flow connected from the assembled {@link FlowDef}
     */
    private Flow thirdCheckpointFlow(Tap source, String path) {
        Function joinWithDash = new FieldJoiner(new Fields("mangled"), "-");
        Function passThrough = new Identity();

        Pipe pipe = new Each(new Pipe("third"), joinWithDash);
        pipe = new Checkpoint("checkpoint", pipe);
        pipe = new Each(pipe, passThrough);

        Tap sink = getPlatform().getTabDelimitedFile(new Fields("mangled"), getOutputPath("unusedpath"), SinkMode.REPLACE);
        Tap checkpoint = getPlatform().getTabDelimitedFile(Fields.ALL, getOutputPath(path), SinkMode.REPLACE);

        // Same fluent calls as a single chain, applied one step at a time.
        FlowDef flowDef = FlowDef.flowDef();
        flowDef = flowDef.addSource(pipe, source);
        flowDef = flowDef.addTailSink(pipe, sink);
        flowDef = flowDef.addCheckpoint("checkpoint", checkpoint);

        return getPlatform().getFlowConnector().connect(flowDef);
    }

    /**
     * Builds the flow named "fourth". It passes the source through an {@link Identity} and
     * writes it to a text-file sink opened in {@code SinkMode.REPLACE}.
     *
     * @param source tap to read from
     * @param path   sink location, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow fourthFlow(Tap source, String path) {
        Function passThrough = new Identity();
        Pipe pipe = new Each(new Pipe("fourth"), passThrough);

        Tap sink = getPlatform().getTextFile(getOutputPath(path), SinkMode.REPLACE);
        return getPlatform().getFlowConnector().connect("fourth", source, sink, pipe);
    }

    /**
     * Builds one of the upstream flows for the multi-tap test. It reads
     * {@code InputData.inputFileIps} as text, applies {@link Identity} to "line" declaring it
     * as "ip", and writes "ip" to a tab-delimited sink under {@code path/ordinal}.
     *
     * @param path    base output directory, resolved through {@code getOutputPath}
     * @param ordinal used as the pipe name, the sink sub-directory, and the suffix of the
     *                flow name ("previous-multi-tap-" + ordinal)
     * @return the connected flow
     */
    private Flow previousMultiTapFlow(String path, String ordinal) {
        String sinkPath = path + "/" + ordinal;
        String flowName = "previous-multi-tap-" + ordinal;

        Tap source = getPlatform().getTextFile(InputData.inputFileIps);

        Function lineToIp = new Identity(new Fields("ip"));
        Pipe pipe = new Each(new Pipe(ordinal), new Fields("line"), lineToIp, new Fields("ip"));

        Tap sink = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(sinkPath), SinkMode.REPLACE);
        return getPlatform().getFlowConnector().connect(flowName, source, sink, pipe);
    }

    /**
     * Builds the flow named "multi-tap". It wraps all given taps in a single
     * {@link MultiSourceTap}, passes the data through an {@link Identity}, and writes it to a
     * text-file sink at {@code path + "/multitap"}.
     *
     * @param sources taps to combine into one source
     * @param path    base output directory, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow multiTapFlow(Tap[] sources, String path) {
        Function passThrough = new Identity();
        Pipe pipe = new Each(new Pipe("multitap"), passThrough);

        Tap combinedSource = new MultiSourceTap(sources);
        Tap sink = getPlatform().getTextFile(getOutputPath(path + "/multitap"), SinkMode.REPLACE);

        return getPlatform().getFlowConnector().connect("multi-tap", combinedSource, sink, pipe);
    }

    /**
     * Chains four flows sink-to-source, hands them to the {@link CascadeConnector} in the
     * order fourth, second, third, first, and runs the cascade to completion.
     * <p>
     * Afterwards it validates the fourth flow against a length of 20 and checks that the
     * cascade reports {@code first} among its head flows, {@code fourth} among its tail
     * flows, and their respective sources and sinks among the cascade's source and sink taps.
     *
     * @throws IOException if copying the input or validating the output fails
     */
    @Test
    public void testSimpleCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        final String path = "simple";

        Flow first = firstFlow(path + "/first", false);
        Flow second = secondFlow(first.getSink(), path + "/second");
        Flow third = thirdFlow(second.getSink(), path + "/third");
        Flow fourth = fourthFlow(third.getSink(), path + "/fourth");

        // Flows are deliberately passed out of dependency order.
        Flow[] flows = {fourth, second, third, first};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        cascade.start();
        cascade.complete();

        validateLength(fourth, 20);

        assertTrue(cascade.getHeadFlows().contains(first));
        assertTrue(cascade.getSourceTaps().containsAll(first.getSourcesCollection()));

        assertTrue(cascade.getTailFlows().contains(fourth));
        assertTrue(cascade.getSinkTaps().containsAll(fourth.getSinksCollection()));
    }

    /**
     * Runs a four-flow cascade whose first flow contains a {@link FailFunction} and checks
     * how the failure is reported to per-flow listeners.
     * <p>
     * {@code cascade.complete()} is required to throw. After that the test asserts, through
     * the permits of each {@link LockingFlowListener}, that only the first flow recorded a
     * throwable, that the second, third and fourth flows have no "started" permit, and that
     * each of those three has exactly one "stopped" permit.
     *
     * @throws IOException if copying the input fails
     */
    @Test
    public void testSimpleCascadeFail() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        String path = "simple";

        Flow first = firstFlow(path + "/first", true);
        Flow second = secondFlow(first.getSink(), path + "/second");
        Flow third = thirdFlow(second.getSink(), path + "/third");
        Flow fourth = fourthFlow(third.getSink(), path + "/fourth");

        LockingFlowListener firstFlowListener = new LockingFlowListener();
        LockingFlowListener secondFlowListener = new LockingFlowListener();
        LockingFlowListener thirdFlowListener = new LockingFlowListener();
        LockingFlowListener fourthFlowListener = new LockingFlowListener();

        first.addListener((FlowListener) firstFlowListener);
        second.addListener((FlowListener) secondFlowListener);
        third.addListener((FlowListener) thirdFlowListener);
        fourth.addListener((FlowListener) fourthFlowListener);

        Map<Object, Object> properties = getPlatform().getProperties();
        Cascade cascade = new CascadeConnector(properties).connect(new Flow[]{fourth, second, third, first});

        cascade.start();

        try {
            cascade.complete();
            fail("did not fail");
        } catch (Exception ignored) {
            // Expected: the first flow always throws. The failure raised by fail() above is
            // an Error, not an Exception, so it is not swallowed here.
        }

        assertEquals("first did not fail", 1, firstFlowListener.thrown.availablePermits());
        assertEquals("second failed", 0, secondFlowListener.thrown.availablePermits());
        assertEquals("third failed", 0, thirdFlowListener.thrown.availablePermits());
        assertEquals("fourth failed", 0, fourthFlowListener.thrown.availablePermits());

        assertEquals("second started", 0, secondFlowListener.started.availablePermits());
        assertEquals("third started", 0, thirdFlowListener.started.availablePermits());
        assertEquals("fourth started", 0, fourthFlowListener.started.availablePermits());

        // Each message names the flow it checks; all three previously said "second".
        assertEquals("second did not stop", 1, secondFlowListener.stopped.availablePermits());
        assertEquals("third did not stop", 1, thirdFlowListener.stopped.availablePermits());
        assertEquals("fourth did not stop", 1, fourthFlowListener.stopped.availablePermits());
    }

    /**
     * Builds two upstream flows and a third flow that reads both of their sinks through a
     * multi-source tap, connects them as a cascade (the multi-tap flow listed first), runs
     * it, and validates the multi-tap flow against a length of 40.
     *
     * @throws IOException if copying the input or validating the output fails
     */
    @Test
    public void testMultiTapCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        final String path = "multitap";

        Flow first = previousMultiTapFlow(path, "first");
        Flow second = previousMultiTapFlow(path, "second");

        Tap[] upstreamSinks = Tap.taps(new Tap[]{first.getSink(), second.getSink()});
        Flow multitap = multiTapFlow(upstreamSinks, path);

        Flow[] flows = {multitap, first, second};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        cascade.start();
        cascade.complete();

        validateLength(multitap, 40);
    }

    /**
     * Runs a four-flow cascade with a {@link FlowSkipStrategy} that answers {@code true} for
     * every flow.
     * <p>
     * Asserts that the {@link CountingFlowListener} attached to the second flow counted one
     * skip and that the fourth flow's sink resource does not exist afterwards.
     *
     * @throws IOException if copying the input or probing the sink fails
     */
    @Test
    public void testSkippedCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        final String path = "skipped";

        Flow first = firstFlow(path + "/first", false);
        Flow second = secondFlow(first.getSink(), path + "/second");
        Flow third = thirdFlow(second.getSink(), path + "/third");
        Flow fourth = fourthFlow(third.getSink(), path + "/fourth");

        CountingFlowListener flowListener = new CountingFlowListener();
        second.addListener((FlowListener) flowListener);

        Flow[] flows = {first, second, third, fourth};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        // Strategy that unconditionally asks for the flow to be skipped.
        FlowSkipStrategy skipEverything = new FlowSkipStrategy() {
            @Override
            public boolean skipFlow(Flow flow) throws IOException {
                return true;
            }
        };
        cascade.setFlowSkipStrategy(skipEverything);

        cascade.start();
        cascade.complete();

        assertEquals(1, flowListener.skipped);

        boolean sinkExists = fourth.getSink().resourceExists(fourth.getConfig());
        assertFalse("file exists", sinkExists);
    }

    /**
     * Starts a four-flow cascade, waits until the first flow's first step job reports that
     * it has started, then calls {@code cascade.stop()} and checks the listener callbacks.
     * <p>
     * Both the flow listener on the first flow and the cascade listener must signal
     * "started" within 60 seconds of {@code start()}, and "stopped" and "completed" within
     * 60 seconds of {@code stop()}.
     * <p>
     * The wait for the step job is bounded: if no started job is observed within five
     * minutes the test fails instead of polling forever.
     *
     * @throws IOException          if copying the input fails
     * @throws InterruptedException if the thread is interrupted while sleeping or waiting
     *                              on a listener
     */
    @Test
    public void testSimpleCascadeStop() throws IOException, InterruptedException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        String path = "stopped";

        Flow first = firstFlow(path + "/first" + "-nondeterministic", false);
        Flow second = secondFlow(first.getSink(), path + "/second" + "-nondeterministic");
        Flow third = thirdFlow(second.getSink(), path + "/third" + "-nondeterministic");
        Flow fourth = fourthFlow(third.getSink(), path + "/fourth" + "-nondeterministic");

        LockingCascadeListener cascadeListener = new LockingCascadeListener();
        LockingFlowListener flowListener = new LockingFlowListener();

        first.addListener((FlowListener) flowListener);

        Cascade cascade = new CascadeConnector(getProperties()).connect(new Flow[]{first, second, third, fourth});
        cascade.addListener((CascadeListener) cascadeListener);

        System.out.println("calling start");
        cascade.start();

        assertTrue("did not start", flowListener.started.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("cascade did not start", cascadeListener.started.tryAcquire(60L, TimeUnit.SECONDS));

        // Upper bound for the polling loop below, so a job that never starts fails the test
        // rather than hanging the whole run.
        final long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(5L);

        while (true) {
            System.out.println("testing if running");

            // Only the MapReduce and DAG platforms pause between polls.
            if (getPlatform().isMapReduce() || getPlatform().isDAG()) {
                Thread.sleep(1000L);
            }

            Map<?, ?> jobs = Flows.getJobsMap(first);

            if (jobs != null && !jobs.isEmpty()) {
                FlowStepJob firstJob = (FlowStepJob) jobs.values().iterator().next();

                if (firstJob.isStarted()) {
                    break;
                }
            }

            if (System.nanoTime() - deadline > 0L) {
                fail("step job of first flow did not start within 5 minutes");
            }
        }

        System.out.println("calling stop");
        cascade.stop();

        assertTrue("did not stop", flowListener.stopped.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("did not complete", flowListener.completed.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("cascade did not stop", cascadeListener.stopped.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("cascade did not complete", cascadeListener.completed.tryAcquire(60L, TimeUnit.SECONDS));
    }

    /**
     * Connects four flows into a cascade (without running it) and checks that the cascade
     * has a non-null ID and that every flow carries the same value in its
     * {@code "cascading.cascade.id"} property.
     *
     * @throws IOException declared for consistency with the other tests in this class
     */
    @Test
    public void testCascadeID() throws IOException {
        String path = "idtest";

        Flow first = firstFlow(path + "/first", false);
        Flow second = secondFlow(first.getSink(), path + "/second");
        Flow third = thirdFlow(second.getSink(), path + "/third");
        Flow fourth = fourthFlow(third.getSink(), path + "/fourth");

        Cascade cascade = new CascadeConnector(getProperties()).connect(new Flow[]{first, second, third, fourth});

        String id = cascade.getID();

        assertNotNull("id is null", id);

        // The cascade ID is the expected value and the flow property the actual one, so a
        // failure names the flow and reports the two values the right way round.
        assertEquals("first flow cascade id", id, (String) first.getProperty("cascading.cascade.id"));
        assertEquals("second flow cascade id", id, (String) second.getProperty("cascading.cascade.id"));
        assertEquals("third flow cascade id", id, (String) third.getProperty("cascading.cascade.id"));
        assertEquals("fourth flow cascade id", id, (String) fourth.getProperty("cascading.cascade.id"));
    }

    /**
     * Same shape as the simple cascade test, except that the third flow declares a
     * checkpoint tap and the fourth flow reads from that checkpoint rather than from the
     * third flow's sink.
     * <p>
     * The test returns immediately, asserting nothing, when the platform is not MapReduce.
     * Otherwise it runs the cascade, validates the fourth flow against a length of 20, and
     * checks the cascade's head and tail flows and its source, sink, intermediate and
     * checkpoint taps.
     *
     * @throws IOException if copying the input or validating the output fails
     */
    @Test
    public void testCheckpointTapCascade() throws IOException {
        if (!getPlatform().isMapReduce()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileIps);

        final String path = "checkpoint";

        Flow first = firstFlow(path + "/first", false);
        Flow second = secondFlow(first.getSink(), path + "/second");
        Flow third = thirdCheckpointFlow(second.getSink(), path + "/third");

        // Feed the fourth flow from the first checkpoint tap the third flow reports.
        Tap checkpointTap = (Tap) third.getCheckpoints().values().iterator().next();
        Flow fourth = fourthFlow(checkpointTap, path + "/fourth");

        Flow[] flows = {fourth, second, third, first};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        cascade.start();
        cascade.complete();

        validateLength(fourth, 20);

        assertTrue(cascade.getHeadFlows().contains(first));
        assertTrue(cascade.getSourceTaps().containsAll(first.getSourcesCollection()));

        assertTrue(cascade.getIntermediateTaps().containsAll(third.getCheckpointsCollection()));
        assertTrue(cascade.getCheckpointsTaps().containsAll(third.getCheckpointsCollection()));

        assertTrue(cascade.getTailFlows().contains(fourth));
        assertTrue(cascade.getSinkTaps().containsAll(fourth.getSinksCollection()));
    }

    /**
     * Builds two flows that both write to the same "output" text tap and expects connecting
     * them into a cascade to raise a {@link CascadeException}.
     * <p>
     * The second flow additionally has a tail sink "output2" on a pipe named "copy2" that
     * continues from the shared "copy" pipe.
     *
     * @throws IOException if copying the input files fails
     */
    @Test(expected=CascadeException.class)
    public void testPlannerFailureDuplicateSinks() throws IOException {
        // First flow: ips file -> "output".
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Tap source = getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = getPlatform().getTextFile("output");
        Pipe copyPipe = new Pipe("copy");

        FlowDef firstDef = FlowDef.flowDef();
        firstDef = firstDef.addSource(copyPipe, source);
        firstDef = firstDef.addTailSink(copyPipe, sink);

        Flow firstFlow = getPlatform().getFlowConnector().connect(firstDef);

        // Second flow: apache file -> "output" (the duplicate) and "output2".
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap secondSource = getPlatform().getTextFile(InputData.inputFileApache);
        Tap secondSink = getPlatform().getTextFile("output2");
        Pipe secondCopyPipe = new Pipe("copy2", copyPipe);

        FlowDef secondDef = FlowDef.flowDef();
        secondDef = secondDef.addSource(copyPipe, secondSource);
        secondDef = secondDef.addSink(copyPipe, sink);
        secondDef = secondDef.addTailSink(secondCopyPipe, secondSink);

        Flow secondFlow = getPlatform().getFlowConnector().connect(secondDef);

        Flow[] flows = {firstFlow, secondFlow};
        new CascadeConnector(getProperties()).connect(flows);
    }

    public class FailFunction
    extends BaseOperation
    implements Function {
        public FailFunction(Fields fieldDeclaration) {
            super(1, fieldDeclaration);
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            throw new CascadingException("testing");
        }
    }
}

