/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.hadoop.mapred;

import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class HadoopIOFormatsITCase
extends JavaProgramTestBase {
    private static int NUM_PROGRAMS = 2;
    private int curProgId;
    private String[] resultPath;
    private String[] expectedResult;
    private String sequenceFileInPath;
    private String sequenceFileInPathNull;

    public HadoopIOFormatsITCase(Configuration config) {
        super(config);
        this.curProgId = this.config.getInteger("ProgramId", -1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void preSubmit() throws Exception {
        this.resultPath = new String[]{this.getTempDirPath("result0"), this.getTempDirPath("result1")};
        File sequenceFile = this.createAndRegisterTempFile("seqFile");
        this.sequenceFileInPath = sequenceFile.toURI().toString();
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        FileSystem fs = FileSystem.get((URI)URI.create(sequenceFile.getAbsolutePath()), (org.apache.hadoop.conf.Configuration)conf);
        Path path = new Path(sequenceFile.getAbsolutePath());
        int kvCount = 4;
        LongWritable key = new LongWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter((FileSystem)fs, (org.apache.hadoop.conf.Configuration)conf, (Path)path, key.getClass(), value.getClass());
            for (int i = 0; i < kvCount; ++i) {
                if (i == 1) {
                    for (int a = 0; a < 15; ++a) {
                        key.set((long)i);
                        value.set(i + " - somestring");
                        writer.append((Writable)key, (Writable)value);
                    }
                }
                key.set((long)i);
                value.set(i + " - somestring");
                writer.append((Writable)key, (Writable)value);
            }
        }
        catch (Throwable throwable) {
            IOUtils.closeStream(writer);
            throw throwable;
        }
        IOUtils.closeStream((Closeable)writer);
        File sequenceFileNull = this.createAndRegisterTempFile("seqFileNullKey");
        this.sequenceFileInPathNull = sequenceFileNull.toURI().toString();
        path = new Path(this.sequenceFileInPathNull);
        LongWritable value1 = new LongWritable();
        SequenceFile.Writer writer1 = null;
        try {
            writer1 = SequenceFile.createWriter((FileSystem)fs, (org.apache.hadoop.conf.Configuration)conf, (Path)path, NullWritable.class, value1.getClass());
            for (int i = 0; i < kvCount; ++i) {
                value1.set((long)i);
                writer1.append((Writable)NullWritable.get(), (Writable)value1);
            }
        }
        catch (Throwable throwable) {
            IOUtils.closeStream(writer1);
            throw throwable;
        }
        IOUtils.closeStream((Closeable)writer1);
    }

    protected void testProgram() throws Exception {
        this.expectedResult = HadoopIOFormatPrograms.runProgram(this.curProgId, this.resultPath, this.sequenceFileInPath, this.sequenceFileInPathNull);
    }

    protected void postSubmit() throws Exception {
        for (int i = 0; i < this.resultPath.length; ++i) {
            HadoopIOFormatsITCase.compareResultsByLinesInMemory((String)this.expectedResult[i], (String)this.resultPath[i]);
        }
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
        for (int i = 1; i <= NUM_PROGRAMS; ++i) {
            Configuration config = new Configuration();
            config.setInteger("ProgramId", i);
            tConfigs.add(config);
        }
        return TestBaseUtils.toParameterList(tConfigs);
    }

    public static class HadoopIOFormatPrograms {
        public static String[] runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
            switch (progId) {
                case 1: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    SequenceFileInputFormat sfif = new SequenceFileInputFormat();
                    JobConf hdconf = new JobConf();
                    SequenceFileInputFormat.addInputPath((JobConf)hdconf, (Path)new Path(sequenceFileInPath));
                    HadoopInputFormat hif = new HadoopInputFormat((InputFormat)sfif, LongWritable.class, Text.class, hdconf);
                    DataSource ds = env.createInput((org.apache.flink.api.common.io.InputFormat)hif);
                    AggregateOperator sumed = ds.map((MapFunction)new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>(){

                        public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
                            return new Tuple2((Object)((LongWritable)value.f0).get(), value.f1);
                        }
                    }).sum(0);
                    sumed.writeAsText(resultPath[0]);
                    MapOperator res = ds.distinct(new int[]{0}).map((MapFunction)new MapFunction<Tuple2<LongWritable, Text>, String>(){

                        public String map(Tuple2<LongWritable, Text> value) throws Exception {
                            return value.f1 + " - " + ((LongWritable)value.f0).get();
                        }
                    });
                    res.writeAsText(resultPath[1]);
                    env.execute();
                    return new String[]{"(21,3 - somestring)", "0 - somestring - 0\n1 - somestring - 1\n2 - somestring - 2\n3 - somestring - 3\n"};
                }
                case 2: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    SequenceFileInputFormat sfif = new SequenceFileInputFormat();
                    JobConf hdconf = new JobConf();
                    SequenceFileInputFormat.addInputPath((JobConf)hdconf, (Path)new Path(sequenceFileInPathNull));
                    HadoopInputFormat hif = new HadoopInputFormat((InputFormat)sfif, NullWritable.class, LongWritable.class, hdconf);
                    DataSource ds = env.createInput((org.apache.flink.api.common.io.InputFormat)hif);
                    MapOperator res = ds.map((MapFunction)new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>(){

                        public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
                            return new Tuple2(null, (Object)((LongWritable)value.f1).get());
                        }
                    });
                    AggregateOperator res1 = res.groupBy(new int[]{1}).sum(1);
                    res1.writeAsText(resultPath[1]);
                    res.writeAsText(resultPath[0]);
                    env.execute();
                    return new String[]{"(null,2)\n(null,0)\n(null,1)\n(null,3)", "(null,0)\n(null,1)\n(null,2)\n(null,3)"};
                }
            }
            throw new IllegalArgumentException("Invalid program id");
        }
    }
}

