/*
 * Decompiled with CFR 0.152.
 */
package cascading.tap.hadoop;

import cascading.PlatformTestCase;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.scheme.hadoop.SequenceFile;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Dfs;
import cascading.tap.hadoop.GlobHfs;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tap.partition.DelimitedPartition;
import cascading.tap.partition.Partition;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopTapPlatformTest
extends PlatformTestCase
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopTapPlatformTest.class);

    /**
     * Creates the test case, passing {@code true} to the {@link PlatformTestCase} constructor.
     * <p>
     * NOTE(review): the flag presumably requests cluster-backed execution; confirm against
     * {@code PlatformTestCase}.
     */
    public HadoopTapPlatformTest() {
        super(true);
    }

    /**
     * Checks that a {@link Dfs} tap built from a relative path does not resolve to the
     * {@code file} URI scheme, that {@code hdfs://} locations are accepted both as a
     * {@code String} and as a {@link URI}, and that {@code s3://} locations are rejected.
     * <p>
     * Returns without asserting anything when the platform does not use a cluster or when
     * HDFS is reported as unavailable.
     */
    @Test
    public void testDfs() throws URISyntaxException, IOException {
        if (!getPlatform().isUseCluster()) {
            return;
        }

        BaseHadoopPlatform hadoopPlatform = (BaseHadoopPlatform) getPlatform();
        if (!hadoopPlatform.isHDFSAvailable()) {
            LOG.warn("skipped Dfs tests, HDFS is unavailable on current platform");
            return;
        }

        Dfs relativeTap = new Dfs(new SequenceFile(new Fields("foo")), "some/path");
        String fullIdentifier = relativeTap.getFullIdentifier(getPlatform().getFlowProcess());
        String uriScheme = new Path(fullIdentifier).toUri().getScheme();
        assertFalse("wrong scheme", uriScheme.equalsIgnoreCase("file"));

        // Both constructions must succeed; the instances themselves are not needed.
        new Dfs(new SequenceFile(new Fields("foo")), "hdfs://localhost:5001/some/path");
        new Dfs(new SequenceFile(new Fields("foo")), new URI("hdfs://localhost:5001/some/path"));

        try {
            new Dfs(new SequenceFile(new Fields("foo")), "s3://localhost:5001/some/path");
            fail("not valid url");
        } catch (Exception ignored) {
            // expected: a non-hdfs location given as a String must be rejected
        }

        try {
            new Dfs(new SequenceFile(new Fields("foo")), new URI("s3://localhost:5001/some/path"));
            fail("not valid url");
        } catch (Exception ignored) {
            // expected: a non-hdfs location given as a URI must be rejected
        }
    }

    /**
     * Checks that an {@link Lfs} tap built from a relative path resolves to the {@code file}
     * URI scheme, that an explicit {@code file://} location is accepted, and that an
     * {@code s3://} location is rejected.
     */
    @Test
    public void testLfs() throws URISyntaxException, IOException {
        Lfs relativeTap = new Lfs(new SequenceFile(new Fields("foo")), "some/path");
        String fullIdentifier = relativeTap.getFullIdentifier(getPlatform().getFlowProcess());
        String uriScheme = new Path(fullIdentifier).toUri().getScheme();
        assertTrue("wrong scheme", uriScheme.equalsIgnoreCase("file"));

        // Construction must succeed; the instance itself is not needed.
        new Lfs(new SequenceFile(new Fields("foo")), "file:///some/path");

        try {
            new Lfs(new SequenceFile(new Fields("foo")), "s3://localhost:5001/some/path");
            fail("not valid url");
        } catch (Exception ignored) {
            // expected: a non-local location must be rejected
        }
    }

    /**
     * Runs an identity flow whose source uses {@link CommentScheme} and validates the
     * resulting sink and source lengths against 5, plus the first sink entry.
     * <p>
     * The sink iterator is closed in a {@code finally} block so that it is released even
     * when the assertion on the first entry fails.
     */
    @Test
    public void testNullsFromScheme() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileComments);

        Hfs source = new Hfs(new CommentScheme(new Fields("line")), InputData.inputFileComments);
        Pipe pipe = new Each(new Pipe("test"), new Identity());
        Hfs sink = new Hfs(new TextLine(1), getOutputPath("testnulls"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, null);

        TupleEntryIterator iterator = flow.openSink();
        try {
            Object firstLine = ((TupleEntry) iterator.next()).getObject(1);
            assertEquals("not equal: tuple.get(1)", "1 a", firstLine);
        } finally {
            // Previously skipped whenever the assertion above threw.
            iterator.close();
        }

        validateLength(flow.openSource(), 5);
    }

    /**
     * Splits each input line into {@code num} and {@code char} and writes the result through
     * a {@link ResolvedScheme}, which throws if the outgoing fields differ from the ones
     * passed to its constructor. Afterwards the sink content and the source/sink sizes are
     * asserted.
     * <p>
     * The sink tuples are held as {@code List<Tuple>}; a raw {@code List} cannot be iterated
     * with a {@code Tuple} loop variable.
     */
    @Test
    public void testResolvedSinkFields() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Hfs source = new Hfs(new TextLine(new Fields("line")), InputData.inputFileLower);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipe = new Each(new Pipe("test"), new Fields("line"), splitter);

        Hfs sink = new Hfs(new ResolvedScheme(new Fields("num", "char")), getOutputPath("resolvedfields"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(source, sink, pipe);
        flow.complete();

        List<Tuple> tuples = asList(flow, sink);

        // Position 1 of each sink tuple is the value compared against the expected lines below.
        List<Object> values = new ArrayList<Object>();
        for (Tuple tuple : tuples) {
            values.add(tuple.getObject(1));
        }

        String[] expectedLines = {"1\ta", "2\tb", "3\tc", "4\td", "5\te"};
        for (String expectedLine : expectedLines) {
            assertTrue(values.contains(expectedLine));
        }

        assertEquals(5, tuples.size());
        assertEquals(5, asList(flow, source).size());
    }

    /**
     * Asserts the number of taps two {@link GlobHfs} patterns expand to, then runs a
     * two-flow cascade (glob source to sink, sink to a second sink) and validates the
     * length of the first flow against 10.
     */
    @Test
    public void testGlobHfs() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        String filePattern = InputData.inputPath + "?{ppe[_r],owe?}.txt";
        GlobHfs source = new GlobHfs(new TextLine(new Fields("offset", "line")), filePattern);
        assertEquals(2, source.getTaps().length);

        String directoryPattern = InputData.inputPath + "../?ata/";
        GlobHfs directoryGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), directoryPattern);
        assertEquals(1, directoryGlob.getTaps().length);

        Hfs sink = new Hfs(new TextLine(), getOutputPath("glob"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), "\\s");
        Pipe concatPipe = new Each(new Pipe("concat"), new Fields("line"), splitter);

        Flow concatFlow = getPlatform().getFlowConnector(getProperties()).connect("first", source, sink, concatPipe);

        // The second flow reads what the first flow wrote.
        Hfs nextSink = new Hfs(new TextLine(), getOutputPath("glob2"), SinkMode.REPLACE);
        Flow nextFlow = getPlatform().getFlowConnector(getProperties()).connect("second", sink, nextSink, concatPipe);

        Cascade cascade = new CascadeConnector(getProperties()).connect(new Flow[]{concatFlow, nextFlow});
        cascade.complete();

        validateLength(concatFlow, 10);
    }

    /**
     * Wraps two {@link GlobHfs} taps in a {@link MultiSourceTap}, asserts it reports two
     * child taps, then runs a two-flow cascade from it and validates the length of the
     * first flow against 10.
     */
    @Test
    public void testNestedMultiSourceGlobHfs() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        GlobHfs firstGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{ppe[_r]}.txt");
        GlobHfs secondGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{owe?}.txt");

        MultiSourceTap source = new MultiSourceTap(new Tap[]{firstGlob, secondGlob});
        assertEquals(2L, source.getNumChildTaps());

        Hfs sink = new Hfs(new TextLine(), getOutputPath("globmultisource"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), "\\s");
        Pipe concatPipe = new Each(new Pipe("concat"), new Fields("line"), splitter);

        Flow concatFlow = getPlatform().getFlowConnector(getProperties()).connect("first", source, sink, concatPipe);

        // The second flow reads what the first flow wrote.
        Hfs nextSink = new Hfs(new TextLine(), getOutputPath("globmultiource2"), SinkMode.REPLACE);
        Flow nextFlow = getPlatform().getFlowConnector(getProperties()).connect("second", sink, nextSink, concatPipe);

        Cascade cascade = new CascadeConnector(getProperties()).connect(new Flow[]{concatFlow, nextFlow});
        cascade.complete();

        validateLength(concatFlow, 10);
    }

    /**
     * Opens a {@link MultiSourceTap} for reading directly (no flow) and validates the
     * iterator length against 10, first for two single-pattern globs and then for one
     * glob combining both patterns.
     */
    @Test
    public void testMultiSourceIterator() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        GlobHfs firstGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{ppe[_r]}.txt");
        GlobHfs secondGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{owe?}.txt");

        MultiSourceTap separateGlobs = new MultiSourceTap(new Tap[]{firstGlob, secondGlob});
        validateLength(separateGlobs.openForRead(getPlatform().getFlowProcess()), 10);

        GlobHfs combinedGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{ppe[_r],owe?}.txt");

        MultiSourceTap singleGlob = new MultiSourceTap(new Tap[]{combinedGlob});
        validateLength(singleGlob.openForRead(getPlatform().getFlowProcess()), 10, null);
    }

    /**
     * Runs a group-and-count flow into a sink whose {@code commitResource} is overridden to
     * count invocations, then asserts it was called exactly once and validates the flow
     * length against 8.
     */
    @Test
    public void testCommitResource() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);

        // Extract the leading non-space token of each line as "ip", then count per ip.
        Pipe parsed = new Each(new Pipe("test"), new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        Pipe grouped = new GroupBy(parsed, new Fields("ip"));
        Pipe counted = new Every(grouped, new Count(), new Fields("ip", "count"));

        // Single-element array so the anonymous class below can mutate the counter.
        final int[] commitCalls = {0};

        Hfs sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("committap"), SinkMode.REPLACE) {
            @Override
            public boolean commitResource(Configuration conf) throws IOException {
                commitCalls[0]++;
                return true;
            }
        };

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, counted);
        flow.complete();

        assertEquals(1, commitCalls[0]);
        validateLength(flow, 8, null);
    }

    /**
     * Runs a group-and-count flow into a sink whose {@code commitResource} always throws,
     * and expects {@code flow.complete()} to fail with an exception.
     */
    @Test
    public void testCommitResourceFails() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);

        // Extract the leading non-space token of each line as "ip", then count per ip.
        Pipe parsed = new Each(new Pipe("test"), new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        Pipe grouped = new GroupBy(parsed, new Fields("ip"));
        Pipe counted = new Every(grouped, new Count(), new Fields("ip", "count"));

        Hfs sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("committapfail"), SinkMode.REPLACE) {
            @Override
            public boolean commitResource(Configuration conf) throws IOException {
                throw new IOException("failed intentionally");
            }
        };

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, counted);

        try {
            flow.complete();
            fail();
        } catch (Exception ignored) {
            // expected: the failing commit must surface as an exception from complete()
        }
    }

    /**
     * Checks that an {@link Hfs} tap whose path ends in {@code *} reports that its resource
     * exists and yields at least one entry, and that opening a tap on a {@code /blah/} path
     * throws an {@link IOException}.
     */
    @Test
    public void testHfsAsterisk() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Hfs existing = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "*");
        assertTrue(existing.resourceExists(getPlatform().getFlowProcess()));

        TupleEntryIterator iterator = existing.openForRead(getPlatform().getFlowProcess());
        assertTrue(iterator.hasNext());
        iterator.close();

        try {
            Hfs missing = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "/blah/");
            missing.openForRead(getPlatform().getFlowProcess());
            fail();
        } catch (IOException ignored) {
            // expected: opening the /blah/ path for reading must fail
        }
    }

    /**
     * Same checks as the plain asterisk case, but with the pattern written as {@code {*}}:
     * the resource must exist and yield at least one entry, and opening a tap on a
     * {@code /blah/} path must throw an {@link IOException}.
     */
    @Test
    public void testHfsBracketAsterisk() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Hfs existing = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "{*}");
        assertTrue(existing.resourceExists(getPlatform().getFlowProcess()));

        TupleEntryIterator iterator = existing.openForRead(getPlatform().getFlowProcess());
        assertTrue(iterator.hasNext());
        iterator.close();

        try {
            Hfs missing = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "/blah/");
            missing.openForRead(getPlatform().getFlowProcess());
            fail();
        } catch (IOException ignored) {
            // expected: opening the /blah/ path for reading must fail
        }
    }

    /**
     * Hash-joins a plain text source with a source using {@link DupeConfigScheme}, which
     * throws if its marker property is already present during {@code sourceConfInit} or
     * absent during {@code sourcePrepare}. The flow runs with accumulated-tap decoration
     * disabled; the sink length is validated against 5 and two joined lines are asserted.
     */
    @Test
    public void testDupeConfigFromScheme() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTap(new DupeConfigScheme(new Fields("offset", "line")), InputData.inputFileUpper, SinkMode.KEEP);

        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("dupeconfig"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);

        Pipe splice = new HashJoin(pipeLower, new Fields("num"), pipeUpper, new Fields("num"), Fields.size(4));

        Properties properties = FlowConnectorProps.flowConnectorProps()
            .setEnableDecorateAccumulatedTap(false)
            .buildProperties(getProperties());

        Flow flow = getPlatform().getFlowConnector(properties).connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 5);

        List values = getSinkAsList(flow);
        assertTrue(values.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(values.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Uses a source tap whose {@code sourceConfInit} is overridden to do nothing and expects
     * planning or running the flow to fail with an exception.
     */
    @Test
    public void testMissingInputFormat() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Hfs source = new Hfs(new TextDelimited(new Fields("offset", "line")), InputData.inputFileApache) {
            @Override
            public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
                // intentionally left empty: nothing is written to the configuration
            }
        };

        // Extract the leading non-space token of each line as "ip", then count per ip.
        Pipe parsed = new Each(new Pipe("test"), new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        Pipe grouped = new GroupBy(parsed, new Fields("ip"));
        Pipe counted = new Every(grouped, new Count(), new Fields("ip", "count"));

        Hfs sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("missinginputformat"), SinkMode.REPLACE);

        try {
            Flow flow = getPlatform().getFlowConnector().connect(source, sink, counted);
            flow.complete();
            fail("did not test for missing input format");
        } catch (Exception ignored) {
            // expected: either connect() or complete() must reject the unconfigured source
        }
    }

    /**
     * Exercises {@code Hfs.getChildIdentifiers} for a missing directory, an empty directory,
     * a directory with two files, the root path, the current directory, and a directory
     * that also contains a dot-prefixed file.
     * <p>
     * Returns without asserting anything when the platform does not use a cluster.
     * <p>
     * The platform configuration is cast to {@link Configuration} because the
     * {@code getChildIdentifiers} overloads used here take a {@code Configuration}, not an
     * {@code Object}.
     */
    @Test
    public void testChildIdentifiers() throws Exception {
        if (!getPlatform().isUseCluster()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Configuration jobConf = (Configuration) ((BaseHadoopPlatform) getPlatform()).getConfiguration();

        Hfs tap = new Hfs(new TextLine(new Fields("offset", "line")), getOutputPath("multifiles"));

        tap.deleteResource(getPlatform().getFlowProcess());
        assertChildIdentifierCounts("missing", tap, jobConf, 0, 0);

        tap.createResource(getPlatform().getFlowProcess());
        assertChildIdentifierCounts("no children", tap, jobConf, 0, 1);

        writeFileTo("multifiles/A");
        writeFileTo("multifiles/B");
        assertChildIdentifierCounts("children", tap, jobConf, 2, 1);

        // -1 disables the size check (see assertEqualsSize); the call itself is still made.
        tap = new Hfs(new TextLine(new Fields("offset", "line")), "/");
        assertChildIdentifierCounts("root", tap, jobConf, -1, 1);

        tap = new Hfs(new TextLine(new Fields("offset", "line")), "./");
        assertChildIdentifierCounts("current", tap, jobConf, -1, -1);

        tap = new Hfs(new TextLine(new Fields("offset", "line")), getOutputPath("hiddenfiles"));
        writeFileTo("hiddenfiles/A");
        writeFileTo("hiddenfiles/B");
        writeFileTo("hiddenfiles/.hidden");
        // Expecting 2 rather than 3: the dot-prefixed file is not expected among the children.
        assertChildIdentifierCounts("children", tap, jobConf, 2, 1);
    }

    /**
     * Asserts the number of identifiers returned by every {@code getChildIdentifiers}
     * variant used by {@link #testChildIdentifiers()}.
     *
     * @param message label forwarded to {@link #assertEqualsSize(String, int, String[])}
     * @param tap the tap under test
     * @param conf configuration passed to each {@code getChildIdentifiers} call
     * @param expectedAtDepth expected size for the no-depth call and for depths 2 and 1,
     *     or {@code -1} to skip the size check
     * @param expectedAtZero expected size for depth 0, or {@code -1} to skip the size check
     * @throws IOException if a {@code getChildIdentifiers} call fails
     */
    private void assertChildIdentifierCounts(String message, Hfs tap, Configuration conf, int expectedAtDepth, int expectedAtZero) throws IOException {
        assertEqualsSize(message, expectedAtDepth, tap.getChildIdentifiers(conf));
        assertEqualsSize(message, expectedAtDepth, tap.getChildIdentifiers(conf, 2, true));
        assertEqualsSize(message, expectedAtDepth, tap.getChildIdentifiers(conf, 2, false));
        assertEqualsSize(message, expectedAtDepth, tap.getChildIdentifiers(conf, 1, true));
        assertEqualsSize(message, expectedAtDepth, tap.getChildIdentifiers(conf, 1, false));
        assertEqualsSize(message, expectedAtZero, tap.getChildIdentifiers(conf, 0, true));
        assertEqualsSize(message, expectedAtZero, tap.getChildIdentifiers(conf, 0, false));
    }

    /**
     * Asserts that {@code actual} has exactly {@code expected} elements.
     * <p>
     * The {@code message} is now forwarded to the assertion so a failure identifies which
     * scenario broke; previously it was accepted but never used.
     *
     * @param message label included in the assertion failure
     * @param expected required array length, or {@code -1} to skip the check entirely
     * @param actual array whose length is checked
     */
    public void assertEqualsSize(String message, int expected, String[] actual) {
        // -1 is the callers' sentinel for "any size is acceptable".
        if (expected == -1) {
            return;
        }
        assertEquals(message, expected, actual.length);
    }

    /**
     * Writes a single tuple {@code (1, "1")} through a text-line {@link Hfs} tap located at
     * the output path derived from {@code path}.
     * <p>
     * The collector is closed in a {@code finally} block so it is released even if adding
     * the tuple fails.
     *
     * @param path path passed to {@code getOutputPath} to obtain the target location
     * @throws IOException if the tap cannot be opened for writing
     */
    private void writeFileTo(String path) throws IOException {
        Hfs tap = new Hfs(new TextLine(new Fields("offset", "line")), getOutputPath(path));
        TupleEntryCollector collector = tap.openForWrite(getPlatform().getFlowProcess());
        try {
            collector.add(new Tuple(1, "1"));
        } finally {
            collector.close();
        }
    }

    /**
     * Runs a group-and-count flow whose source overrides {@code prepareResourceForRead} and
     * whose sink overrides {@code prepareResourceForWrite} to count invocations, then
     * asserts each hook ran exactly once and validates the flow length against 8.
     */
    @Test
    public void testPrepareResource() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        // Single-element arrays so the anonymous classes below can mutate the counters.
        final int[] readPrepareCalls = {0};

        Hfs source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileApache) {
            @Override
            public boolean prepareResourceForRead(Configuration conf) throws IOException {
                readPrepareCalls[0]++;
                return true;
            }
        };

        // Extract the leading non-space token of each line as "ip", then count per ip.
        Pipe parsed = new Each(new Pipe("test"), new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        Pipe grouped = new GroupBy(parsed, new Fields("ip"));
        Pipe counted = new Every(grouped, new Count(), new Fields("ip", "count"));

        final int[] writePrepareCalls = {0};

        Hfs sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("preparetap"), SinkMode.REPLACE) {
            @Override
            public boolean prepareResourceForWrite(Configuration conf) throws IOException {
                writePrepareCalls[0]++;
                return true;
            }
        };

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, counted);
        flow.complete();

        assertEquals(1, readPrepareCalls[0]);
        assertEquals(1, writePrepareCalls[0]);
        validateLength(flow, 8, null);
    }

    /**
     * Runs a group-and-count flow from a source whose {@code prepareResourceForRead} always
     * throws, and expects {@code flow.complete()} to fail with an exception.
     */
    @Test
    public void testPrepareResourceForReadFails() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Hfs source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileApache) {
            @Override
            public boolean prepareResourceForRead(Configuration conf) throws IOException {
                throw new IOException("failed intentionally");
            }
        };

        // Extract the leading non-space token of each line as "ip", then count per ip.
        Pipe parsed = new Each(new Pipe("test"), new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        Pipe grouped = new GroupBy(parsed, new Fields("ip"));
        Pipe counted = new Every(grouped, new Count(), new Fields("ip", "count"));

        Hfs sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("preparereadtapfail"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, counted);

        try {
            flow.complete();
            fail();
        } catch (Exception ignored) {
            // expected: the failing read preparation must surface from complete()
        }
    }

    /**
     * Runs a group-and-count flow into a sink whose {@code prepareResourceForWrite} always
     * throws, and expects {@code flow.complete()} to fail with an exception.
     */
    @Test
    public void testPrepareResourceForWriteFails() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);

        // Extract the leading non-space token of each line as "ip", then count per ip.
        Pipe parsed = new Each(new Pipe("test"), new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        Pipe grouped = new GroupBy(parsed, new Fields("ip"));
        Pipe counted = new Every(grouped, new Count(), new Fields("ip", "count"));

        Hfs sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("preparewritetapfail"), SinkMode.REPLACE) {
            @Override
            public boolean prepareResourceForWrite(Configuration conf) throws IOException {
                throw new IOException("failed intentionally");
            }
        };

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, counted);

        try {
            flow.complete();
            fail();
        } catch (Exception ignored) {
            // expected: the failing write preparation must surface from complete()
        }
    }

    /**
     * Runs a pass-through flow into a partition tap and asserts that no {@code _temporary}
     * entry remains under the sink's output path afterwards.
     */
    @Test
    public void testTemporarySinkPathIsDeleted() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap source = getPlatform().getDelimitedFile(new Fields("a", "b"), " ", InputData.inputFileLowerOffset);
        Pipe pipe = new Pipe("test");

        String outputPath = getOutputPath("partition-tap-sink");
        Tap parentSink = getPlatform().getDelimitedFile(new Fields("a"), " ", outputPath);
        Tap partitionSink = getPlatform().getPartitionTap(parentSink, new DelimitedPartition(new Fields("b")), 1);

        Flow flow = getPlatform().getFlowConnector().connect(source, partitionSink, pipe);
        flow.complete();

        Path temporaryPath = new Path(outputPath, "_temporary");
        Configuration flowConfig = (Configuration) flow.getConfigCopy();
        FileSystem fileSystem = temporaryPath.getFileSystem(flowConfig);

        assertFalse(fileSystem.exists(temporaryPath));
    }

    public class DupeConfigScheme
    extends TextLine {
        public DupeConfigScheme(Fields sourceFields) {
            super(sourceFields);
        }

        public void sourceConfInit(FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf) {
            if (conf.get("this.is.a.dupe") != null) {
                throw new IllegalStateException("has dupe config value");
            }
            conf.set("this.is.a.dupe", "dupe");
            super.sourceConfInit(flowProcess, tap, conf);
        }

        public void sourcePrepare(FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
            if (flowProcess.getStringProperty("this.is.a.dupe") == null) {
                throw new IllegalStateException("has no dupe config value");
            }
            super.sourcePrepare(flowProcess, sourceCall);
        }
    }

    public class ResolvedScheme
    extends TextLine {
        private final Fields expectedFields;

        public ResolvedScheme(Fields expectedFields) {
            this.expectedFields = expectedFields;
        }

        public void sinkPrepare(FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
            Fields found = sinkCall.getOutgoingEntry().getFields();
            if (!found.equals((Object)this.expectedFields)) {
                throw new RuntimeException("fields to not match, expect: " + this.expectedFields + ", found: " + found);
            }
            super.sinkPrepare(flowProcess, sinkCall);
        }

        public void sink(FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
            Fields found = sinkCall.getOutgoingEntry().getFields();
            if (!found.equals((Object)this.expectedFields)) {
                throw new RuntimeException("fields to not match, expect: " + this.expectedFields + ", found: " + found);
            }
            super.sink(flowProcess, sinkCall);
        }
    }

    public class CommentScheme
    extends TextLine {
        public CommentScheme(Fields sourceFields) {
            super(sourceFields);
        }

        public boolean source(FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
            boolean success = ((RecordReader)sourceCall.getInput()).next(((Object[])sourceCall.getContext())[0], ((Object[])sourceCall.getContext())[1]);
            if (!success) {
                return false;
            }
            if (((Object[])sourceCall.getContext())[1].toString().matches("^\\s*#.*$")) {
                return this.source(flowProcess, sourceCall);
            }
            this.sourceHandleInput(sourceCall);
            return true;
        }
    }
}

