package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.common.Scope;
import co.cask.cdap.api.data.DatasetInstantiationException;
import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.TimeseriesTable;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.dataset.table.Get;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.batch.AppWithLocalFiles;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduce;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingFileSet;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingObjectStore;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingRuntimeDatasets;
import co.cask.cdap.internal.app.runtime.batch.AppWithTxAware;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.XSlowTests;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.CharStreams;
import com.google.common.io.Files;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionExecutorFactory;
import org.apache.tephra.TransactionFailureException;
import org.apache.twill.filesystem.Location;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MapReduceProgramRunnerTest.class */
public class MapReduceProgramRunnerTest extends MapReduceRunnerTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(MapReduceProgramRunnerTest.class);

    /**
     * Class-level JUnit rule whose {@code before()} hook sets the {@code data.tx.timeout} and
     * {@code data.tx.cleanup.interval} system properties to {@code "1"} and {@code "2"}.
     *
     * NOTE(review): the properties are never restored after the class finishes; presumably they are
     * meant to force very short transaction timeouts for these tests -- confirm.
     */
    @ClassRule
    public static final ExternalResource RESOURCE = new ExternalResource() {
        @Override
        protected void before() throws Throwable {
            System.setProperty("data.tx.timeout", "1");
            System.setProperty("data.tx.cleanup.interval", "2");
        }
    };

    /**
     * Deploys {@link AppWithTxAware} and runs its {@code PedanticMapReduce} program with a single
     * {@code outputPath} runtime argument pointing at {@code <temp folder>/output}.
     */
    @Test
    public void testTransactionHandling() throws Exception {
        // The app is deployed first, then the temp folder is resolved for the output path.
        runProgram(
            deployApp(AppWithTxAware.class),
            AppWithTxAware.PedanticMapReduce.class,
            new BasicArguments(
                ImmutableMap.of(
                    "outputPath",
                    ((File) TEMP_FOLDER_SUPPLIER.get()).getPath() + "/output")));
    }

    /**
     * Runs {@code AppWithMapReduceUsingFileSet.ComputeSum} twice through
     * {@code testMapreduceWithFile}: once with separate input ("numbers") and output ("sums")
     * datasets, and once reading from and writing to the same dataset ("boogie") with "#" as the
     * output separator.
     *
     * NOTE(review): the system property names come from
     * {@code AppWithMapReduceUsingAvroDynamicPartitioner} although the app under test is
     * {@code AppWithMapReduceUsingFileSet}; presumably both apps share the same property keys --
     * confirm.
     */
    @Test
    public void testMapreduceWithFileSet() throws Exception {
        // Run 1: distinct input and output datasets.
        System.setProperty(AppWithMapReduceUsingAvroDynamicPartitioner.INPUT_DATASET, "numbers");
        System.setProperty(AppWithMapReduceUsingAvroDynamicPartitioner.OUTPUT_DATASET, "sums");
        Map<String, String> runtimeArgs = Maps.newHashMap();
        Map<String, String> inputArgs = Maps.newHashMap();
        FileSetArguments.setInputPaths(inputArgs, "abc, xyz");
        Map<String, String> outputArgs = Maps.newHashMap();
        FileSetArguments.setOutputPath(outputArgs, "a001");
        // Scope the file set arguments to the dataset they apply to.
        runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "numbers", inputArgs));
        runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "sums", outputArgs));
        testMapreduceWithFile("numbers", "abc, xyz", "sums", "a001", AppWithMapReduceUsingFileSet.class, AppWithMapReduceUsingFileSet.ComputeSum.class, new BasicArguments(runtimeArgs), null, null);

        // Run 2: the same dataset is used as input and output, with a custom output separator.
        System.setProperty(AppWithMapReduceUsingAvroDynamicPartitioner.INPUT_DATASET, "boogie");
        System.setProperty(AppWithMapReduceUsingAvroDynamicPartitioner.OUTPUT_DATASET, "boogie");
        Map<String, String> sameDatasetRuntimeArgs = Maps.newHashMap();
        Map<String, String> sameDatasetInputArgs = Maps.newHashMap();
        FileSetArguments.setInputPaths(sameDatasetInputArgs, "zzz");
        Map<String, String> sameDatasetOutputArgs = Maps.newHashMap();
        FileSetArguments.setOutputPath(sameDatasetOutputArgs, "f123");
        sameDatasetOutputArgs.put("output.properties." + TextOutputFormat.SEPERATOR, "#");
        sameDatasetRuntimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "boogie", sameDatasetInputArgs));
        sameDatasetRuntimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "boogie", sameDatasetOutputArgs));
        testMapreduceWithFile("boogie", "zzz", "boogie", "f123", AppWithMapReduceUsingFileSet.class, AppWithMapReduceUsingFileSet.ComputeSum.class, new BasicArguments(sameDatasetRuntimeArgs), null, "#");
    }

    /**
     * Runs {@code AppWithMapReduceUsingRuntimeDatasets.ComputeSum} against file sets that are
     * created at test time and named through runtime arguments.
     *
     * <p>After the first run it queries the {@code system.dataset.store.ops} metric for dataset
     * {@code rtt} and expects exactly one time series with one time value equal to 1. The second
     * run uses {@code rtInput2} as both input and output.
     */
    @Test
    public void testMapreduceWithDynamicDatasets() throws Exception {
        DatasetId firstInput = DefaultId.NAMESPACE.dataset("rtInput1");
        DatasetId secondInput = DefaultId.NAMESPACE.dataset("rtInput2");
        DatasetId firstOutput = DefaultId.NAMESPACE.dataset("rtOutput1");
        dsFramework.addInstance("fileSet", firstInput, FileSetProperties.builder().setBasePath("rtInput1").setInputFormat(TextInputFormat.class).setOutputFormat(TextOutputFormat.class).setOutputProperty(TextOutputFormat.SEPERATOR, ":").build());
        dsFramework.addInstance("fileSet", firstOutput, FileSetProperties.builder().setBasePath("rtOutput1").setInputFormat(TextInputFormat.class).setOutputFormat(TextOutputFormat.class).setOutputProperty(TextOutputFormat.SEPERATOR, ":").build());

        Map<String, String> runtimeArgs = Maps.newHashMap();
        runtimeArgs.put("mr.job.conf.mapreduce.local.map.tasks.maximum", "1");
        runtimeArgs.put(AppWithMapReduceUsingRuntimeDatasets.INPUT_NAME, "rtInput1");
        runtimeArgs.put(AppWithMapReduceUsingRuntimeDatasets.INPUT_PATHS, "abc, xyz");
        runtimeArgs.put(AppWithMapReduceUsingRuntimeDatasets.OUTPUT_NAME, "rtOutput1");
        runtimeArgs.put(AppWithMapReduceUsingRuntimeDatasets.OUTPUT_PATH, "a001");
        testMapreduceWithFile("rtInput1", "abc, xyz", "rtOutput1", "a001", AppWithMapReduceUsingRuntimeDatasets.class, AppWithMapReduceUsingRuntimeDatasets.ComputeSum.class, new BasicArguments(runtimeArgs), AppWithMapReduceUsingRuntimeDatasets.COUNTERS, null);

        // Verify the dataset operation metric emitted for dataset "rtt" during the first run.
        long nowSeconds = System.currentTimeMillis() / 1000;
        Map<String, String> metricTags = ImmutableMap.of("ns", DefaultId.NAMESPACE.getNamespace(), "app", AppWithMapReduceUsingRuntimeDatasets.APP_NAME, "mr", AppWithMapReduceUsingRuntimeDatasets.MR_NAME, "ds", "rtt");
        Collection<MetricTimeSeries> metrics = metricStore.query(new MetricDataQuery(0L, nowSeconds, Integer.MAX_VALUE, "system.dataset.store.ops", AggregationFunction.SUM, metricTags, Collections.<String>emptyList()));
        Assert.assertEquals(1L, metrics.size());
        MetricTimeSeries series = metrics.iterator().next();
        Assert.assertEquals(1L, series.getTimeValues().size());
        Assert.assertEquals(1L, series.getTimeValues().get(0).getValue());

        // Second run: the same file set serves as input and output.
        dsFramework.addInstance("fileSet", secondInput, FileSetProperties.builder().setBasePath("rtInput2").setInputFormat(TextInputFormat.class).setOutputFormat(TextOutputFormat.class).setOutputProperty(TextOutputFormat.SEPERATOR, ":").build());
        Map<String, String> sameDatasetArgs = Maps.newHashMap();
        sameDatasetArgs.put(AppWithMapReduceUsingRuntimeDatasets.INPUT_NAME, "rtInput2");
        sameDatasetArgs.put(AppWithMapReduceUsingRuntimeDatasets.INPUT_PATHS, "zzz");
        sameDatasetArgs.put(AppWithMapReduceUsingRuntimeDatasets.OUTPUT_NAME, "rtInput2");
        sameDatasetArgs.put(AppWithMapReduceUsingRuntimeDatasets.OUTPUT_PATH, "f123");
        testMapreduceWithFile("rtInput2", "zzz", "rtInput2", "f123", AppWithMapReduceUsingRuntimeDatasets.class, AppWithMapReduceUsingRuntimeDatasets.ComputeSum.class, new BasicArguments(sameDatasetArgs), AppWithMapReduceUsingRuntimeDatasets.COUNTERS, null);
    }

    /**
     * Writes numeric input files, runs a MapReduce program and verifies the sum it produced.
     *
     * <p>For each input location of the input dataset, the values 15, 17, 7 and 3 are written
     * multiplied by the 1-based index of the location. After the run, the first line of the output
     * (or of the first non-directory child whose name starts with "part") must be {@code "x"},
     * the separator, and the sum of all written values. When a counters table is given, its two
     * counters are deleted before the run and verified afterwards.
     *
     * NOTE(review): the input and output datasets are used without a declared type; presumably
     * they are file sets -- confirm.
     *
     * @param str name of the input dataset
     * @param str2 input paths, applied as file set input path arguments
     * @param str3 name of the output dataset
     * @param str4 output path, applied as the file set output path argument
     * @param cls application class to deploy
     * @param cls2 program class to run
     * @param arguments runtime arguments for the program
     * @param str5 name of the counters table to reset and verify, or {@code null} to skip that
     * @param str6 separator used to split the output line (treated as a regex by
     *     {@link String#split(String)}), or {@code null} for ":"
     */
    private void testMapreduceWithFile(String str, String str2, String str3, String str4, Class cls, Class cls2, Arguments arguments, @Nullable final String str5, @Nullable String str6) throws Exception {
        ApplicationWithPrograms app = deployApp(cls);
        Map<String, String> inputArgs = Maps.newHashMap();
        Map<String, String> outputArgs = Maps.newHashMap();
        FileSetArguments.setInputPaths(inputArgs, str2);
        FileSetArguments.setOutputPath(outputArgs, str4);

        // Reset the counters so that the values checked after the run come from this run only.
        if (str5 != null) {
            Transactions.execute(datasetCache.newTransactionContext(), "countersVerify", new Runnable() {
                @Override
                public void run() {
                    KeyValueTable counters = datasetCache.getDataset(str5);
                    counters.delete(AppWithMapReduceUsingRuntimeDatasets.INPUT_RECORDS);
                    counters.delete(AppWithMapReduceUsingRuntimeDatasets.REDUCE_KEYS);
                }
            });
        }

        // Write the input: each location gets the base values scaled by its 1-based index.
        long[] baseValues = {15, 17, 7, 3};
        long expectedSum = 0;
        long multiplier = 1;
        long recordCount = 0;
        Iterator<?> inputLocations = datasetCache.getDataset(str, inputArgs).getInputLocations().iterator();
        while (inputLocations.hasNext()) {
            Location inputLocation = (Location) inputLocations.next();
            // try-with-resources closes the writer even if writing fails part-way.
            try (PrintWriter writer = new PrintWriter(inputLocation.getOutputStream())) {
                for (long baseValue : baseValues) {
                    long value = baseValue * multiplier;
                    writer.println(value);
                    expectedSum += value;
                    recordCount++;
                }
            }
            multiplier++;
        }

        runProgram(app, cls2, arguments);

        // If the output is a directory, use its first "part*" file.
        Location outputLocation = datasetCache.getDataset(str3, outputArgs).getOutputLocation();
        if (outputLocation.isDirectory()) {
            for (Location child : outputLocation.list()) {
                if (!child.isDirectory() && child.getName().startsWith("part")) {
                    outputLocation = child;
                    break;
                }
            }
        }
        Assert.assertFalse(outputLocation.isDirectory());

        String firstLine = CharStreams.readFirstLine(CharStreams.newReaderSupplier(Locations.newInputSupplier(outputLocation), Charsets.UTF_8));
        Assert.assertNotNull(firstLine);
        String[] fields = firstLine.split(str6 == null ? ":" : str6);
        Assert.assertEquals(2L, fields.length);
        Assert.assertEquals("x", fields[0]);
        Assert.assertEquals(expectedSum, Long.parseLong(fields[1]));

        if (str5 != null) {
            final long expectedRecords = recordCount;
            Transactions.execute(datasetCache.newTransactionContext(), "countersVerify", new Runnable() {
                @Override
                public void run() {
                    KeyValueTable counters = datasetCache.getDataset(str5);
                    // Incrementing by 0 reads the current counter value.
                    Assert.assertEquals(expectedRecords, counters.incrementAndGet(AppWithMapReduceUsingRuntimeDatasets.INPUT_RECORDS, 0L));
                    Assert.assertEquals(1L, counters.incrementAndGet(AppWithMapReduceUsingRuntimeDatasets.REDUCE_KEYS, 0L));
                }
            });
        }
    }

    /**
     * Deploys {@link AppWithMapReduce} and checks that the specification of its
     * {@code ClassicWordCount} program reports 1024 MB of driver memory.
     */
    @Test
    public void testMapReduceDriverResources() throws Exception {
        String programName = AppWithMapReduce.ClassicWordCount.class.getSimpleName();
        MapReduceSpecification spec =
            (MapReduceSpecification) deployApp(AppWithMapReduce.class).getSpecification().getMapReduce().get(programName);
        long driverMemoryMb = spec.getDriverResources().getMemoryMB();
        Assert.assertEquals(1024L, driverMemoryMb);
    }

    /**
     * Deploys {@link AppWithMapReduceUsingObjectStore} into namespace {@code someOtherNameSpace},
     * writes two strings into its {@code keys} object store, runs {@code ComputeCounts} and
     * verifies that the {@code count} table holds the length of each string, stored as a decimal
     * string.
     *
     * <p>Also checks that requesting a dataset from a non-existing namespace fails with a
     * {@link DatasetInstantiationException} carrying the expected message.
     */
    @Test
    public void testMapreduceWithObjectStore() throws Exception {
        final String firstValue = "persisted data";
        final String secondValue = "distributed systems";

        ApplicationWithPrograms app = deployApp(new NamespaceId("someOtherNameSpace").toId(), AppWithMapReduceUsingObjectStore.class);
        final ObjectStore<String> keys = datasetCache.getDataset("someOtherNameSpace", "keys");
        try {
            datasetCache.getDataset("nonExistingNameSpace", "keys");
            Assert.fail("getDataset() should throw an exception when accessing dataset from a non-existing namespace.");
        } catch (DatasetInstantiationException e) {
            Assert.assertEquals("Could not instantiate dataset 'nonExistingNameSpace:keys'", e.getMessage());
        }

        // The object store is cast to TransactionAware only to take part in the transaction.
        Transactions.createTransactionExecutor(txExecutorFactory, (TransactionAware) keys).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() {
                keys.write(Bytes.toBytes(firstValue), firstValue);
                keys.write(Bytes.toBytes(secondValue), secondValue);
            }
        });

        runProgram(app, AppWithMapReduceUsingObjectStore.ComputeCounts.class, false, true);

        final KeyValueTable counts = datasetCache.getDataset("someOtherNameSpace", "count");
        Transactions.createTransactionExecutor(txExecutorFactory, counts).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() {
                byte[] firstCount = counts.read(Bytes.toBytes(firstValue));
                Assert.assertNotNull(firstCount);
                Assert.assertEquals(Integer.toString(firstValue.length()), Bytes.toString(firstCount));
                byte[] secondCount = counts.read(Bytes.toBytes(secondValue));
                Assert.assertNotNull(secondCount);
                Assert.assertEquals(Integer.toString(secondValue.length()), Bytes.toString(secondCount));
            }
        });
    }

    /**
     * Runs {@code AppWithMapReduce.ClassicWordCount} on a generated input folder.
     *
     * <p>The input and output paths are passed to the program through the {@code jobConfig} table
     * in the default namespace. After the run, the test expects six partitioner/comparator
     * lifecycle system properties to be {@code "true"} and at least one line across the
     * {@code part-r-*} output files. It also checks that {@code jobConfig} cannot be instantiated
     * from namespace {@code someOtherNameSpace}.
     */
    @Test
    public void testWordCount() throws Exception {
        ApplicationWithPrograms app = deployApp(AppWithMapReduce.class);
        final String inputPath = createInput();
        final File outputDir = new File(TEMP_FOLDER.newFolder(), "output");
        try {
            datasetCache.getDataset("someOtherNameSpace", "jobConfig");
            Assert.fail("getDataset() should throw an exception when accessing a non-existing dataset.");
        } catch (DatasetInstantiationException e) {
            Assert.assertEquals("Could not instantiate dataset 'someOtherNameSpace:jobConfig'", e.getMessage());
        }

        final KeyValueTable jobConfig = datasetCache.getDataset(NamespaceId.DEFAULT.getNamespace(), "jobConfig");
        Transactions.createTransactionExecutor(txExecutorFactory, jobConfig).execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() {
                jobConfig.write(Bytes.toBytes("inputPath"), Bytes.toBytes(inputPath));
                jobConfig.write(Bytes.toBytes("outputPath"), Bytes.toBytes(outputDir.getPath()));
            }
        });

        runProgram(app, AppWithMapReduce.ClassicWordCount.class, false, true);

        // These properties are expected to have been set to "true" by the time the run finishes.
        Assert.assertEquals("true", System.getProperty("partitioner.initialize"));
        Assert.assertEquals("true", System.getProperty("partitioner.destroy"));
        Assert.assertEquals("true", System.getProperty("partitioner.set.conf"));
        Assert.assertEquals("true", System.getProperty("comparator.initialize"));
        Assert.assertEquals("true", System.getProperty("comparator.destroy"));
        Assert.assertEquals("true", System.getProperty("comparator.set.conf"));

        // Only reducer output files count; checksum (.crc) files are skipped.
        File[] outputFiles = outputDir.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return name.startsWith("part-r-") && !name.endsWith(".crc");
            }
        });
        Assert.assertNotNull("no output files found", outputFiles);
        int lineCount = 0;
        for (File outputFile : outputFiles) {
            lineCount += Files.readLines(outputFile, Charsets.UTF_8).size();
        }
        Assert.assertTrue(lineCount > 0);
    }

    /** Runs the success scenario without the frequent-flushing runtime argument. */
    @Test
    public void testJobSuccess() throws Exception {
        final boolean frequentFlushing = false;
        testSuccess(frequentFlushing);
    }

    /** Runs the success scenario with the frequent-flushing runtime argument enabled. */
    @Test
    public void testJobSuccessWithFrequentFlushing() throws Exception {
        final boolean frequentFlushing = true;
        testSuccess(frequentFlushing);
    }

    /**
     * Runs {@code AppWithMapReduce.AggregateTimeseriesByTag} expecting {@code runProgram} to
     * report {@code true}, then verifies the results in a single transaction.
     *
     * <p>Checks performed: the entries written under {@code AggregateMetricsByTag.BY_TAGS} during
     * the run are 18 for tag1, 3 for tag2 and 18 for tag3; the {@code beforeSubmit} and
     * {@code onFinish} tables each contain their "done" marker; and the mapper/reducer record
     * counters in both counter tables are positive.
     *
     * @param z whether to pass the {@code frequentFlushing} runtime argument to the program
     */
    private void testSuccess(boolean z) throws Exception {
        ApplicationWithPrograms app = deployApp(AppWithMapReduce.class);
        datasetCache.newTransactionContext();
        try {
            final TimeseriesTable timeSeries = (TimeseriesTable) datasetCache.getDataset("timeSeries");
            final KeyValueTable beforeSubmit = datasetCache.getDataset("beforeSubmit");
            final KeyValueTable onFinish = datasetCache.getDataset("onFinish");
            final Table counters = datasetCache.getDataset("counters");
            final Table countersFromContext = datasetCache.getDataset("countersFromContext");
            fillTestInputData(txExecutorFactory, timeSeries, false);

            // Bracket the run with timestamps so only entries written during it are read back.
            final long startMillis = System.currentTimeMillis();
            runProgram(app, AppWithMapReduce.AggregateTimeseriesByTag.class, z, true);
            final long stopMillis = System.currentTimeMillis();

            Transactions.createTransactionExecutor(txExecutorFactory, datasetCache.getTransactionAwares()).execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() {
                    Map<String, Long> expectedByTag = Maps.newHashMap();
                    expectedByTag.put("tag1", 18L);
                    expectedByTag.put("tag2", 3L);
                    expectedByTag.put("tag3", 18L);

                    // No tag filter is passed, so entries for every tag are returned.
                    Iterator<TimeseriesTable.Entry> entries = timeSeries.read(AggregateMetricsByTag.BY_TAGS, startMillis, stopMillis);
                    int entryCount = 0;
                    while (entries.hasNext()) {
                        TimeseriesTable.Entry entry = entries.next();
                        String tag = Bytes.toString(entry.getTags()[0]);
                        Long expectedValue = expectedByTag.get(tag);
                        Assert.assertNotNull("unexpected tag: " + tag, expectedValue);
                        Assert.assertEquals(expectedValue.longValue(), Bytes.toLong(entry.getValue()));
                        entryCount++;
                    }
                    Assert.assertEquals(expectedByTag.size(), entryCount);

                    Assert.assertArrayEquals(Bytes.toBytes("beforeSubmit:done"), beforeSubmit.read(Bytes.toBytes("beforeSubmit")));
                    Assert.assertArrayEquals(Bytes.toBytes("onFinish:done"), onFinish.read(Bytes.toBytes("onFinish")));
                    Assert.assertTrue(counters.get(new Get("mapper")).getLong("records", 0L) > 0);
                    Assert.assertTrue(counters.get(new Get("reducer")).getLong("records", 0L) > 0);
                    Assert.assertTrue(countersFromContext.get(new Get("mapper")).getLong("records", 0L) > 0);
                    Assert.assertTrue(countersFromContext.get(new Get("reducer")).getLong("records", 0L) > 0);
                }
            });
        } finally {
            // Dismiss the context even when an assertion fails, so it does not leak into other tests.
            datasetCache.dismissTransactionContext();
        }
    }

    /** Runs the failure scenario without the frequent-flushing runtime argument. */
    @Test
    public void testJobFailure() throws Exception {
        final boolean frequentFlushing = false;
        testFailure(frequentFlushing);
    }

    /** Runs the failure scenario with the frequent-flushing runtime argument enabled. */
    @Test
    public void testJobFailureWithFrequentFlushing() throws Exception {
        final boolean frequentFlushing = true;
        testFailure(frequentFlushing);
    }

    /**
     * Runs {@code AppWithLocalFiles.MapReduceWithLocalFiles} over three short records with a
     * stop-words file passed as a runtime argument, then verifies the {@code output} table: the
     * words "a", "the" and "an" are absent, "test" and "record" have count 2, and "table" and
     * "end" have count 1.
     */
    @Test
    public void testMapReduceWithLocalFiles() throws Exception {
        ApplicationWithPrograms app = deployApp(AppWithLocalFiles.class);
        URI stopWordsUri = createStopWordsFile();

        // Populate the input table.
        final KeyValueTable inputTable = datasetCache.getDataset("input");
        TransactionExecutor.Subroutine writeRecords = new TransactionExecutor.Subroutine() {
            @Override
            public void apply() {
                inputTable.write("2324", "a test record");
                inputTable.write("43353", "the test table");
                inputTable.write("34335", "an end record");
            }
        };
        Transactions.createTransactionExecutor(txExecutorFactory, inputTable).execute(writeRecords);

        Map<String, String> runtimeArgs = ImmutableMap.of(
            "input", "input",
            "output", "output",
            AppWithLocalFiles.STOPWORDS_FILE_ARG, stopWordsUri.toString());
        runProgram(app, AppWithLocalFiles.MapReduceWithLocalFiles.class, new BasicArguments(runtimeArgs));

        // Verify the word counts; the stop words must not appear at all.
        final KeyValueTable outputTable = datasetCache.getDataset("output");
        TransactionExecutor.Subroutine verifyCounts = new TransactionExecutor.Subroutine() {
            @Override
            public void apply() {
                Assert.assertNull(outputTable.read("a"));
                Assert.assertNull(outputTable.read("the"));
                Assert.assertNull(outputTable.read("an"));
                Assert.assertEquals(2L, Bytes.toInt(outputTable.read("test")));
                Assert.assertEquals(2L, Bytes.toInt(outputTable.read("record")));
                Assert.assertEquals(1L, Bytes.toInt(outputTable.read("table")));
                Assert.assertEquals(1L, Bytes.toInt(outputTable.read("end")));
            }
        };
        Transactions.createTransactionExecutor(txExecutorFactory, outputTable).execute(verifyCounts);
    }

    /**
     * Creates {@code stopWords.txt} in the test temp folder containing the words "the", "a" and
     * "an", one per line (no trailing newline after the last).
     *
     * @return the URI of the created file
     * @throws IOException if the file cannot be created or written
     */
    private URI createStopWordsFile() throws IOException {
        File stopWordsFile = TEMP_FOLDER.newFile("stopWords.txt");
        // try-with-resources closes the writer exactly once, on success and on failure.
        try (OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(stopWordsFile))) {
            writer.write("the\n");
            writer.write("a\n");
            writer.write("an");
        }
        return stopWordsFile.toURI();
    }

    /**
     * Runs {@code AppWithMapReduce.AggregateTimeseriesByTag} expecting {@code runProgram} to
     * report {@code false}, then verifies in a single transaction that no aggregate entries were
     * written during the run, that the {@code beforeSubmit} and {@code onFinish} tables still
     * contain their "done" markers, and that all mapper/reducer record counters read 0.
     *
     * NOTE(review): the input is filled with the flag set to {@code true}, which changes one input
     * value from 15 to 55; presumably that value is what makes the job fail -- confirm against
     * the program under test.
     *
     * @param z whether to pass the {@code frequentFlushing} runtime argument to the program
     */
    private void testFailure(boolean z) throws Exception {
        ApplicationWithPrograms app = deployApp(AppWithMapReduce.class);
        datasetCache.newTransactionContext();
        try {
            final TimeseriesTable timeSeries = (TimeseriesTable) datasetCache.getDataset("timeSeries");
            final KeyValueTable beforeSubmit = datasetCache.getDataset("beforeSubmit");
            final KeyValueTable onFinish = datasetCache.getDataset("onFinish");
            final Table counters = datasetCache.getDataset("counters");
            final Table countersFromContext = datasetCache.getDataset("countersFromContext");
            fillTestInputData(txExecutorFactory, timeSeries, true);

            // Bracket the run with timestamps so only entries written during it are considered.
            final long startMillis = System.currentTimeMillis();
            runProgram(app, AppWithMapReduce.AggregateTimeseriesByTag.class, z, false);
            final long stopMillis = System.currentTimeMillis();

            Transactions.createTransactionExecutor(txExecutorFactory, datasetCache.getTransactionAwares()).execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() {
                    // No tag filter is passed; the read must come back empty.
                    Assert.assertFalse(timeSeries.read(AggregateMetricsByTag.BY_TAGS, startMillis, stopMillis).hasNext());
                    Assert.assertArrayEquals(Bytes.toBytes("beforeSubmit:done"), beforeSubmit.read(Bytes.toBytes("beforeSubmit")));
                    Assert.assertArrayEquals(Bytes.toBytes("onFinish:done"), onFinish.read(Bytes.toBytes("onFinish")));
                    Assert.assertEquals(0L, counters.get(new Get("mapper")).getLong("records", 0L));
                    Assert.assertEquals(0L, counters.get(new Get("reducer")).getLong("records", 0L));
                    Assert.assertEquals(0L, countersFromContext.get(new Get("mapper")).getLong("records", 0L));
                    Assert.assertEquals(0L, countersFromContext.get(new Get("reducer")).getLong("records", 0L));
                }
            });
        } finally {
            // Dismiss the context even when an assertion fails, so it does not leak into other tests.
            datasetCache.dismissTransactionContext();
        }
    }

    /**
     * Writes the test time series entries into {@code timeseriesTable} inside a transaction
     * created from {@code transactionExecutorFactory}.
     *
     * @param transactionExecutorFactory factory used to create the transaction executor
     * @param timeseriesTable table to fill; also the transaction participant
     * @param z forwarded to the two-argument {@code fillTestInputData} overload
     */
    private void fillTestInputData(TransactionExecutorFactory transactionExecutorFactory, final TimeseriesTable timeseriesTable, final boolean z) throws TransactionFailureException, InterruptedException {
        TransactionExecutor executor = Transactions.createTransactionExecutor(transactionExecutorFactory, timeseriesTable);
        TransactionExecutor.Subroutine writeEntries = new TransactionExecutor.Subroutine() {
            @Override
            public void apply() {
                fillTestInputData(timeseriesTable, z);
            }
        };
        executor.execute(writeEntries);
    }

    /**
     * Writes five entries into the given table: four under key "metric" (timestamps 1 to 4) and
     * one under key "metric2" (timestamp 3), tagged with combinations of "tag1", "tag2" and
     * "tag3".
     *
     * <p>Must be called within a transaction that includes the table; see the three-argument
     * overload.
     *
     * @param timeseriesTable table to write to
     * @param z when {@code true}, the "metric" entry at timestamp 3 gets value 55 instead of 15
     */
    public void fillTestInputData(TimeseriesTable timeseriesTable, boolean z) {
        byte[] metric = Bytes.toBytes("metric");
        byte[] otherMetric = Bytes.toBytes("metric2");
        byte[] tag1 = Bytes.toBytes("tag1");
        byte[] tag2 = Bytes.toBytes("tag2");
        byte[] tag3 = Bytes.toBytes("tag3");
        // Tags are passed as varargs after the timestamp.
        timeseriesTable.write(new TimeseriesTable.Entry(metric, Bytes.toBytes(3L), 1L, tag3, tag2, tag1));
        timeseriesTable.write(new TimeseriesTable.Entry(metric, Bytes.toBytes(10L), 2L, tag2, tag3));
        timeseriesTable.write(new TimeseriesTable.Entry(metric, Bytes.toBytes(z ? 55L : 15L), 3L, tag1, tag3));
        timeseriesTable.write(new TimeseriesTable.Entry(metric, Bytes.toBytes(23L), 4L, tag2));
        timeseriesTable.write(new TimeseriesTable.Entry(otherMetric, Bytes.toBytes(4L), 3L, tag1, tag3));
    }

    /**
     * Exercises {@code FaiiingMR} and {@code ExplicitFaiiingMR} from {@link AppWithMapReduce},
     * each with three argument sets: no arguments (expects "initialized" to be "true"),
     * {@code failInput=true} and {@code failOutput=true} (both expect "false").
     */
    @Test
    public void testFailureInInit() throws Exception {
        ApplicationWithPrograms app = deployApp(AppWithMapReduce.class);
        Class<?>[] failingPrograms = {AppWithMapReduce.FaiiingMR.class, AppWithMapReduce.ExplicitFaiiingMR.class};
        for (Class<?> program : failingPrograms) {
            testFailureInInit("true", app, program, ImmutableMap.<String, String>of());
            testFailureInInit("false", app, program, ImmutableMap.of("failInput", "true"));
            testFailureInInit("false", app, program, ImmutableMap.of("failOutput", "true"));
        }
    }

    /**
     * Resets the "initialized" key of the {@code recorder} table to "false", runs the given
     * program expecting {@code runProgram} to report {@code false}, and then asserts that
     * "initialized" holds the expected value.
     *
     * @param str expected value of the "initialized" key after the run
     * @param applicationWithPrograms deployed application containing the program
     * @param cls program class to run
     * @param map runtime arguments for the program
     */
    public void testFailureInInit(final String str, ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map) throws Exception {
        datasetCache.newTransactionContext();
        try {
            final KeyValueTable recorder = datasetCache.getDataset("recorder");
            Transactions.createTransactionExecutor(txExecutorFactory, datasetCache.getTransactionAwares()).execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() {
                    recorder.write("initialized", "false");
                }
            });
            runProgram(applicationWithPrograms, cls, map, false);
            Transactions.createTransactionExecutor(txExecutorFactory, datasetCache.getTransactionAwares()).execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() {
                    Assert.assertEquals(str, Bytes.toString(recorder.read("initialized")));
                }
            });
        } finally {
            // Dismiss the context even when the run or an assertion fails.
            datasetCache.dismissTransactionContext();
        }
    }

    /**
     * Runs {@code AppWithMapReduce.MapReduceWithFailingOutputCommitter} with no runtime arguments,
     * expecting {@code runProgram} to report {@code false}, and asserts that the "status" key of
     * the {@code recorder} table equals {@code ProgramStatus.FAILED.name()} afterwards.
     */
    @Test
    public void testFailureInOutputCommitter() throws Exception {
        ApplicationWithPrograms app = deployApp(AppWithMapReduce.class);
        datasetCache.newTransactionContext();
        try {
            final KeyValueTable recorder = datasetCache.getDataset("recorder");
            Transactions.createTransactionExecutor(txExecutorFactory, datasetCache.getTransactionAwares()).execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() {
                    recorder.write("initialized", "false");
                }
            });
            runProgram(app, AppWithMapReduce.MapReduceWithFailingOutputCommitter.class, new HashMap<String, String>(), false);
            Transactions.createTransactionExecutor(txExecutorFactory, datasetCache.getTransactionAwares()).execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() {
                    Assert.assertEquals(ProgramStatus.FAILED.name(), Bytes.toString(recorder.read("status")));
                }
            });
        } finally {
            // Dismiss the context even when the run or an assertion fails.
            datasetCache.dismissTransactionContext();
        }
    }

    /**
     * Runs a program with a fixed set of runtime arguments ({@code metric=metric},
     * {@code startTs=1}, {@code stopTs=3}, {@code tag=tag1}) and asserts the expected result.
     *
     * @param applicationWithPrograms deployed application containing the program
     * @param cls program class to run
     * @param z when {@code true}, also passes {@code frequentFlushing=true}
     * @param z2 the result the underlying {@code runProgram} call is expected to return
     */
    private void runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, boolean z, boolean z2) throws Exception {
        Map<String, String> runtimeArgs = new HashMap<String, String>();
        runtimeArgs.put("metric", "metric");
        runtimeArgs.put("startTs", "1");
        runtimeArgs.put("stopTs", "3");
        runtimeArgs.put("tag", "tag1");
        if (z) {
            runtimeArgs.put("frequentFlushing", "true");
        }
        runProgram(applicationWithPrograms, cls, runtimeArgs, z2);
    }

    /**
     * Logs the program class and arguments, runs the program with them, and asserts that the
     * returned result equals {@code z}.
     *
     * @param applicationWithPrograms deployed application containing the program
     * @param cls program class to run
     * @param map runtime arguments for the program
     * @param z the result the run is expected to return
     */
    private void runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map, boolean z) throws Exception {
        LOG.info("Starting {} with arguments {}", cls.getName(), map);
        boolean actualResult = runProgram(applicationWithPrograms, cls, new BasicArguments(map));
        Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(actualResult));
    }

    /**
     * Creates a new temp folder containing {@code words.txt} with the two lines
     * "this text has" and "two words text inside".
     *
     * @return the path of the folder that contains the file
     * @throws IOException if the folder or file cannot be created or written
     */
    private String createInput() throws IOException {
        File inputDir = TEMP_FOLDER.newFolder();
        File wordsFile = new File(inputDir.getPath() + "/words.txt");
        wordsFile.deleteOnExit();
        // try-with-resources closes the writer exactly once, on success and on failure.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(wordsFile))) {
            writer.write("this text has");
            writer.newLine();
            writer.write("two words text inside");
        }
        return inputDir.getPath();
    }
}
