package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.common.Scope;
import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionOutput;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.TimePartitionDetail;
import co.cask.cdap.api.dataset.lib.TimePartitionOutput;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetCache;
import co.cask.cdap.data2.dataset2.SingleThreadDatasetCache;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.internal.TempFolder;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.spark.ScalaSparkAppUsingObjectStore;
import co.cask.cdap.internal.app.runtime.spark.SparkAppUsingFileSet;
import co.cask.cdap.internal.app.runtime.spark.SparkAppUsingGetDataset;
import co.cask.cdap.internal.app.runtime.spark.SparkAppUsingLocalFiles;
import co.cask.cdap.internal.app.runtime.spark.SparkAppUsingObjectStore;
import co.cask.cdap.proto.DatasetSpecificationSummary;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.XSlowTests;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.inject.Injector;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkProgramRunnerTest.class */
public class SparkProgramRunnerTest {
    // Guice injector built in beforeClass(); the services below and the ProgramRunnerFactory are looked up from it.
    private static Injector injector;
    // Creates the transaction executors that the tests use for their own dataset reads and writes.
    private static TransactionExecutorFactory txExecutorFactory;
    // Transaction manager; started in beforeClass() and stopped in afterClass().
    private static TransactionManager txService;
    // Dataset framework; after() uses it to delete every dataset instance in the default namespace.
    private static DatasetFramework dsFramework;
    // Dataset cache shared by all tests; invalidated in after().
    private static DynamicDatasetCache datasetCache;
    // Metric store queried by testSparkWithObjectStore().
    private static MetricStore metricStore;
    // Sample strings; the same values are written by prepareInputData() and verified by checkOutputData().
    final String testString1 = "persisted data";
    final String testString2 = "distributed systems";
    // Scratch folder that backs "local.data.dir" and holds the local properties files passed to programs.
    private static final TempFolder TEMP_FOLDER = new TempFolder();
    // Decodes the JSON-encoded keys and values in validateGetDatasetOutput().
    private static final Gson GSON = new Gson();

    // JUnit-managed temporary folder; TEMP_FOLDER_SUPPLIER creates its folders in here.
    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    /**
     * Supplies a new temporary directory each time it is asked; passed to every application deployment.
     * Directories are created under {@link #tmpFolder}. An {@link IOException} raised while creating
     * one is rethrown through {@link Throwables#propagate(Throwable)}.
     */
    private static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() {
        // The method has to be called get() to implement Supplier<File>; under the decompiler-assigned
        // name the anonymous class left the interface method unimplemented.
        @Override
        public File get() {
            try {
                return tmpFolder.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    /**
     * One-time setup: builds the configuration and injector, looks up the shared services, creates the
     * dataset cache for the default namespace and finally starts the transaction manager.
     */
    @BeforeClass
    public static void beforeClass() {
        CConfiguration cConf = CConfiguration.create();
        cConf.set("local.data.dir", TEMP_FOLDER.newFolder("data").getAbsolutePath());
        cConf.setInt("data.tx.timeout", 1);
        cConf.setInt("data.tx.cleanup.interval", 2);

        injector = AppFabricTestHelper.getInjector(cConf);
        txService = injector.getInstance(TransactionManager.class);
        txExecutorFactory = injector.getInstance(TransactionExecutorFactory.class);
        dsFramework = injector.getInstance(DatasetFramework.class);

        // The explicit casts on the null arguments are kept so the same constructors are selected.
        SystemDatasetInstantiator instantiator =
            new SystemDatasetInstantiator(dsFramework, SparkProgramRunnerTest.class.getClassLoader(), (Iterable) null);
        TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
        datasetCache = new SingleThreadDatasetCache(
            instantiator, txClient, NamespaceId.DEFAULT, DatasetDefinition.NO_ARGUMENTS, (MetricsContext) null, (Map) null);

        metricStore = injector.getInstance(MetricStore.class);
        txService.startAndWait();
    }

    /** One-time teardown: stops the transaction manager that {@code beforeClass()} started. */
    @AfterClass
    public static void afterClass() throws Exception {
        txService.stopAndWait();
    }

    /**
     * Per-test cleanup: deletes every dataset instance listed for the default namespace, then
     * invalidates the shared dataset cache.
     */
    @After
    public void after() throws Exception {
        for (DatasetSpecificationSummary summary : dsFramework.getInstances(DefaultId.NAMESPACE)) {
            Id.DatasetInstance instanceId = Id.DatasetInstance.from(DefaultId.NAMESPACE, summary.getName());
            dsFramework.deleteInstance(instanceId);
        }
        datasetCache.invalidate();
    }

    /**
     * Runs {@code SparkAppUsingObjectStore.CharCountSpecification}, checks the counts it wrote, and then
     * polls the metric store until the summed "system.dataset.store.ops" metric for dataset "totals"
     * reaches 4 (polling every 50 ms for at most 5 seconds).
     */
    @Test
    public void testSparkWithObjectStore() throws Exception {
        ApplicationWithPrograms app =
            AppFabricTestHelper.deployApplicationWithManager(SparkAppUsingObjectStore.class, TEMP_FOLDER_SUPPLIER);
        prepareInputData();
        runProgram(app, SparkAppUsingObjectStore.CharCountSpecification.class);
        checkOutputData();

        Callable<Long> storeOpsOnTotals = new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                MetricDataQuery query = new MetricDataQuery(
                    0L, System.currentTimeMillis() / 1000, 60, "system.dataset.store.ops", AggregationFunction.SUM,
                    ImmutableMap.of("ns", DefaultId.NAMESPACE.getId(),
                                    "app", SparkAppUsingObjectStore.APP_NAME,
                                    "sp", SparkAppUsingObjectStore.SPARK_NAME,
                                    "ds", "totals"),
                    Collections.<String>emptyList());
                Collection<MetricTimeSeries> result = metricStore.query(query);
                // Nothing recorded yet: report zero so that the caller keeps polling.
                if (result.isEmpty()) {
                    return 0L;
                }
                Assert.assertEquals(1L, result.size());
                MetricTimeSeries series = result.iterator().next();
                Assert.assertEquals(1L, series.getTimeValues().size());
                TimeValue only = series.getTimeValues().get(0);
                return Long.valueOf(only.getValue());
            }
        };
        Tasks.waitFor(4L, storeOpsOnTotals, 5L, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs {@code ScalaSparkAppUsingObjectStore.CharCountSpecification} on the prepared input and checks
     * the counts it wrote.
     */
    @Test
    public void testScalaSparkWithObjectStore() throws Exception {
        ApplicationWithPrograms app =
            AppFabricTestHelper.deployApplicationWithManager(ScalaSparkAppUsingObjectStore.class, TEMP_FOLDER_SUPPLIER);
        prepareInputData();
        runProgram(app, ScalaSparkAppUsingObjectStore.CharCountSpecification.class);
        checkOutputData();
    }

    /** Runs {@code SparkAppUsingFileSet.JavaCharCount} with dataset "fs" as input and output. */
    @Test
    public void testSparkWithFileSet() throws Exception {
        testSparkWithFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    /** Runs {@code SparkAppUsingFileSet.ScalaCharCount} with dataset "fs" as input and output. */
    @Test
    public void testSparkScalaWithFileSet() throws Exception {
        testSparkWithFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    /**
     * Deploys the application, writes the sample input to path "nn" of dataset "fs", runs the program
     * with "fs" as both input and output, and validates what was written under path "xx".
     *
     * @param cls  application class to deploy
     * @param cls2 program class to run
     */
    private void testSparkWithFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms app = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        FileSet fileSet = datasetCache.getDataset("fs");
        prepareFileInput(fileSet.getLocation("nn"));

        Map<String, String> inputArgs = new HashMap<>();
        FileSetArguments.setInputPath(inputArgs, "nn");
        // The output path goes into the output arguments. It used to be written into the input map,
        // which left this map empty; the merged runtime arguments are the same either way because
        // both maps are scoped to "fs".
        Map<String, String> outputArgs = new HashMap<>();
        FileSetArguments.setOutputPath(outputArgs, "xx");

        Map<String, String> runtimeArgs = new HashMap<>();
        runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "fs", inputArgs));
        runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "fs", outputArgs));
        runtimeArgs.put("input", "fs");
        runtimeArgs.put("output", "fs");

        runProgram(app, cls2, runtimeArgs);
        validateFileOutput(fileSet.getLocation("xx"));
    }

    /** Runs {@code SparkAppUsingFileSet.JavaCharCount} with partitioned dataset "pfs" as input and output. */
    @Test
    public void testSparkWithPartitionedFileSet() throws Exception {
        testSparkWithPartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    /** Runs {@code SparkAppUsingFileSet.ScalaCharCount} with partitioned dataset "pfs" as input and output. */
    @Test
    public void testSparkScalaWithPartitionedFileSet() throws Exception {
        testSparkWithPartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    /**
     * Deploys the application, adds an input partition x="nn" to dataset "pfs", runs the program with
     * an input filter of x in ["na", "nx") and an output partition key x="xx", then validates the files
     * of the output partition.
     *
     * @param cls  application class to deploy
     * @param cls2 program class to run
     */
    private void testSparkWithPartitionedFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms app = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        datasetCache.newTransactionContext();
        try {
            final PartitionedFileSet pfs = datasetCache.getDataset("pfs");
            final PartitionOutput inputPartition =
                pfs.getPartitionOutput(PartitionKey.builder().addStringField("x", "nn").build());
            prepareFileInput(inputPartition.getLocation());
            txExecutorFactory.createExecutor(datasetCache.getTransactionAwares()).execute(
                new TransactionExecutor.Subroutine() {
                    @Override
                    public void apply() throws Exception {
                        inputPartition.addPartition();
                    }
                });

            Map<String, String> inputArgs = new HashMap<>();
            PartitionedFileSetArguments.setInputPartitionFilter(
                inputArgs, PartitionFilter.builder().addRangeCondition("x", "na", "nx").build());
            Map<String, String> outputArgs = new HashMap<>();
            final PartitionKey outputKey = PartitionKey.builder().addStringField("x", "xx").build();
            PartitionedFileSetArguments.setOutputPartitionKey(outputArgs, outputKey);

            Map<String, String> runtimeArgs = new HashMap<>();
            runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "pfs", inputArgs));
            runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "pfs", outputArgs));
            runtimeArgs.put("input", "pfs");
            runtimeArgs.put("output", "pfs");
            runProgram(app, cls2, runtimeArgs);

            txExecutorFactory.createExecutor(datasetCache.getTransactionAwares()).execute(
                new TransactionExecutor.Subroutine() {
                    @Override
                    public void apply() throws Exception {
                        PartitionDetail outputPartition = pfs.getPartition(outputKey);
                        Assert.assertNotNull(outputPartition);
                        validateFileOutput(outputPartition.getLocation());
                    }
                });
        } finally {
            // Dismiss the context even when an assertion fails, so that a failing test does not leave
            // it behind in the dataset cache that every test shares.
            datasetCache.dismissTransactionContext();
        }
    }

    /** Runs {@code SparkAppUsingFileSet.JavaCharCount} with time-partitioned dataset "tpfs" as input and output. */
    @Test
    public void testSparkWithTimePartitionedFileSet() throws Exception {
        testSparkWithTimePartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    /** Runs {@code SparkAppUsingFileSet.ScalaCharCount} with time-partitioned dataset "tpfs" as input and output. */
    @Test
    public void testSparkScalaWithTimePartitionedFileSet() throws Exception {
        testSparkWithTimePartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    /**
     * Deploys the application, adds two input partitions to dataset "tpfs" (one at the current time,
     * one at a fixed time), runs the program and validates two output partitions: the one selected
     * through the scoped dataset arguments and the one selected through the "outputKey" argument.
     *
     * @param cls  application class to deploy
     * @param cls2 program class to run
     */
    private void testSparkWithTimePartitionedFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        // Fixed partition times that are handed to the program as "inputKey" and "outputKey".
        final long customInputTime = 987654321L;
        final long customOutputTime = 123456789L;

        ApplicationWithPrograms app = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        datasetCache.newTransactionContext();
        try {
            final TimePartitionedFileSet tpfs = datasetCache.getDataset("tpfs");
            long inputTime = System.currentTimeMillis();
            final long outputTime = inputTime + TimeUnit.HOURS.toMillis(1L);
            addTimePartition(tpfs, inputTime);
            addTimePartition(tpfs, customInputTime);

            Map<String, String> inputArgs = new HashMap<>();
            TimePartitionedFileSetArguments.setInputStartTime(inputArgs, inputTime - 100);
            TimePartitionedFileSetArguments.setInputEndTime(inputArgs, inputTime + 100);
            Map<String, String> outputArgs = new HashMap<>();
            TimePartitionedFileSetArguments.setOutputPartitionTime(outputArgs, outputTime);

            Map<String, String> runtimeArgs = new HashMap<>();
            runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "tpfs", inputArgs));
            runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "tpfs", outputArgs));
            runtimeArgs.put("input", "tpfs");
            runtimeArgs.put("output", "tpfs");
            runtimeArgs.put("outputKey", String.valueOf(customOutputTime));
            runtimeArgs.put("inputKey", String.valueOf(customInputTime));
            runProgram(app, cls2, runtimeArgs);

            txExecutorFactory.createExecutor(datasetCache.getTransactionAwares()).execute(
                new TransactionExecutor.Subroutine() {
                    @Override
                    public void apply() throws Exception {
                        TimePartitionDetail byArguments = tpfs.getPartitionByTime(outputTime);
                        Assert.assertNotNull(
                            "Output partition is null while for running without custom dataset arguments", byArguments);
                        validateFileOutput(byArguments.getLocation());

                        TimePartitionDetail byCustomKey = tpfs.getPartitionByTime(customOutputTime);
                        Assert.assertNotNull(
                            "Output partition is null while for running with custom dataset arguments", byCustomKey);
                        validateFileOutput(byCustomKey.getLocation());
                    }
                });
        } finally {
            // Dismiss the context even when an assertion fails, so that a failing test does not leave
            // it behind in the dataset cache that every test shares.
            datasetCache.dismissTransactionContext();
        }
    }

    /**
     * Writes the sample input into the partition output obtained for time {@code j} and then registers
     * that partition inside a transaction.
     *
     * @param timePartitionedFileSet dataset that receives the partition
     * @param j                      partition time passed to {@code getPartitionOutput}
     */
    private void addTimePartition(TimePartitionedFileSet timePartitionedFileSet, long j) throws IOException, TransactionFailureException, InterruptedException {
        final TimePartitionOutput output = timePartitionedFileSet.getPartitionOutput(j);
        prepareFileInput(output.getLocation());

        TransactionExecutor executor = txExecutorFactory.createExecutor(datasetCache.getTransactionAwares());
        executor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                output.addPartition();
            }
        });
    }

    /** Runs {@code SparkAppUsingFileSet.JavaCharCount} with the custom dataset "myfs" as input and output. */
    @Test
    public void testSparkWithCustomFileSet() throws Exception {
        testSparkWithCustomFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    /** Runs {@code SparkAppUsingFileSet.ScalaCharCount} with the custom dataset "myfs" as input and output. */
    @Test
    public void testSparkScalaWithCustomFileSet() throws Exception {
        testSparkWithCustomFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    /**
     * Runs the program twice against dataset "myfs". After the first run the output is validated and
     * only the success marker may exist. The second run is submitted with
     * {@link FileAlreadyExistsException} as the expected error, and afterwards only the failure marker
     * may exist.
     *
     * @param cls  application class to deploy
     * @param cls2 program class to run
     */
    private void testSparkWithCustomFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms app = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        SparkAppUsingFileSet.MyFileSet myFileSet = datasetCache.getDataset("myfs");
        FileSet embedded = myFileSet.getEmbeddedFileSet();
        prepareFileInput(embedded.getLocation("nn"));

        Map<String, String> inputArgs = new HashMap<>();
        FileSetArguments.setInputPath(inputArgs, "nn");
        // The output path goes into the output arguments. It used to be written into the input map,
        // which left this map empty; the merged runtime arguments are the same either way because
        // both maps are scoped to "myfs".
        Map<String, String> outputArgs = new HashMap<>();
        FileSetArguments.setOutputPath(outputArgs, "xx");

        Map<String, String> runtimeArgs = new HashMap<>();
        runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "myfs", inputArgs));
        runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "myfs", outputArgs));
        runtimeArgs.put("input", "myfs");
        runtimeArgs.put("output", "myfs");

        // First run: output is written and only the success marker exists.
        runProgram(app, cls2, runtimeArgs);
        validateFileOutput(embedded.getLocation("xx"));
        Assert.assertTrue(myFileSet.getSuccessLocation().exists());
        Assert.assertFalse(myFileSet.getFailureLocation().exists());
        myFileSet.getSuccessLocation().delete();

        // Second run with identical arguments: only the failure marker may exist afterwards.
        expectProgramError(app, cls2, runtimeArgs, FileAlreadyExistsException.class);
        Assert.assertFalse(myFileSet.getSuccessLocation().exists());
        Assert.assertTrue(myFileSet.getFailureLocation().exists());
        // Remove the marker that actually exists. Deleting the success location here did nothing,
        // because it was asserted absent two lines above, and it left the failure marker behind.
        myFileSet.getFailureLocation().delete();
    }

    /** Runs {@code SparkAppUsingGetDataset.JavaSparkLogParser} on the sample log data. */
    @Test
    public void testJavaSparkWithGetDataset() throws Exception {
        testSparkWithGetDataset(SparkAppUsingGetDataset.class, SparkAppUsingGetDataset.JavaSparkLogParser.class);
    }

    /** Runs {@code SparkAppUsingGetDataset.ScalaSparkLogParser} on the sample log data. */
    @Test
    public void testScalaSparkWithGetDataset() throws Exception {
        testSparkWithGetDataset(SparkAppUsingGetDataset.class, SparkAppUsingGetDataset.ScalaSparkLogParser.class);
    }

    /**
     * Deploys the application, writes the sample log lines to path "nn" of dataset "logs", runs the
     * program with "logs" as input and "logStats" as output, and validates the "logStats" table.
     *
     * @param cls  application class to deploy
     * @param cls2 program class to run
     */
    private void testSparkWithGetDataset(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms app = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);

        // getDataset() is generic in its return type, so the result needs a FileSet target before
        // getLocation() can be called on it; chaining the call directly does not resolve.
        FileSet logs = datasetCache.getDataset("logs");
        prepareInputFileSetWithLogData(logs.getLocation("nn"));

        Map<String, String> inputArgs = new HashMap<>();
        FileSetArguments.setInputPath(inputArgs, "nn");

        Map<String, String> runtimeArgs = new HashMap<>();
        runtimeArgs.putAll(RuntimeArguments.addScope(Scope.DATASET, "logs", inputArgs));
        runtimeArgs.put("input", "logs");
        runtimeArgs.put("output", "logStats");
        runProgram(app, cls2, runtimeArgs);

        KeyValueTable logStats = datasetCache.getDataset("logStats");
        validateGetDatasetOutput(logStats);
    }

    /** Runs {@code SparkAppUsingLocalFiles.JavaSparkUsingLocalFiles} with a "java"-prefixed properties file. */
    @Test
    public void testJavaSparkWithLocalFiles() throws Exception {
        testSparkWithLocalFiles(SparkAppUsingLocalFiles.class, SparkAppUsingLocalFiles.JavaSparkUsingLocalFiles.class, "java");
    }

    /** Runs {@code SparkAppUsingLocalFiles.ScalaSparkUsingLocalFiles} with a "scala"-prefixed properties file. */
    @Test
    public void testScalaSparkWithLocalFiles() throws Exception {
        testSparkWithLocalFiles(SparkAppUsingLocalFiles.class, SparkAppUsingLocalFiles.ScalaSparkUsingLocalFiles.class, "scala");
    }

    /**
     * Deploys the application, runs the program with the URI of a generated properties file as the
     * "local.file" argument, and checks that table "output" holds exactly a=1, b=2 and c=3.
     *
     * @param cls  application class to deploy
     * @param cls2 program class to run
     * @param str  prefix for the name of the generated properties file
     */
    private void testSparkWithLocalFiles(Class<?> cls, Class<?> cls2, String str) throws Exception {
        ApplicationWithPrograms app = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        URI localFile = createLocalPropertiesFile(str);
        runProgram(app, cls2, ImmutableMap.of("local.file", localFile.toString()));

        KeyValueTable output = datasetCache.getDataset("output");
        Map<String, String> expected = ImmutableMap.of("a", "1", "b", "2", "c", "3");
        CloseableIterator<KeyValue<byte[], byte[]>> scan = output.scan((byte[]) null, (byte[]) null);
        try {
            for (int i = 0; i < expected.size(); i++) {
                // Same guard as validateGetDatasetOutput(): fail with a message instead of letting
                // next() throw when rows are missing.
                Assert.assertTrue("Expected next for i = " + i, scan.hasNext());
                KeyValue<byte[], byte[]> row = scan.next();
                Assert.assertEquals(expected.get(Bytes.toString(row.getKey())), Bytes.toString(row.getValue()));
            }
            Assert.assertFalse(scan.hasNext());
        } finally {
            // Covers the final assertion too, which used to sit outside the guarded region.
            scan.close();
        }
    }

    /**
     * Creates {@code <str>-local.properties} in {@link #TEMP_FOLDER} with three entries (a, b and c,
     * written with different spacing around '=') and returns the URI of the file.
     *
     * @param str prefix for the file name
     * @return URI of the file that was written
     * @throws IOException if the file cannot be created or written
     */
    private URI createLocalPropertiesFile(String str) throws IOException {
        File propertiesFile = TEMP_FOLDER.newFile(str + "-local.properties");
        // try-with-resources closes the writer and then the stream, and records a failure of close()
        // as a suppressed exception instead of letting it replace the original one.
        try (FileOutputStream out = new FileOutputStream(propertiesFile);
             OutputStreamWriter writer = new OutputStreamWriter(out)) {
            writer.write("a=1\n");
            writer.write("b = 2\n");
            writer.write("c= 3");
        }
        return propertiesFile.toURI();
    }

    /**
     * Scans the whole table and checks that it holds exactly the four expected entries. Keys and values
     * are decoded from JSON into {@code LogKey} and {@code LogStats} before they are compared.
     *
     * @param keyValueTable table to validate
     */
    private void validateGetDatasetOutput(KeyValueTable keyValueTable) {
        String request = "GET http://bar.com/image.jpg HTTP/1.1";
        Map<SparkAppUsingGetDataset.LogKey, SparkAppUsingGetDataset.LogStats> expected = ImmutableMap.of(
            new SparkAppUsingGetDataset.LogKey("10.10.10.10", "FRED", request, 200), new SparkAppUsingGetDataset.LogStats(2, 100),
            new SparkAppUsingGetDataset.LogKey("10.10.10.10", "FRED", request, 404), new SparkAppUsingGetDataset.LogStats(1, 50),
            new SparkAppUsingGetDataset.LogKey("20.20.20.20", "BRAD", request, 200), new SparkAppUsingGetDataset.LogStats(1, 50),
            new SparkAppUsingGetDataset.LogKey("20.20.20.20", "BRAD", request, 404), new SparkAppUsingGetDataset.LogStats(1, 50));

        CloseableIterator<KeyValue<byte[], byte[]>> scan = keyValueTable.scan((byte[]) null, (byte[]) null);
        try {
            for (int i = 0; i < expected.size(); i++) {
                Assert.assertTrue("Expected next for i = " + i, scan.hasNext());
                KeyValue<byte[], byte[]> row = scan.next();
                SparkAppUsingGetDataset.LogKey key =
                    GSON.fromJson(Bytes.toString(row.getKey()), SparkAppUsingGetDataset.LogKey.class);
                SparkAppUsingGetDataset.LogStats stats =
                    GSON.fromJson(Bytes.toString(row.getValue()), SparkAppUsingGetDataset.LogStats.class);
                Assert.assertEquals(expected.get(key), stats);
            }
            Assert.assertFalse(scan.hasNext());
        } finally {
            // Covers the final assertion too, which used to sit outside the guarded region.
            scan.close();
        }
    }

    /**
     * Writes five sample access-log lines to the given location. The lines differ only in client
     * address, user and status code: FRED/200 twice, FRED/404 once, BRAD/200 once and BRAD/404 once.
     *
     * @param location location whose output stream receives the log lines
     * @throws IOException if writing fails
     */
    private void prepareInputFileSetWithLogData(Location location) throws IOException {
        // try-with-resources replaces the hand-expanded close logic and its dead branches.
        try (OutputStreamWriter writer = new OutputStreamWriter(location.getOutputStream())) {
            writer.write("10.10.10.10 - FRED [18/Jan/2013:17:56:07 +1100] \"GET http://bar.com/image.jpg HTTP/1.1\" 200 50 \"http://foo.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR 3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" 62.24.11.25 images.com 1358492167 - Whatup\n");
            writer.write("20.20.20.20 - BRAD [18/Jan/2013:17:56:07 +1100] \"GET http://bar.com/image.jpg HTTP/1.1\" 200 50 \"http://foo.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR 3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" 62.24.11.25 images.com 1358492167 - Whatup\n");
            writer.write("10.10.10.10 - FRED [18/Jan/2013:17:56:07 +1100] \"GET http://bar.com/image.jpg HTTP/1.1\" 404 50 \"http://foo.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR 3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" 62.24.11.25 images.com 1358492167 - Whatup\n");
            writer.write("10.10.10.10 - FRED [18/Jan/2013:17:56:07 +1100] \"GET http://bar.com/image.jpg HTTP/1.1\" 200 50 \"http://foo.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR 3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" 62.24.11.25 images.com 1358492167 - Whatup\n");
            writer.write("20.20.20.20 - BRAD [18/Jan/2013:17:56:07 +1100] \"GET http://bar.com/image.jpg HTTP/1.1\" 404 50 \"http://foo.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR 3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" 62.24.11.25 images.com 1358492167 - Whatup\n");
        }
    }

    /**
     * Asserts that {@code location} is a directory and that the first child whose name starts with
     * "part-r-" contains exactly the two expected count lines. Fails if there is no such child.
     * Only the first matching part file is inspected.
     *
     * @param location output directory to validate
     * @throws Exception if listing or reading the location fails
     */
    public void validateFileOutput(Location location) throws Exception {
        Assert.assertTrue(location.isDirectory());
        for (Location child : location.list()) {
            if (!child.getName().startsWith("part-r-")) {
                continue;
            }
            // try-with-resources replaces the hand-expanded close logic and its dead branches.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(child.getInputStream()))) {
                String first = reader.readLine();
                Assert.assertNotNull(first);
                Assert.assertEquals("13 characters:13", first);

                String second = reader.readLine();
                Assert.assertNotNull(second);
                Assert.assertEquals("7 chars:7", second);

                Assert.assertNull(reader.readLine());
                return;
            }
        }
        Assert.fail("Output directory does not contain any part file: " + location.list());
    }

    /**
     * Writes the two sample input lines ("13 characters" and "7 chars") to the given location.
     *
     * @param location location whose output stream receives the lines
     * @throws IOException if writing fails
     */
    private void prepareFileInput(Location location) throws IOException {
        // try-with-resources replaces the hand-expanded close logic and its dead branches.
        try (OutputStreamWriter writer = new OutputStreamWriter(location.getOutputStream())) {
            writer.write("13 characters\n");
            writer.write("7 chars\n");
        }
    }

    /**
     * Writes both sample strings into object store "keys" inside one transaction; each string is
     * stored under its own bytes as the key.
     */
    private void prepareInputData() throws TransactionFailureException, InterruptedException {
        TransactionExecutor executor = txExecutorFactory.createExecutor(datasetCache);
        executor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() {
                ObjectStore<String> keys = datasetCache.getDataset("keys");
                for (String sample : new String[] {testString1, testString2}) {
                    keys.write(Bytes.toBytes(sample), sample);
                }
            }
        });
    }

    /**
     * Verifies, inside a transaction, that table "count" holds an entry for each sample string and that
     * the stored integer equals the length of that string.
     */
    private void checkOutputData() throws TransactionFailureException, InterruptedException {
        final KeyValueTable counts = datasetCache.getDataset("count");
        txExecutorFactory.createExecutor(datasetCache.getTransactionAwares()).execute(
            new TransactionExecutor.Subroutine() {
                @Override
                public void apply() {
                    for (String sample : new String[] {testString1, testString2}) {
                        // Keep the value that was read so it can be decoded; the decompiled code
                        // threw it away and then referred to an undeclared register variable.
                        byte[] stored = counts.read(Bytes.toBytes(sample));
                        Assert.assertNotNull("No count stored for '" + sample + "'", stored);
                        Assert.assertEquals(sample.length(), Bytes.toInt(stored));
                    }
                }
            });
    }

    /** Runs the program without runtime arguments and waits until it has finished. */
    private void runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws Exception {
        runProgram(applicationWithPrograms, cls, RuntimeArguments.NO_ARGUMENTS);
    }

    /**
     * Submits the program with the given runtime arguments and waits until it has finished. The error
     * returned by {@code waitForCompletion} is discarded, so a failed run does not fail the caller here.
     *
     * @param applicationWithPrograms deployed application that contains the program
     * @param cls                     program class to run
     * @param map                     runtime arguments for the program
     */
    private void runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map) throws Exception {
        ProgramController controller = submit(applicationWithPrograms, cls, map);
        waitForCompletion(controller);
    }

    /**
     * Runs the program for a scenario in which the caller expects it to fail.
     *
     * NOTE(review): {@code cls2}, the expected error type, is not checked, and the error returned by
     * {@code waitForCompletion} is discarded. Callers have to verify the failure by other means.
     * Confirm that errors reach the controller listener before turning this into an assertion.
     *
     * @param applicationWithPrograms deployed application that contains the program
     * @param cls                     program class to run
     * @param map                     runtime arguments for the program
     * @param cls2                    error type the caller expects (currently unused)
     */
    private void expectProgramError(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map, Class<? extends Throwable> cls2) throws Exception {
        ProgramController controller = submit(applicationWithPrograms, cls, map);
        waitForCompletion(controller);
    }

    /**
     * Blocks until the controller reports completion or an error, for at most 10 minutes, and fails
     * the test if neither arrives in that time.
     *
     * @param programController controller of the submitted program
     * @return the error passed to the listener, or {@code null} if the program completed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private Throwable waitForCompletion(ProgramController programController) throws InterruptedException {
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final CountDownLatch finished = new CountDownLatch(1);
        programController.addListener(new AbstractListener() {
            @Override
            public void completed() {
                finished.countDown();
            }

            @Override
            public void error(Throwable th) {
                // Store the error before releasing the latch; in the opposite order the waiting
                // thread could wake up and read the reference while it is still null.
                failure.set(th);
                finished.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);

        // await() returns false on timeout. Ignoring that made a hung program look like a clean run.
        boolean done = finished.await(10L, TimeUnit.MINUTES);
        Assert.assertTrue("Program did not complete or fail within 10 minutes", done);
        return failure.get();
    }

    /**
     * Looks up the program for {@code cls} in the deployed application and starts it through the
     * runner that matches its type. A newly generated run id is passed as the "runId" system argument
     * and {@code map} as the user arguments.
     *
     * @param applicationWithPrograms deployed application that contains the program
     * @param cls                     program class to run
     * @param map                     user runtime arguments
     * @return controller of the started program
     * @throws ClassNotFoundException if a program's main class cannot be loaded during the lookup
     */
    private ProgramController submit(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map) throws ClassNotFoundException {
        ProgramRunnerFactory runnerFactory = injector.getInstance(ProgramRunnerFactory.class);
        Program program = getProgram(applicationWithPrograms, cls);
        Assert.assertNotNull(program);

        ProgramRunnerFactory.Type runnerType = ProgramRunnerFactory.Type.valueOf(program.getType().name());
        BasicArguments systemArgs = new BasicArguments(ImmutableMap.of("runId", RunIds.generate().getId()));
        BasicArguments userArgs = new BasicArguments(map);
        SimpleProgramOptions options = new SimpleProgramOptions(program.getName(), systemArgs, userArgs);
        return runnerFactory.create(runnerType).run(program, options);
    }

    /**
     * Finds the program whose main class has the same canonical name as {@code cls}.
     *
     * @param applicationWithPrograms deployed application to search
     * @param cls                     program class to look for
     * @return the matching program, or {@code null} if the application has none
     * @throws ClassNotFoundException if a program's main class cannot be loaded
     */
    private Program getProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws ClassNotFoundException {
        Program match = null;
        for (Program candidate : applicationWithPrograms.getPrograms()) {
            String wanted = cls.getCanonicalName();
            String actual = candidate.getMainClass().getCanonicalName();
            if (wanted.equals(actual)) {
                match = candidate;
                break;
            }
        }
        return match;
    }
}
