/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.flow.FailingFlowListener;
import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowListener;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.Flows;
import cascading.flow.LockingFlowListener;
import cascading.flow.hadoop.FailOnMissingSuccessFlowListener;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.operation.BaseOperation;
import cascading.operation.Debug;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.operation.Function;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.property.AppProps;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.stats.CascadingStats;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.util.Util;
import data.InputData;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlowPlatformTest
extends PlatformTestCase {
    /** Logger used by the tests in this class to trace flow lifecycle calls. */
    private static final Logger LOG = LoggerFactory.getLogger(FlowPlatformTest.class);

    /**
     * Creates the test case, passing {@code true} to the {@link PlatformTestCase} constructor.
     * NOTE(review): the flag presumably requests a cluster-backed platform (several tests
     * below consult {@code isUseCluster()}) - confirm against PlatformTestCase.
     */
    public FlowPlatformTest() {
        super(true);
    }

    /**
     * Plans a flow whose source is an {@link Lfs} tap and whose sink is an {@link Hfs} tap,
     * then asserts that the flow has exactly one step and that the step's configuration is
     * reported as local by {@link HadoopUtil#isLocal}.
     */
    @Test
    public void testLocalModeSource() throws Exception {
        Tap source = new Lfs(new TextLine(), "input/path");
        Tap sink = new Hfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);

        List<?> steps = flow.getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        Object stepConfig = ((FlowStep) steps.get(0)).getConfig();
        assertTrue("is not local", HadoopUtil.isLocal((Configuration) stepConfig));
    }

    /**
     * Plans a flow whose source is an {@link Hfs} tap and whose sink is an {@link Lfs} tap,
     * then asserts that the flow has exactly one step and that the step's configuration is
     * reported as local by {@link HadoopUtil#isLocal}.
     */
    @Test
    public void testLocalModeSink() throws Exception {
        Tap source = new Hfs(new TextLine(), "input/path");
        Tap sink = new Lfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);

        List<?> steps = flow.getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        Object stepConfig = ((FlowStep) steps.get(0)).getConfig();
        assertTrue("is not local", HadoopUtil.isLocal((Configuration) stepConfig));
    }

    /**
     * With both taps on {@link Hfs}, asserts that the configuration produced by the single
     * step's {@code createInitializedConfig} is not reported as local.
     * The test is a no-op when the platform is not using a cluster.
     */
    @Test
    public void testNotLocalMode() throws Exception {
        boolean clustered = getPlatform().isUseCluster();
        if (!clustered) {
            return;
        }

        Tap source = new Hfs(new TextLine(), "input/path");
        Tap sink = new Hfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);

        List<?> steps = flow.getFlowSteps();
        assertEquals("wrong size", 1, steps.size());

        // Initialise the step against the platform's own configuration rather than
        // reading the step's stored config.
        BaseFlowStep step = (BaseFlowStep) (FlowStep) steps.get(0);
        Object initialized = step.createInitializedConfig(
            flow.getFlowProcess(),
            ((BaseHadoopPlatform) getPlatform()).getConfiguration());

        assertTrue("is local", !HadoopUtil.isLocal((Configuration) initialized));
    }

    /**
     * Starts a two-source CoGroup flow, waits until its first step job reports started,
     * then calls {@code stop()} while another thread is blocked in {@code complete()}.
     * Asserts that {@code stop()} returned no later than {@code complete()} and that the
     * {@link LockingFlowListener} saw the started, stopped and completed callbacks.
     * The test is a no-op when the platform is not using a cluster.
     */
    @Test
    public void testStop() throws Exception {
        if (!getPlatform().isUseCluster()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);
        Tap sourceUpper = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileUpper);

        // Declared against Tap so the map matches what connect(...) accepts.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new TextLine(), getOutputPath("stopped"), SinkMode.REPLACE);

        // Declared as Pipe: the variable is reassigned from an Each to a GroupBy.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new GroupBy(pipeLower, new Fields("num"));

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new GroupBy(pipeUpper, new Fields("num"));

        Pipe splice = new CoGroup(pipeLower, new Fields("num"), pipeUpper, new Fields("num"), Fields.size(4));

        final Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, splice);

        LockingFlowListener listener = new LockingFlowListener();
        flow.addListener(listener);

        LOG.info("calling start");
        flow.start();

        Util.safeSleep(5000L);

        assertTrue("did not start", listener.started.tryAcquire(60L, TimeUnit.SECONDS));

        // Poll once a second until the first step job has been registered and started.
        while (true) {
            LOG.info("testing if running");
            Thread.sleep(1000L);

            Map<?, ?> jobs = Flows.getJobsMap(flow);
            if (jobs == null || jobs.values().isEmpty()) {
                continue;
            }

            FlowStepJob flowStepJob = (FlowStepJob) jobs.values().iterator().next();
            if (flowStepJob.getStepStats().getStatus() == CascadingStats.Status.FAILED) {
                fail("failed to start Hadoop step, please check your environment.");
            }
            if (flowStepJob.isStarted()) {
                break;
            }
        }

        final Semaphore start = new Semaphore(0);
        final long startTime = System.nanoTime();

        // Keep a handle on the executor so its worker thread is always released,
        // including when an assertion or future.get() throws.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Long> future = executor.submit(() -> {
                start.release();
                LOG.info("calling complete");
                flow.complete();
                return System.nanoTime() - startTime;
            });

            // Do not call stop() until the worker thread is about to call complete().
            start.acquire();

            LOG.info("calling stop");
            flow.stop();

            long stopTime = System.nanoTime() - startTime;
            long completeTime = future.get();

            assertTrue(String.format("stop: %s complete: %s", stopTime, completeTime), stopTime <= completeTime);
        } finally {
            executor.shutdownNow();
        }

        assertTrue("did not stop", listener.stopped.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("did not complete", listener.completed.tryAcquire(60L, TimeUnit.SECONDS));
    }

    /**
     * Asserts that planning a flow whose assembly contains a {@code BadFilter} throws an
     * exception from {@code connect(...)}; the test fails if {@code connect} returns normally.
     */
    @Test
    public void testFailedSerialization() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);

        // Declared against Tap so the map matches what connect(...) accepts.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new TextLine(), getOutputPath("badserialization"), SinkMode.REPLACE);

        // Declared as Pipe: the variable is reassigned from an Each to a GroupBy.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new Each(pipeLower, new Fields("num"), new BadFilter());
        pipeLower = new GroupBy(pipeLower, new Fields("num"));

        try {
            getPlatform().getFlowConnector(getProperties()).connect(sources, sink, pipeLower);
            fail("did not throw serialization exception");
        } catch (Exception ignored) {
            // Expected: connect(...) must reject the assembly. The fail() above raises an
            // Error, not an Exception, so it is not swallowed here.
        }
    }

    /**
     * Calls {@code stop()} immediately after {@code start()} on a simple split-and-group
     * flow. The test makes no assertions; it passes as long as neither call throws.
     */
    @Test
    public void testStartStopRace() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);

        // Declared against Tap so the map matches what connect(...) accepts.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new TextLine(), getOutputPath("startstop"), SinkMode.REPLACE);

        // Declared as Pipe: the variable is reassigned from an Each to a GroupBy.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new GroupBy(pipeLower, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, pipeLower);

        flow.start();
        flow.stop();
    }

    /** Runs the shared failing-listener scenario with {@code OnFail.STARTING}. */
    @Test
    public void testFailingListenerStarting() throws Exception {
        FailingFlowListener.OnFail mode = FailingFlowListener.OnFail.STARTING;
        failingListenerTest(mode);
    }

    /** Runs the shared failing-listener scenario with {@code OnFail.STOPPING}. */
    @Test
    public void testFailingListenerStopping() throws Exception {
        FailingFlowListener.OnFail mode = FailingFlowListener.OnFail.STOPPING;
        failingListenerTest(mode);
    }

    /** Runs the shared failing-listener scenario with {@code OnFail.COMPLETED}. */
    @Test
    public void testFailingListenerCompleted() throws Exception {
        FailingFlowListener.OnFail mode = FailingFlowListener.OnFail.COMPLETED;
        failingListenerTest(mode);
    }

    /** Runs the shared failing-listener scenario with {@code OnFail.THROWABLE}. */
    @Test
    public void testFailingListenerThrowable() throws Exception {
        FailingFlowListener.OnFail mode = FailingFlowListener.OnFail.THROWABLE;
        failingListenerTest(mode);
    }

    /**
     * Shared scenario for the failing-listener tests: builds a two-source CoGroup flow,
     * registers a {@link FailingFlowListener} constructed with {@code onFail}, starts the
     * flow, and asserts that the started, completed and stopped callbacks are all observed
     * and that a subsequent {@code complete()} throws.
     *
     * <p>For {@code THROWABLE} a filter that always throws is inserted into the "lower"
     * branch; for {@code STOPPING} the flow is explicitly stopped once its first step job
     * has started.
     *
     * @param onFail value handed to the {@link FailingFlowListener} constructor
     *               (presumably selects the callback in which it fails - confirm against
     *               FailingFlowListener); also used as the output path prefix
     * @throws Exception if setup, polling or semaphore waits fail
     */
    private void failingListenerTest(FailingFlowListener.OnFail onFail) throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);
        Tap sourceUpper = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileUpper);

        // Declared against Tap so the map matches what connect(...) accepts.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new TextLine(), getOutputPath(onFail + "/stopped"), SinkMode.REPLACE);

        // Declared as Pipe: the variable is reassigned from an Each to a GroupBy.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);

        if (onFail == FailingFlowListener.OnFail.THROWABLE) {
            // Force a failure from inside the running assembly.
            pipeLower = new Each(pipeLower, (Filter) new Debug() {
                @Override
                public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
                    throw new RuntimeException("failing inside pipe assembly intentionally");
                }
            });
        }

        pipeLower = new GroupBy(pipeLower, new Fields("num"));

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new GroupBy(pipeUpper, new Fields("num"));

        Pipe splice = new CoGroup(pipeLower, new Fields("num"), pipeUpper, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, splice);

        FailingFlowListener listener = new FailingFlowListener(onFail);
        flow.addListener(listener);

        LOG.info("calling start");
        flow.start();

        assertTrue("did not start", listener.started.tryAcquire(120L, TimeUnit.SECONDS));

        if (onFail == FailingFlowListener.OnFail.STOPPING) {
            // Poll once a second until the first step job has been registered and started.
            while (true) {
                LOG.info("testing if running");
                Thread.sleep(1000L);

                Map<?, ?> jobs = Flows.getJobsMap(flow);
                if (jobs == null || jobs.values().isEmpty()) {
                    continue;
                }

                FlowStepJob flowStepJob = (FlowStepJob) jobs.values().iterator().next();
                if (flowStepJob.getStepStats().getStatus() == CascadingStats.Status.FAILED) {
                    fail("failed to start Hadoop step, please check your environment.");
                }
                if (flowStepJob.isStarted()) {
                    break;
                }
            }

            LOG.info("calling stop");
            flow.stop();
        }

        assertTrue("did not complete", listener.completed.tryAcquire(360L, TimeUnit.SECONDS));
        assertTrue("did not stop", listener.stopped.tryAcquire(360L, TimeUnit.SECONDS));

        try {
            flow.complete();
            fail("did not rethrow exception from listener");
        } catch (Exception ignored) {
            // Expected: complete() must throw. The fail() above raises an Error,
            // not an Exception, so it is not swallowed here.
        }
    }

    /**
     * Asserts that a planned flow exposes a non-null ID both via {@code getID()} and via the
     * {@code cascading.flow.id} property, and that two flows planned from the same taps,
     * pipe and properties get different IDs (compared case-insensitively).
     */
    @Test
    public void testFlowID() throws Exception {
        Tap source = new Lfs(new TextLine(), "input/path");
        Tap sink = new Hfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");
        Map<Object, Object> props = getProperties();

        Flow first = getPlatform().getFlowConnector(props).connect(source, sink, pipe);
        assertNotNull("missing id", first.getID());
        assertNotNull("missing id in conf", first.getProperty("cascading.flow.id"));

        Flow second = getPlatform().getFlowConnector(props).connect(source, sink, pipe);
        boolean distinctIds = !first.getID().equalsIgnoreCase(second.getID());
        assertTrue("same id", distinctIds);
    }

    /**
     * Sets {@code cascading.app.name} on the platform's Hadoop {@link Configuration},
     * builds a {@link Properties} instance from it via {@link AppProps} (which also carries
     * version "1.2.3"), and asserts that a flow planned with those properties exposes both
     * the app name and the app version.
     */
    @Test
    public void testCopyConfig() throws Exception {
        Tap source = new Lfs(new TextLine(), "input/path");
        Tap sink = new Hfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");

        // Must be typed as Configuration: set(...) is not available on Object.
        Configuration conf = (Configuration) ((BaseHadoopPlatform) getPlatform()).getConfiguration();
        conf.set("cascading.app.name", "testname");

        AppProps props = AppProps.appProps().setVersion("1.2.3");
        Properties properties = props.buildProperties(conf);

        Flow flow = getPlatform().getFlowConnector(properties).connect(source, sink, pipe);

        assertEquals("testname", flow.getProperty("cascading.app.name"));
        assertEquals("1.2.3", flow.getProperty("cascading.app.version"));
    }

    /**
     * Starts a simple split-and-group flow without ever calling {@code complete()} and
     * asserts that the {@link LockingFlowListener}'s completed semaphore is released
     * within 90 seconds.
     */
    @Test
    public void testStartWithoutComplete() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);

        // Declared against Tap so the map matches what connect(...) accepts.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new TextLine(), getOutputPath("withoutcomplete"), SinkMode.REPLACE);

        // Declared as Pipe: the variable is reassigned from an Each to a GroupBy.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new GroupBy(pipeLower, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, pipeLower);

        LockingFlowListener listener = new LockingFlowListener();
        flow.addListener(listener);

        flow.start();

        assertTrue(listener.completed.tryAcquire(90L, TimeUnit.SECONDS));
    }

    /**
     * Chains three copy flows that share one {@link FailOnMissingSuccessFlowListener}.
     * The first two run to completion; the {@code _SUCCESS} resource under the second
     * flow's output is then deleted, and the third flow, which reads that output, is
     * expected to throw a {@link FlowException} from {@code complete()}.
     */
    @Test
    public void testFailOnMissingSuccessFlowListener() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        FailOnMissingSuccessFlowListener listener = new FailOnMissingSuccessFlowListener();

        Hfs source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);
        Hfs success = new Hfs(new TextLine(), getOutputPath("withsuccess"), SinkMode.REPLACE);
        Hfs without = new Hfs(new TextLine(), getOutputPath("withoutsuccess"), SinkMode.REPLACE);
        Hfs sink = new Hfs(new TextLine(), getOutputPath("final"), SinkMode.REPLACE);

        // source -> success
        Flow firstFlow = getPlatform().getFlowConnector(getProperties()).connect(source, success, new Pipe("lower"));
        firstFlow.addListener(listener);
        firstFlow.complete();

        // success -> without
        Flow secondFlow = getPlatform().getFlowConnector(getProperties()).connect(success, without, new Pipe("lower"));
        secondFlow.addListener(listener);
        secondFlow.complete();

        // Remove the marker so the next flow's source is missing its _SUCCESS resource.
        String markerPath = new Path(without.getPath(), "_SUCCESS").toString();
        Hfs successTap = new Hfs(new TextLine(), markerPath);
        boolean deleted = successTap.deleteResource(getPlatform().getFlowProcess());
        assertTrue(deleted);

        // without -> sink, expected to be failed by the listener
        Flow thirdFlow = getPlatform().getFlowConnector(getProperties()).connect(without, sink, new Pipe("lower"));
        thirdFlow.addListener(listener);

        try {
            thirdFlow.complete();
            fail("listener did not fail flow");
        } catch (FlowException expected) {
            // the flow is required to fail here
        }
    }

    private static class BadFilter
    extends BaseOperation
    implements Filter {
        private Object object = new Object();

        private BadFilter() {
        }

        public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
            return false;
        }
    }
}

