/*
 * Decompiled with CFR 0.152.
 */
package cascading.pipe.checkpoint;

import cascading.PlatformTestCase;
import cascading.TestFunction;
import cascading.flow.Flow;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowStep;
import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Checkpoint;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tap.DecoratorTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.util.HashMap;
import java.util.List;
import org.junit.Test;

public class CheckpointPlatformTest
extends PlatformTestCase {
    public CheckpointPlatformTest() {
        super(true);
    }

    @Test
    public void testSimpleCheckpoint() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint(pipe);
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("simplecheckpoint"), SinkMode.REPLACE);
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();
        CheckpointPlatformTest.validateLength((Flow)flow, (int)8, null);
        if (!this.getPlatform().isMapReduce()) {
            return;
        }
        List steps = flow.getFlowSteps();
        CheckpointPlatformTest.assertEquals((String)"wrong size", (int)2, (int)steps.size());
    }

    @Test
    public void testManyCheckpoints() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint(pipe);
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        pipe = new Checkpoint(pipe);
        new FlowConnectorProps().setCheckpointTapDecoratorClassName(DecoratorTap.class).setProperties(pipe.getConfigDef());
        pipe = new Each(pipe, (Function)new Identity());
        pipe = new Checkpoint(pipe);
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("manycheckpoint"), SinkMode.REPLACE);
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();
        CheckpointPlatformTest.validateLength((Flow)flow, (int)8, null);
        if (!this.getPlatform().isMapReduce()) {
            return;
        }
        List steps = flow.getFlowSteps();
        CheckpointPlatformTest.assertEquals((String)"wrong size", (int)3, (int)steps.size());
        int count = 0;
        for (FlowStep step : steps) {
            if (!(step.getSink() instanceof DecoratorTap)) continue;
            ++count;
        }
        CheckpointPlatformTest.assertEquals((int)1, (int)count);
    }

    @Test
    public void testSimpleCheckpointTextIntermediate() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint("checkpoint", pipe);
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("checkpoint/sink"), SinkMode.REPLACE);
        Tap checkpoint = this.getPlatform().getDelimitedFile(Fields.ALL, true, "\t", "\"", this.getOutputPath("checkpoint/tap"), SinkMode.REPLACE);
        FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(pipe, sink).addCheckpoint("checkpoint", checkpoint);
        Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();
        CheckpointPlatformTest.validateLength((Flow)flow, (int)8);
        if (!this.getPlatform().isMapReduce()) {
            return;
        }
        List steps = flow.getFlowSteps();
        CheckpointPlatformTest.assertEquals((String)"wrong size", (int)2, (int)steps.size());
        CheckpointPlatformTest.validateLength((TupleEntryIterator)flow.openTapForRead(checkpoint), (int)10);
    }

    @Test
    public void testFailCheckpoint() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint("checkpoint", pipe);
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("failcheckpoint/sink"), SinkMode.REPLACE);
        Tap checkpoint = this.getPlatform().getDelimitedFile(Fields.ALL, true, "\t", "\"", this.getOutputPath("failcheckpoint/tap"), SinkMode.REPLACE);
        FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(pipe, sink).addCheckpoint("checkpointXXXXX", checkpoint);
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
            CheckpointPlatformTest.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testFailCheckpointBeforeEvery() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint("checkpoint", pipe);
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("failcheckpointevery/sink"), SinkMode.REPLACE);
        Tap checkpoint = this.getPlatform().getDelimitedFile(Fields.ALL, true, "\t", "\"", this.getOutputPath("failcheckpointevery/tap"), SinkMode.REPLACE);
        FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(pipe, sink).addCheckpoint("checkpoint", checkpoint);
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
            CheckpointPlatformTest.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testFailCheckpointDeclaredFields() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint("checkpoint", pipe);
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("failcheckpointdeclared/sink"), SinkMode.REPLACE);
        Tap checkpoint = this.getPlatform().getTextFile(this.getOutputPath("failcheckpointdeclared/tap"), SinkMode.REPLACE);
        FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(pipe, sink).addCheckpoint("checkpoint", checkpoint);
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
            CheckpointPlatformTest.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testDuplicateCheckpoint() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint("checkpoint", pipe);
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        pipe = new Checkpoint("checkpoint", pipe);
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("duplicatecheckpoint"), SinkMode.REPLACE);
        FlowDef flowDef = ((FlowDef)FlowDef.flowDef().setName("restartable")).addSource("test", source).addTailSink(pipe, sink).setRunID("restartable");
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(flowDef);
            CheckpointPlatformTest.fail((String)"should throw element graph exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testRestartCheckpoint() throws Exception {
        if (!this.getPlatform().isMapReduce()) {
            return;
        }
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        String sinkPath = this.getOutputPath("restartcheckpoint");
        Flow flow = this.createRestartableFlow(sinkPath, true);
        try {
            flow.complete();
            CheckpointPlatformTest.fail((String)"flow should fail");
        }
        catch (Exception exception) {
            // empty catch block
        }
        int count = 0;
        List steps = flow.getFlowSteps();
        for (FlowStep step : steps) {
            Tap sink = step.getSink();
            if (flow.getSink() == sink || !sink.resourceExists(flow.getConfig())) continue;
            ++count;
        }
        CheckpointPlatformTest.assertEquals((String)"wrong number of intermediate resources exist", (int)1, (int)count);
        flow = this.createRestartableFlow(sinkPath, false);
        flow.complete();
        CheckpointPlatformTest.validateLength((Flow)flow, (int)8, null);
        CheckpointPlatformTest.assertEquals((String)"wrong size", (int)2, (int)flow.getFlowSteps().size());
    }

    private Flow createRestartableFlow(String sinkPath, boolean fail) {
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new Checkpoint(pipe);
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        pipe = new Each(pipe, (Function)new TestFunction(new Fields(new Comparable[]{"insert"}), new Tuple(new Object[]{"value"}), fail ? 2 : -1));
        Tap sink = this.getPlatform().getTextFile(sinkPath, SinkMode.REPLACE);
        FlowDef flowDef = ((FlowDef)FlowDef.flowDef().setName("restartable")).addSource("test", source).addTailSink(pipe, sink).setRunID("restartable");
        return this.getPlatform().getFlowConnector().connect(flowDef);
    }

    @Test
    public void testMergeAndWriteToTwoSinksWithCheckpoint() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileLower);
        this.getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = this.getPlatform().getTextFile(InputData.inputFileLower);
        Tap sourceUpper = this.getPlatform().getTextFile(InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        Tap sink1 = this.getPlatform().getTextFile(this.getOutputPath("sink1"), SinkMode.REPLACE);
        Tap sink2 = this.getPlatform().getTextFile(this.getOutputPath("sink2"), SinkMode.REPLACE);
        HashMap<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put("sink1", sink1);
        sinks.put("sink2", sink2);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)new Identity(new Fields(new Comparable[]{"line"})));
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)new Identity(new Fields(new Comparable[]{"line"})));
        GroupBy splice = new GroupBy(new Pipe[]{pipeUpper, pipeLower}, new Fields(new Comparable[]{"line"}));
        Each tail1 = new Each(new Pipe("sink1", (Pipe)splice), new Fields(new Comparable[]{"line"}), (Function)new Identity(new Fields(new Comparable[]{"line"})));
        Each tail2 = new Each(new Pipe("sink2", (Pipe)splice), new Fields(new Comparable[]{"line"}), (Function)new Identity(new Fields(new Comparable[]{"line"})));
        tail2 = new Checkpoint((Pipe)tail2);
        tail2 = new Each((Pipe)tail2, new Fields(new Comparable[]{"line"}), (Function)new Identity(new Fields(new Comparable[]{"line"})));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{tail1, tail2});
        flow.complete();
        List tuples1 = CheckpointPlatformTest.asList((Flow)flow, (Tap)sink1);
        List tuples2 = CheckpointPlatformTest.asList((Flow)flow, (Tap)sink2);
        CheckpointPlatformTest.assertEquals((int)10, (int)tuples1.size());
        CheckpointPlatformTest.assertEquals((int)10, (int)tuples2.size());
        CheckpointPlatformTest.assertEquals((Object)tuples1, (Object)tuples2);
    }
}

