package co.cask.cdap.test.app;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.customaction.AbstractCustomAction;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.workflow.AbstractWorkflow;
import co.cask.cdap.api.workflow.WorkflowContext;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/test/app/WorkflowAppWithLocalDatasets.class */
public class WorkflowAppWithLocalDatasets extends AbstractApplication {
    public static final String WORDCOUNT_DATASET = "wordcount";
    public static final String RESULT_DATASET = "result";
    public static final String CSV_FILESET_DATASET = "csvfileset";
    public static final String WORKFLOW_NAME = "WorkflowWithLocalDatasets";
    public static final String WORKFLOW_RUNS_DATASET = "workflowruns";
    public static final String UNIQUE_ID_DATASET = "uniqueId";

    /* loaded from: input_file:co/cask/cdap/test/app/WorkflowAppWithLocalDatasets$IntSumReducer.class */
    public static class IntSumReducer extends Reducer<Text, IntWritable, byte[], byte[]> {
        private IntWritable result = new IntWritable();
        private Metrics metrics;

        public void reduce(Text text, Iterable<IntWritable> iterable, Reducer<Text, IntWritable, byte[], byte[]>.Context context) throws IOException, InterruptedException {
            int i = 0;
            Iterator<IntWritable> it = iterable.iterator();
            while (it.hasNext()) {
                i += it.next().get();
            }
            this.result.set(i);
            this.metrics.count("num.words", i);
            context.write(Bytes.toBytes(text.toString()), Bytes.toBytes(String.valueOf(this.result.get())));
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<IntWritable>) iterable, (Reducer<Text, IntWritable, byte[], byte[]>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WorkflowAppWithLocalDatasets$LocalDatasetReader.class */
    public static class LocalDatasetReader extends AbstractCustomAction {
        private static final Logger LOG = LoggerFactory.getLogger(LocalDatasetReader.class);
        private Metrics metrics;
        private final String actionName;

        @UseDataSet(WorkflowAppWithLocalDatasets.WORDCOUNT_DATASET)
        private KeyValueTable wordCount;

        @UseDataSet(WorkflowAppWithLocalDatasets.RESULT_DATASET)
        private KeyValueTable result;

        private LocalDatasetReader(String str) {
            this.actionName = str;
        }

        protected void configure() {
            super.configure();
            setName(this.actionName);
        }

        public void run() {
            LOG.info("Read the local dataset");
            try {
                new File((String) getContext().getRuntimeArguments().get("wait.file")).createNewFile();
                getContext().execute(new TxRunnable() { // from class: co.cask.cdap.test.app.WorkflowAppWithLocalDatasets.LocalDatasetReader.1
                    public void run(DatasetContext datasetContext) throws Exception {
                        int i = 0;
                        CloseableIterator scan = LocalDatasetReader.this.wordCount.scan((byte[]) null, (byte[]) null);
                        Throwable th = null;
                        while (scan.hasNext()) {
                            try {
                                try {
                                    scan.next();
                                    i++;
                                } finally {
                                }
                            } catch (Throwable th2) {
                                if (scan != null) {
                                    if (th != null) {
                                        try {
                                            scan.close();
                                        } catch (Throwable th3) {
                                            th.addSuppressed(th3);
                                        }
                                    } else {
                                        scan.close();
                                    }
                                }
                                throw th2;
                            }
                        }
                        if (scan != null) {
                            if (0 != 0) {
                                try {
                                    scan.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                scan.close();
                            }
                        }
                        LocalDatasetReader.this.result.write("UniqueWordCount", String.valueOf(i));
                        LocalDatasetReader.this.metrics.gauge("unique.words", i);
                    }
                });
                File file = new File((String) getContext().getRuntimeArguments().get("done.file"));
                while (!file.exists()) {
                    TimeUnit.MILLISECONDS.sleep(50L);
                }
            } catch (Exception e) {
                LOG.error("Exception occurred while running custom action ", e);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WorkflowAppWithLocalDatasets$LocalDatasetWriter.class */
    public static class LocalDatasetWriter extends AbstractCustomAction {
        private static final Logger LOG = LoggerFactory.getLogger(LocalDatasetWriter.class);
        private Metrics metrics;

        public void run() {
            try {
                PrintWriter printWriter = new PrintWriter(getContext().getDataset(WorkflowAppWithLocalDatasets.CSV_FILESET_DATASET).getLocation((String) getContext().getRuntimeArguments().get("input.path")).getOutputStream());
                Throwable th = null;
                try {
                    try {
                        printWriter.write("this,text,has");
                        printWriter.println();
                        printWriter.write("two,words,text,inside");
                        this.metrics.gauge("num.lines", 2L);
                        if (printWriter != null) {
                            if (0 != 0) {
                                try {
                                    printWriter.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                printWriter.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            } catch (Throwable th4) {
                LOG.error("Exception occurred while running custom action ", th4);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WorkflowAppWithLocalDatasets$TokenizerMapper.class */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        public void map(Object obj, Text text, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer stringTokenizer = new StringTokenizer(text.toString());
            while (stringTokenizer.hasMoreTokens()) {
                this.word.set(stringTokenizer.nextToken());
                context.write(this.word, ONE);
            }
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map(obj, (Text) obj2, (Mapper<Object, Text, Text, IntWritable>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WorkflowAppWithLocalDatasets$WordCount.class */
    public static class WordCount extends AbstractMapReduce {
        public void initialize() throws Exception {
            MapReduceContext context = getContext();
            String str = (String) context.getRuntimeArguments().get("output.path");
            HashMap hashMap = new HashMap();
            FileSetArguments.addInputPath(hashMap, str);
            context.addInput(Input.ofDataset(WorkflowAppWithLocalDatasets.CSV_FILESET_DATASET, hashMap));
            context.addOutput(Output.ofDataset(WorkflowAppWithLocalDatasets.WORDCOUNT_DATASET));
            Job job = (Job) context.getHadoopJob();
            job.setMapperClass(TokenizerMapper.class);
            job.setReducerClass(IntSumReducer.class);
            job.setNumReduceTasks(1);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WorkflowAppWithLocalDatasets$WorkflowWithLocalDatasets.class */
    public static class WorkflowWithLocalDatasets extends AbstractWorkflow {
        public void initialize(WorkflowContext workflowContext) throws Exception {
            super.initialize(workflowContext);
            workflowContext.getDataset(WorkflowAppWithLocalDatasets.WORKFLOW_RUNS_DATASET).write(workflowContext.getRunId().getId(), "STARTED");
            workflowContext.getDataset(WorkflowAppWithLocalDatasets.UNIQUE_ID_DATASET).write("id", workflowContext.getRunId().getId());
        }

        public void destroy() {
            if (getContext().getDataset(WorkflowAppWithLocalDatasets.UNIQUE_ID_DATASET).read("id") == null) {
                throw new RuntimeException("Failed to read from local dataset in destroy method.");
            }
            KeyValueTable dataset = getContext().getDataset(WorkflowAppWithLocalDatasets.WORKFLOW_RUNS_DATASET);
            if ("STARTED".equals(Bytes.toString(dataset.read(getContext().getRunId().getId())))) {
                if (getContext().getRuntimeArguments().containsKey("destroy.throw.exception")) {
                    throw new RuntimeException("destroy");
                }
                dataset.write(getContext().getRunId().getId(), "COMPLETED");
            }
        }

        protected void configure() {
            setName(WorkflowAppWithLocalDatasets.WORKFLOW_NAME);
            setDescription("Workflow program with local datasets.");
            createLocalDataset(WorkflowAppWithLocalDatasets.WORDCOUNT_DATASET, KeyValueTable.class);
            createLocalDataset(WorkflowAppWithLocalDatasets.CSV_FILESET_DATASET, FileSet.class, FileSetProperties.builder().setInputFormat(TextInputFormat.class).setOutputFormat(TextOutputFormat.class).build());
            createLocalDataset(WorkflowAppWithLocalDatasets.UNIQUE_ID_DATASET, KeyValueTable.class);
            addAction(new LocalDatasetWriter());
            addSpark("JavaSparkCSVToSpaceConverter");
            addMapReduce("WordCount");
            addAction(new LocalDatasetReader("readerAction"));
        }
    }

    /**
     * Registers the application's name and description, its Spark, MapReduce and workflow
     * programs, and the two application-level key/value datasets.
     */
    @Override
    public void configure() {
        // Application identity.
        this.setName("WorkflowAppWithLocalDatasets");
        this.setDescription("App to test the local dataset functionality for the Workflow.");

        // Programs.
        this.addSpark(new SparkCSVToSpaceProgram());
        this.addMapReduce(new WordCount());
        this.addWorkflow(new WorkflowWithLocalDatasets());

        // Application-level datasets (the workflow declares its own local ones).
        this.createDataset(WorkflowAppWithLocalDatasets.RESULT_DATASET, KeyValueTable.class);
        this.createDataset(WorkflowAppWithLocalDatasets.WORKFLOW_RUNS_DATASET, KeyValueTable.class);
    }
}
