package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import data.InputData;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.junit.Test;

/* loaded from: input_file:cascading/flow/hadoop/MapReduceFlowPlatformTest.class */
/**
 * Platform tests that wrap plain Hadoop MapReduce jobs in {@link MapReduceFlow}, first as a
 * single stand-alone flow and then chained with regular Cascading flows inside a cascade.
 */
public class MapReduceFlowPlatformTest extends PlatformTestCase {
    /**
     * Passes {@code true} to the {@link PlatformTestCase} constructor.
     * NOTE(review): presumably the "use cluster" switch; confirm against PlatformTestCase.
     */
    public MapReduceFlowPlatformTest() {
        super(true);
    }

    /**
     * Runs one identity MapReduce job through a {@link MapReduceFlow} and validates that the
     * input, and afterwards the job output, both pass {@code validateLength(..., 10)}.
     *
     * @throws IOException if copying the input, opening a tap, or a file-system call fails
     */
    @Test
    public void testFlow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        JobConf platformConf = getPlatform().getConfiguration();
        JobConf mrConf = identityJobConf(platformConf, "mrflow");

        FileInputFormat.setInputPaths(mrConf, new Path(InputData.inputFileApache));
        String sinkPath = getOutputPath("flowTest");
        FileOutputFormat.setOutputPath(mrConf, new Path(sinkPath));

        MapReduceFlow flow = new MapReduceFlow("mrflow", mrConf, true);

        // Check the input before the job runs ...
        Hfs input = new Hfs(new TextLine(), InputData.inputFileApache);
        validateLength(input.openForRead(new HadoopFlowProcess(platformConf)), 10);

        flow.complete();

        // ... and the identity job's output once it has completed.
        Hfs output = new Hfs(new TextLine(), sinkPath);
        validateLength(output.openForRead(new HadoopFlowProcess(platformConf)), 10);
    }

    /**
     * Resolves the file system for {@code path} and, when {@code delete} is set, recursively
     * deletes that path. The file system is looked up even when nothing is deleted.
     *
     * @param path   location to resolve (and optionally delete)
     * @param delete whether to delete {@code path} recursively
     * @return {@code path}, unchanged, so the call can be used inline
     * @throws IOException if the file system cannot be obtained or the delete fails
     */
    private String remove(String path, boolean delete) throws IOException {
        URI location = URI.create(path);
        FileSystem fileSystem = FileSystem.get(location, HadoopPlanner.createJobConf(getProperties()));
        if (delete) {
            fileSystem.delete(new Path(path), true);
        }
        return path;
    }

    /**
     * Builds a five-step chain and runs it as a single cascade:
     * apache input -> "flow4" (first-flow) -> "flow5" (second-flow) -> "flow1" (first-mr)
     * -> "flow2" (second-mr) -> "flow3" (third-mr), then validates the final output.
     * The first two steps are Cascading flows, the next two are old-API MapReduce jobs and the
     * last one is configured through the new-API {@link Job}.
     *
     * @throws IOException if copying the input, a file-system call, or opening the result fails
     */
    @Test
    public void testCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        // Two ordinary Cascading flows produce the input of the first MapReduce job.
        Hfs apacheSource = new Hfs(
            new TextLine(new Fields("offset", "line")),
            remove(InputData.inputFileApache, false));
        Hfs firstSink = new Hfs(
            new TextLine(new Fields("offset", "line")),
            remove(getOutputPath("flow4"), true),
            SinkMode.REPLACE);
        Flow firstFlow = getPlatform().getFlowConnector(getProperties())
            .connect(apacheSource, firstSink, new Pipe("first-flow"));

        String secondSinkPath = getOutputPath("flow5");
        Hfs secondSink = new Hfs(
            new TextLine(new Fields("offset", "line")),
            remove(secondSinkPath, true),
            SinkMode.REPLACE);
        Flow secondFlow = getPlatform().getFlowConnector(getProperties())
            .connect(firstSink, secondSink, new Pipe("second-flow"));

        JobConf baseConf = HadoopPlanner.createJobConf(getProperties());

        // first-mr: reads what second-flow writes.
        JobConf firstMrConf = identityJobConf(baseConf, "first-mr");
        FileInputFormat.setInputPaths(firstMrConf, new Path(remove(secondSinkPath, true)));
        String firstMrSinkPath = getOutputPath("flow1");
        FileOutputFormat.setOutputPath(firstMrConf, new Path(remove(firstMrSinkPath, true)));
        Flow firstMr = new MapReduceFlow(firstMrConf, true);

        // second-mr: reads what first-mr writes.
        JobConf secondMrConf = identityJobConf(baseConf, "second-mr");
        FileInputFormat.setInputPaths(secondMrConf, new Path(remove(firstMrSinkPath, true)));
        String secondMrSinkPath = getOutputPath("flow2");
        FileOutputFormat.setOutputPath(secondMrConf, new Path(remove(secondMrSinkPath, true)));
        Flow secondMr = new MapReduceFlow(secondMrConf, true);

        // third-mr: same shape, but configured through the new-API Job with the base
        // Mapper/Reducer classes and the new-API flags set explicitly.
        Job thirdJob = new Job(baseConf);
        thirdJob.setJobName("third-mr");
        thirdJob.setOutputKeyClass(LongWritable.class);
        thirdJob.setOutputValueClass(Text.class);
        thirdJob.setMapperClass(Mapper.class);
        thirdJob.setReducerClass(Reducer.class);
        thirdJob.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
        thirdJob.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
        thirdJob.getConfiguration().set("mapred.mapper.new-api", "true");
        thirdJob.getConfiguration().set("mapred.reducer.new-api", "true");
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(
            thirdJob, new Path(remove(secondMrSinkPath, true)));
        String thirdMrSinkPath = getOutputPath("flow3");
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
            thirdJob, new Path(remove(thirdMrSinkPath, true)));

        CascadeConnector cascadeConnector = new CascadeConnector(getProperties());
        Flow thirdMr = new MapReduceFlow(new JobConf(thirdJob.getConfiguration()), true);

        // The flows are handed over in an order that is not their data-dependency order
        // (third-mr is listed before the two jobs it depends on). Keep this order as is.
        // NOTE(review): presumably this exercises the cascade's own flow ordering; confirm.
        cascadeConnector.connect(firstFlow, secondFlow, thirdMr, firstMr, secondMr).complete();

        Hfs finalOutput = new Hfs(new TextLine(), thirdMrSinkPath);
        validateLength(finalOutput.openForRead(new HadoopFlowProcess(baseConf)), 10);
    }

    /**
     * Creates a new {@link JobConf} from {@code base} and applies the settings shared by the
     * old-API jobs in this class: the job name, {@link LongWritable}/{@link Text} output key and
     * value classes, the identity mapper and reducer, and the text input and output formats.
     * Input and output paths are left for the caller to set.
     *
     * @param base    configuration the new JobConf is created from
     * @param jobName name to give the job
     * @return the newly created, pre-configured JobConf
     */
    private static JobConf identityJobConf(JobConf base, String jobName) {
        JobConf conf = new JobConf(base);
        conf.setJobName(jobName);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        return conf;
    }
}
