/*
 * Decompiled with CFR 0.152.
 */
package cascading.cascade;

import cascading.PlatformTestCase;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowSkipIfSinkExists;
import cascading.flow.FlowSkipIfSinkNotStale;
import cascading.flow.FlowSkipStrategy;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tap.partition.Partition;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import cascading.util.Util;
import java.io.IOException;
import org.junit.Test;

/**
 * Platform test exercising flow skipping inside a {@link Cascade}.
 *
 * <p>Each test runs the same scenario (see {@link #runCascade(TapSupplier, FlowSkipStrategy)})
 * against a different sink tap and {@link FlowSkipStrategy}: a flow whose sink was already
 * produced must be skipped, and a flow planned after the source has been rewritten must run.
 */
public class CascadeStalePlatformTest
extends PlatformTestCase {
    public CascadeStalePlatformTest() {
        super(true);
    }

    /** Runs the scenario against a plain delimited-file sink using {@link FlowSkipIfSinkNotStale}. */
    @Test
    public void testCascadeSkipOnModifiedTime() throws IOException {
        this.runCascade(this.delimitedSupplier(this.getOutputPath("output")), new FlowSkipIfSinkNotStale());
    }

    /** Runs the scenario against a partitioned sink using {@link FlowSkipIfSinkNotStale}. */
    @Test
    public void testCascadePartitionSkipOnModifiedTime() throws IOException {
        this.runCascade(this.partitionSupplier(this.getOutputPath("output")), new FlowSkipIfSinkNotStale());
    }

    /** Runs the scenario against a partitioned sink using {@link FlowSkipIfSinkExists}. */
    @Test
    public void testCascadePartitionSkipExists() throws IOException {
        this.runCascade(this.partitionSupplier(this.getOutputPath("output")), new FlowSkipIfSinkExists());
    }

    /**
     * Executes the shared skip scenario.
     *
     * <ol>
     *   <li>Writes the input file and runs a first flow that creates the sink (REPLACE mode).</li>
     *   <li>Runs a second flow, configured with {@code skipStrategy}, inside a cascade and asserts
     *       that it was skipped.</li>
     *   <li>Deletes and rewrites the input, then runs a third flow (no explicit skip strategy set
     *       on the flow) inside a new cascade and asserts that it was <em>not</em> skipped.</li>
     * </ol>
     *
     * @param supplier     creates the sink tap for a given {@link SinkMode}; called once per flow
     * @param skipStrategy strategy installed on the second flow only
     * @throws IOException if the input cannot be written or deleted
     */
    private void runCascade(TapSupplier supplier, FlowSkipStrategy skipStrategy) throws IOException {
        String inputPath = this.getOutputPath("input.txt");
        Tap source = this.getPlatform().getDelimitedFile(new Fields("number", "lower", "upper"), " ", inputPath);
        this.writeInput(source);

        // First pass: unconditionally (re)create the sink.
        Flow firstFlow = this.getPlatform().getFlowConnector().connect("first", source, supplier.supply(SinkMode.REPLACE), new Pipe("copy"));
        firstFlow.complete();

        // Second pass: the sink now exists, so the flow under test is expected to be skipped.
        Flow secondFlow = this.getPlatform().getFlowConnector().connect("second", source, supplier.supply(SinkMode.KEEP), new Pipe("copy"));
        secondFlow.setFlowSkipStrategy(skipStrategy);
        Cascade firstCascade = new CascadeConnector().connect(secondFlow);
        firstCascade.complete();
        assertTrue(secondFlow.getStats().isSkipped());

        // Recreate the source. NOTE(review): the one second pause presumably guarantees the
        // rewritten source gets a later modification time than the sink - confirm.
        assertTrue("unable to delete resource", source.deleteResource(secondFlow.getFlowProcess()));
        Util.safeSleep(1000L);
        this.writeInput(source);

        // Third pass: the flow keeps whatever skip strategy it gets by default and must run.
        Flow thirdFlow = this.getPlatform().getFlowConnector().connect("third", source, supplier.supply(SinkMode.KEEP), new Pipe("copy"));
        Cascade secondCascade = new CascadeConnector().connect(thirdFlow);
        secondCascade.complete();
        assertFalse(thirdFlow.getStats().isSkipped());
    }

    /**
     * Writes the three fixture tuples to {@code source}, closing the collector even when a
     * write fails so the underlying resource is not leaked.
     */
    private void writeInput(Tap source) throws IOException {
        TupleEntryCollector collector = source.openForWrite(this.getPlatform().getFlowProcess());
        try {
            collector.add(new Tuple(0, "a", "B"));
            collector.add(new Tuple(1, "a", "B"));
            collector.add(new Tuple(2, "a", "B"));
        } finally {
            collector.close();
        }
    }

    /** Creates the "+"-delimited sink declaring the single field {@code upper}. */
    private Tap delimitedSink(String outputPath, SinkMode mode) {
        return this.getPlatform().getDelimitedFile(new Fields("upper"), "+", outputPath, mode);
    }

    /** Returns a supplier producing the plain delimited sink at {@code outputPath}. */
    private TapSupplier delimitedSupplier(final String outputPath) {
        return new TapSupplier(){

            @Override
            public Tap supply(SinkMode mode) {
                return CascadeStalePlatformTest.this.delimitedSink(outputPath, mode);
            }
        };
    }

    /**
     * Returns a supplier producing a partition tap that wraps the delimited sink at
     * {@code outputPath}, partitioned on the fields {@code lower} and {@code number}.
     */
    private TapSupplier partitionSupplier(final String outputPath) {
        return new TapSupplier(){

            @Override
            public Tap supply(SinkMode mode) {
                Tap parent = CascadeStalePlatformTest.this.delimitedSink(outputPath, mode);
                Partition partition = new DelimitedPartition(new Fields("lower", "number"));
                return CascadeStalePlatformTest.this.getPlatform().getPartitionTap(parent, partition, 1);
            }
        };
    }

    /** Factory for the sink tap used by a flow; invoked once per flow with the desired mode. */
    static interface TapSupplier {
        public Tap supply(SinkMode var1);
    }
}

