/*
 * Decompiled with CFR 0.152.
 */
package cascading.cascade;

import cascading.CascadingException;
import cascading.PlatformTestCase;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowListener;
import cascading.flow.LockingFlowListener;
import cascading.flow.process.ProcessFlow;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.text.FieldJoiner;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.stats.FlowStats;
import cascading.stats.process.ProcessStepStats;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import riffle.process.DependencyIncoming;
import riffle.process.DependencyOutgoing;
import riffle.process.Process;
import riffle.process.ProcessChildren;
import riffle.process.ProcessComplete;
import riffle.process.ProcessConfiguration;
import riffle.process.ProcessCounters;
import riffle.process.ProcessStart;
import riffle.process.ProcessStop;
import riffle.process.scheduler.ProcessChain;

public class RiffleCascadePlatformTest
extends PlatformTestCase {
    public RiffleCascadePlatformTest() {
        super(false);
    }

    private Flow firstFlow(String path) {
        Tap source = this.getPlatform().getTextFile(InputData.inputFileIps);
        Pipe pipe = new Pipe("first");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new Identity(new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"}));
        Tap sink = this.getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"ip"}), this.getOutputPath(path + "/first"), SinkMode.REPLACE);
        return this.getPlatform().getFlowConnector().connect(source, sink, pipe);
    }

    private Flow secondFlow(Tap source, String path) {
        Pipe pipe = new Pipe("second");
        pipe = new Each(pipe, (Function)new RegexSplitter(new Fields(new Comparable[]{"first", "second", "third", "fourth"}), "\\."));
        Tap sink = this.getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"first", "second", "third", "fourth"}), this.getOutputPath(path + "/second"), SinkMode.REPLACE);
        return this.getPlatform().getFlowConnector().connect(source, sink, pipe);
    }

    private Flow thirdFlow(Tap source, String path) {
        Pipe pipe = new Pipe("third");
        pipe = new Each(pipe, (Function)new FieldJoiner(new Fields(new Comparable[]{"mangled"}), "-"));
        Tap sink = this.getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"mangled"}), this.getOutputPath(path + "/third"), SinkMode.REPLACE);
        return this.getPlatform().getFlowConnector().connect(source, sink, pipe);
    }

    private Flow fourthFlow(Tap source, String path) {
        Pipe pipe = new Pipe("fourth");
        pipe = new Each(pipe, (Function)new Identity());
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath(path + "/fourth"), SinkMode.REPLACE);
        return this.getPlatform().getFlowConnector().connect(source, sink, pipe);
    }

    @Test
    public void testSimpleRiffle() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileIps);
        String path = "perpetual";
        Flow first = this.firstFlow(path);
        Flow second = this.secondFlow(first.getSink(), path);
        Flow third = this.thirdFlow(second.getSink(), path);
        Flow fourth = this.fourthFlow(third.getSink(), path);
        ProcessChain chain = new ProcessChain(true, new Object[]{fourth, second, first, third});
        chain.start();
        chain.complete();
        RiffleCascadePlatformTest.validateLength((Flow)fourth, (int)20);
    }

    @Test
    public void testSimpleRiffleCascade() throws IOException, InterruptedException {
        this.getPlatform().copyFromLocal(InputData.inputFileIps);
        String path = "perpetualcascade";
        Flow first = this.firstFlow(path);
        Flow second = this.secondFlow(first.getSink(), path);
        Flow third = this.thirdFlow(second.getSink(), path);
        Flow fourth = this.fourthFlow(third.getSink(), path);
        ProcessFlow firstProcess = new ProcessFlow("first", (Object)first);
        ProcessFlow secondProcess = new ProcessFlow("second", (Object)second);
        ProcessFlow thirdProcess = new ProcessFlow("third", (Object)third);
        ProcessFlow fourthProcess = new ProcessFlow("fourth", (Object)fourth);
        LockingFlowListener flowListener = new LockingFlowListener();
        secondProcess.addListener((FlowListener)flowListener);
        Cascade cascade = new CascadeConnector(this.getProperties()).connect(new Flow[]{fourthProcess, secondProcess, firstProcess, thirdProcess});
        cascade.start();
        cascade.complete();
        RiffleCascadePlatformTest.assertTrue((String)"did not start", (boolean)flowListener.started.tryAcquire(2L, TimeUnit.SECONDS));
        RiffleCascadePlatformTest.assertTrue((String)"did not complete", (boolean)flowListener.completed.tryAcquire(2L, TimeUnit.SECONDS));
        RiffleCascadePlatformTest.validateLength((Flow)fourth, (int)20);
    }

    @Test
    public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException {
        ThrowableListener listener = new ThrowableListener();
        this.getPlatform().copyFromLocal(InputData.inputFileIps);
        String path = "startException";
        Flow process = this.flowWithException(path, FailingRiffle.Failing.START);
        process.addListener((FlowListener)listener);
        try {
            process.start();
            RiffleCascadePlatformTest.fail((String)"there should have been an exception");
        }
        catch (CascadingException exception) {
            RiffleCascadePlatformTest.assertNotNull((Object)listener.getThrowable());
        }
    }

    @Test
    public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException {
        ThrowableListener listener = new ThrowableListener();
        this.getPlatform().copyFromLocal(InputData.inputFileIps);
        String path = "completeException";
        Flow process = this.flowWithException(path, FailingRiffle.Failing.COMPLETE);
        process.addListener((FlowListener)listener);
        try {
            process.complete();
            RiffleCascadePlatformTest.fail((String)"there should have been an exception");
        }
        catch (CascadingException exception) {
            RiffleCascadePlatformTest.assertNotNull((Object)listener.getThrowable());
        }
    }

    @Test
    public void testProcessFlowWithCounters() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileIps);
        HashMap<String, Map<String, Long>> counters = new HashMap<String, Map<String, Long>>();
        HashMap<String, Long> innerMap = new HashMap<String, Long>();
        innerMap.put("inner-key", 42L);
        counters.put("outer-key", innerMap);
        Flow process = this.flowWithCounters("counter", counters);
        process.complete();
        FlowStats flowStats = process.getFlowStats();
        RiffleCascadePlatformTest.assertNotNull((Object)flowStats);
        ArrayList children = new ArrayList(flowStats.getChildren());
        RiffleCascadePlatformTest.assertEquals((int)1, (int)children.size());
        ProcessStepStats stepStats = (ProcessStepStats)children.get(0);
        RiffleCascadePlatformTest.assertEquals(counters.keySet(), (Object)stepStats.getCounterGroups());
        RiffleCascadePlatformTest.assertEquals(innerMap.keySet(), (Object)stepStats.getCountersFor("outer-key"));
        RiffleCascadePlatformTest.assertEquals((long)42L, (long)stepStats.getCounterValue("outer-key", "inner-key"));
    }

    @Test
    public void testProcessFlowWithChildCounters() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileIps);
        HashMap<String, Map<String, Long>> counters = new HashMap<String, Map<String, Long>>();
        HashMap<String, Long> innerMap = new HashMap<String, Long>();
        innerMap.put("inner-key", 42L);
        counters.put("outer-key", innerMap);
        Flow process = this.flowWithChildren("children", counters);
        process.complete();
        FlowStats flowStats = process.getFlowStats();
        RiffleCascadePlatformTest.assertNotNull((Object)flowStats);
        ArrayList children = new ArrayList(flowStats.getChildren());
        RiffleCascadePlatformTest.assertEquals((int)1, (int)children.size());
        ProcessStepStats stepStats = (ProcessStepStats)children.get(0);
        RiffleCascadePlatformTest.assertEquals(counters.keySet(), (Object)stepStats.getCounterGroups());
        RiffleCascadePlatformTest.assertEquals(innerMap.keySet(), (Object)stepStats.getCountersFor("outer-key"));
        RiffleCascadePlatformTest.assertEquals((long)42L, (long)stepStats.getCounterValue("outer-key", "inner-key"));
    }

    @Test
    public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException {
        ThrowableListener listener = new ThrowableListener();
        this.getPlatform().copyFromLocal(InputData.inputFileIps);
        String path = "stopException";
        Flow process = this.flowWithException(path, FailingRiffle.Failing.STOP);
        process.addListener((FlowListener)listener);
        process.start();
        try {
            process.stop();
            RiffleCascadePlatformTest.fail((String)"there should have been an exception");
        }
        catch (CascadingException exception) {
            RiffleCascadePlatformTest.assertNotNull((Object)listener.getThrowable());
        }
    }

    private Flow flowWithException(String path, FailingRiffle.Failing failing) {
        Tap source = this.getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = this.getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"ip"}), this.getOutputPath(path + "/first"), SinkMode.REPLACE);
        return new ProcessFlow("flow", (Object)new FailingRiffle(source, sink, failing));
    }

    private Flow flowWithCounters(String path, Map<String, Map<String, Long>> counters) {
        Tap source = this.getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = this.getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"ip"}), this.getOutputPath(path + "/first"), SinkMode.REPLACE);
        return new ProcessFlow("counter-flow", (Object)new CounterRiffle(source, sink, counters));
    }

    private Flow flowWithChildren(String path, Map<String, Map<String, Long>> counters) {
        Tap source = this.getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = this.getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"ip"}), this.getOutputPath(path + "/first"), SinkMode.REPLACE);
        return new ProcessFlow("counter-flow", (Object)new ChildCounterRiffle(source, sink, counters));
    }

    @Process
    class ChildCounterRiffle {
        Tap sink;
        Tap source;
        Map<String, Map<String, Long>> counter;

        ChildCounterRiffle(Tap source, Tap sink, Map<String, Map<String, Long>> counter) {
            this.source = source;
            this.sink = sink;
            this.counter = counter;
        }

        @ProcessStart
        public void start() {
        }

        @ProcessStop
        public void stop() {
        }

        @ProcessCounters
        public Map<String, Map<String, Long>> getCounters() {
            return null;
        }

        @ProcessChildren
        public List<Object> getChildren() {
            return Arrays.asList(new CounterRiffle(this.source, this.sink, this.counter));
        }

        @ProcessComplete
        public void complete() {
        }

        @DependencyOutgoing
        public Collection getOutgoing() {
            return Collections.unmodifiableCollection(Arrays.asList(this.sink));
        }

        @DependencyIncoming
        public Collection getIncoming() {
            return Collections.unmodifiableCollection(Arrays.asList(this.source));
        }

        @ProcessConfiguration
        public Object getConfiguration() {
            return new Properties();
        }
    }

    @Process
    class CounterRiffle {
        Tap sink;
        Tap source;
        Map<String, Map<String, Long>> counter;

        CounterRiffle(Tap source, Tap sink, Map<String, Map<String, Long>> counter) {
            this.source = source;
            this.sink = sink;
            this.counter = counter;
        }

        @ProcessStart
        public void start() {
        }

        @ProcessStop
        public void stop() {
        }

        @ProcessCounters
        public Map<String, Map<String, Long>> getCounters() {
            return this.counter;
        }

        @ProcessComplete
        public void complete() {
        }

        @DependencyOutgoing
        public Collection getOutgoing() {
            return Collections.unmodifiableCollection(Arrays.asList(this.sink));
        }

        @DependencyIncoming
        public Collection getIncoming() {
            return Collections.unmodifiableCollection(Arrays.asList(this.source));
        }

        @ProcessConfiguration
        public Object getConfiguration() {
            return new Properties();
        }
    }

    class ThrowableListener
    implements FlowListener {
        public Throwable throwable;

        ThrowableListener() {
        }

        public void onStarting(Flow flow) {
        }

        public void onStopping(Flow flow) {
        }

        public void onCompleted(Flow flow) {
        }

        public boolean onThrowable(Flow flow, Throwable throwable) {
            this.throwable = throwable;
            return true;
        }

        public Throwable getThrowable() {
            return this.throwable;
        }
    }

    @Process
    static class FailingRiffle {
        Tap sink;
        Tap source;
        Failing failing;

        FailingRiffle(Tap source, Tap sink, Failing failing) {
            this.source = source;
            this.sink = sink;
            this.failing = failing;
        }

        @ProcessStart
        public void start() {
            if (this.failing == Failing.START) {
                this.crash();
            }
        }

        @ProcessStop
        public void stop() {
            if (this.failing == Failing.STOP) {
                this.crash();
            }
        }

        private void crash() {
            throw new CascadingException("testing");
        }

        @ProcessComplete
        public void complete() {
            if (this.failing == Failing.COMPLETE) {
                this.crash();
            }
        }

        @ProcessConfiguration
        public Object getConfiguration() {
            return new Properties();
        }

        @DependencyOutgoing
        public Collection getOutgoing() {
            return Collections.unmodifiableCollection(Arrays.asList(this.sink));
        }

        @DependencyIncoming
        public Collection getIncoming() {
            return Collections.unmodifiableCollection(Arrays.asList(this.source));
        }

        static enum Failing {
            START,
            STOP,
            COMPLETE;

        }
    }
}

