package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.batch.DatasetOutputCommitter;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.dataset.DataSetException;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.DatasetSpecification;
import co.cask.cdap.api.dataset.lib.AbstractDataset;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetProperties;
import co.cask.cdap.api.dataset.lib.Partitioning;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.dataset.module.EmbeddedDataset;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.JavaSparkProgram;
import co.cask.cdap.api.spark.SparkContext;
import co.cask.cdap.api.spark.SparkProgram;
import co.cask.cdap.internal.app.runtime.batch.AppWithTimePartitionedFileSet;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.twill.filesystem.Location;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet.class */
public class SparkAppUsingFileSet extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet$CharCount.class */
    public static final class CharCount extends AbstractSpark {
        private Class<? extends SparkProgram> mainSparkClass;

        public CharCount(Class<? extends SparkProgram> cls) {
            this.mainSparkClass = cls;
        }

        public void configure() {
            setMainClass(this.mainSparkClass);
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet$CharCountProgram.class */
    public static class CharCountProgram implements JavaSparkProgram {
        public void run(SparkContext sparkContext) {
            String str = (String) sparkContext.getRuntimeArguments().get(AppWithTimePartitionedFileSet.INPUT);
            String str2 = (String) sparkContext.getRuntimeArguments().get(AppWithTimePartitionedFileSet.OUTPUT);
            sparkContext.writeToDataset(transformRDD((JavaPairRDD) sparkContext.readFromDataset(str, Long.class, String.class)), str2, String.class, Integer.class);
            String str3 = (String) sparkContext.getRuntimeArguments().get("inputKey");
            String str4 = (String) sparkContext.getRuntimeArguments().get("outputKey");
            if (str3 == null || str4 == null) {
                return;
            }
            HashMap hashMap = new HashMap();
            TimePartitionedFileSetArguments.setInputStartTime(hashMap, Long.parseLong(str3) - 100);
            TimePartitionedFileSetArguments.setInputEndTime(hashMap, Long.parseLong(str3) + 100);
            JavaPairRDD<String, Integer> transformRDD = transformRDD((JavaPairRDD) sparkContext.readFromDataset(str, Long.class, String.class, hashMap));
            HashMap hashMap2 = new HashMap();
            TimePartitionedFileSetArguments.setOutputPartitionTime(hashMap2, Long.parseLong(str4));
            sparkContext.writeToDataset(transformRDD, str2, String.class, Integer.class, hashMap2);
        }

        private JavaPairRDD<String, Integer> transformRDD(JavaPairRDD<Long, String> javaPairRDD) {
            return javaPairRDD.mapToPair(new PairFunction<Tuple2<Long, String>, String, Integer>() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkAppUsingFileSet.CharCountProgram.1
                public Tuple2<String, Integer> call(Tuple2<Long, String> tuple2) throws Exception {
                    return new Tuple2<>(tuple2._2(), Integer.valueOf(((String) tuple2._2()).length()));
                }
            });
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet$JavaCharCount.class */
    public static final class JavaCharCount extends AbstractSpark {
        public void configure() {
            setMainClass(CharCountProgram.class);
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet$MyFileSet.class */
    public static class MyFileSet extends AbstractDataset implements InputFormatProvider, OutputFormatProvider, DatasetOutputCommitter {
        private final FileSet delegate;

        public MyFileSet(DatasetSpecification datasetSpecification, @EmbeddedDataset("files") FileSet fileSet) {
            super(datasetSpecification.getName(), fileSet, new Dataset[0]);
            this.delegate = fileSet;
        }

        public FileSet getEmbeddedFileSet() {
            return this.delegate;
        }

        public Location getSuccessLocation() throws IOException {
            return this.delegate.getBaseLocation().append("success");
        }

        public Location getFailureLocation() throws IOException {
            return this.delegate.getBaseLocation().append("failure");
        }

        public String getInputFormatClassName() {
            return this.delegate.getInputFormatClassName();
        }

        public Map<String, String> getInputFormatConfiguration() {
            return this.delegate.getInputFormatConfiguration();
        }

        public String getOutputFormatClassName() {
            return this.delegate.getOutputFormatClassName();
        }

        public Map<String, String> getOutputFormatConfiguration() {
            return this.delegate.getOutputFormatConfiguration();
        }

        public void onSuccess() throws DataSetException {
            try {
                getSuccessLocation().createNew();
            } catch (Throwable th) {
                throw Throwables.propagate(th);
            }
        }

        public void onFailure() throws DataSetException {
            try {
                getFailureLocation().createNew();
            } catch (Throwable th) {
                throw Throwables.propagate(th);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet$MyRecordReader.class */
    public static class MyRecordReader extends RecordReader<Long, String> {
        private RecordReader<LongWritable, Text> delegate;

        public MyRecordReader(RecordReader<LongWritable, Text> recordReader) {
            this.delegate = recordReader;
        }

        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.delegate.initialize(inputSplit, taskAttemptContext);
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            return this.delegate.nextKeyValue();
        }

        /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
        public Long m57getCurrentKey() throws IOException, InterruptedException {
            LongWritable longWritable = (LongWritable) this.delegate.getCurrentKey();
            if (longWritable == null) {
                return null;
            }
            return Long.valueOf(longWritable.get());
        }

        /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
        public String m56getCurrentValue() throws IOException, InterruptedException {
            Text text = (Text) this.delegate.getCurrentValue();
            if (text == null) {
                return null;
            }
            return text.toString();
        }

        public float getProgress() throws IOException, InterruptedException {
            return this.delegate.getProgress();
        }

        public void close() throws IOException {
            this.delegate.close();
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet$MyTextInputFormat.class */
    public static final class MyTextInputFormat extends InputFormat<Long, String> {
        TextInputFormat delegate = new TextInputFormat();

        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
            return this.delegate.getSplits(jobContext);
        }

        public RecordReader<Long, String> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new MyRecordReader(this.delegate.createRecordReader(inputSplit, taskAttemptContext));
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkAppUsingFileSet$ScalaCharCount.class */
    public static final class ScalaCharCount extends AbstractSpark {
        public void configure() {
            setMainClass(ScalaFileCountProgram.class);
        }
    }

    /**
     * Declares the application's datasets and Spark programs.
     *
     * <p>Four datasets are created ("fs", "pfs", "tpfs" and "myfs"), all reading through
     * {@link MyTextInputFormat} and writing through {@link TextOutputFormat} with ":" as the
     * output separator. "pfs" is additionally partitioned by a single string field "x". The
     * {@link JavaCharCount} and {@link ScalaCharCount} Spark programs are then added. Any failure
     * is rethrown through {@code Throwables.propagate}.
     */
    @Override
    public void configure() {
        final String separator = ":";
        try {
            createDataset("fs", FileSet.class,
                FileSetProperties.builder()
                    .setInputFormat(MyTextInputFormat.class)
                    .setOutputFormat(TextOutputFormat.class)
                    .setOutputProperty(TextOutputFormat.SEPERATOR, separator)
                    .build());

            createDataset("pfs", PartitionedFileSet.class,
                PartitionedFileSetProperties.builder()
                    .setPartitioning(Partitioning.builder().addStringField("x").build())
                    .setInputFormat(MyTextInputFormat.class)
                    .setOutputFormat(TextOutputFormat.class)
                    .setOutputProperty(TextOutputFormat.SEPERATOR, separator)
                    .build());

            createDataset("tpfs", TimePartitionedFileSet.class,
                FileSetProperties.builder()
                    .setInputFormat(MyTextInputFormat.class)
                    .setOutputFormat(TextOutputFormat.class)
                    .setOutputProperty(TextOutputFormat.SEPERATOR, separator)
                    .build());

            createDataset("myfs", MyFileSet.class,
                FileSetProperties.builder()
                    .setInputFormat(MyTextInputFormat.class)
                    .setOutputFormat(TextOutputFormat.class)
                    .setOutputProperty(TextOutputFormat.SEPERATOR, separator)
                    .build());

            addSpark(new JavaCharCount());
            addSpark(new ScalaCharCount());
        } catch (Throwable failure) {
            throw Throwables.propagate(failure);
        }
    }
}
