package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.common.Scope;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionOutput;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.TimePartitionDetail;
import co.cask.cdap.api.dataset.lib.TimePartitionOutput;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.dataset.DatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.internal.TempFolder;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.batch.AppWithTimePartitionedFileSet;
import co.cask.cdap.internal.app.runtime.spark.ScalaSparkAppUsingObjectStore;
import co.cask.cdap.internal.app.runtime.spark.SparkAppUsingFileSet;
import co.cask.cdap.internal.app.runtime.spark.SparkAppUsingObjectStore;
import co.cask.cdap.proto.DatasetSpecificationSummary;
import co.cask.cdap.proto.Id;
import co.cask.cdap.test.XSlowTests;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionManager;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkProgramRunnerTest.class */
public class SparkProgramRunnerTest {
    private static Injector injector;
    private static TransactionExecutorFactory txExecutorFactory;
    private static TransactionManager txService;
    private static DatasetFramework dsFramework;
    private static DatasetInstantiator datasetInstantiator;
    private static MetricStore metricStore;
    final String testString1 = "persisted data";
    final String testString2 = "distributed systems";
    private static final TempFolder TEMP_FOLDER = new TempFolder();

    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    private static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.1
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public File m55get() {
            try {
                return SparkProgramRunnerTest.tmpFolder.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    @BeforeClass
    public static void beforeClass() {
        CConfiguration create = CConfiguration.create();
        create.set("local.data.dir", TEMP_FOLDER.newFolder("data").getAbsolutePath());
        create.setInt("data.tx.timeout", 1);
        create.setInt("data.tx.cleanup.interval", 2);
        injector = AppFabricTestHelper.getInjector(create);
        txService = (TransactionManager) injector.getInstance(TransactionManager.class);
        txExecutorFactory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
        dsFramework = (DatasetFramework) injector.getInstance(DatasetFramework.class);
        datasetInstantiator = new DatasetInstantiator(DefaultId.NAMESPACE, dsFramework, SparkProgramRunnerTest.class.getClassLoader(), (Iterable) null, (MetricsContext) null);
        metricStore = (MetricStore) injector.getInstance(MetricStore.class);
        txService.startAndWait();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        txService.stopAndWait();
    }

    @After
    public void after() throws Exception {
        Iterator it = dsFramework.getInstances(DefaultId.NAMESPACE).iterator();
        while (it.hasNext()) {
            dsFramework.deleteInstance(Id.DatasetInstance.from(DefaultId.NAMESPACE, ((DatasetSpecificationSummary) it.next()).getName()));
        }
    }

    @Test
    public void testSparkWithObjectStore() throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(SparkAppUsingObjectStore.class, TEMP_FOLDER_SUPPLIER);
        prepareInputData();
        runProgram(deployApplicationWithManager, SparkAppUsingObjectStore.CharCountSpecification.class);
        checkOutputData();
        Collection query = metricStore.query(new MetricDataQuery(0L, System.currentTimeMillis() / 1000, 60, "system.dataset.store.ops", AggregationFunction.SUM, ImmutableMap.of("ns", DefaultId.NAMESPACE.getId(), "app", SparkAppUsingObjectStore.APP_NAME, "sp", SparkAppUsingObjectStore.SPARK_NAME, "ds", "totals"), Collections.emptyList()));
        Assert.assertEquals(1L, query.size());
        MetricTimeSeries metricTimeSeries = (MetricTimeSeries) query.iterator().next();
        Assert.assertEquals(1L, metricTimeSeries.getTimeValues().size());
        Assert.assertEquals(1L, ((TimeValue) metricTimeSeries.getTimeValues().get(0)).getValue());
    }

    @Test
    public void testScalaSparkWithObjectStore() throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(ScalaSparkAppUsingObjectStore.class, TEMP_FOLDER_SUPPLIER);
        prepareInputData();
        runProgram(deployApplicationWithManager, ScalaSparkAppUsingObjectStore.CharCountSpecification.class);
        checkOutputData();
    }

    @Test
    public void testSparkWithFileSet() throws Exception {
        testSparkWithFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    @Test
    public void testSparkScalaWithFileSet() throws Exception {
        testSparkWithFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    private void testSparkWithFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        FileSet dataset = datasetInstantiator.getDataset("fs");
        prepareFileInput(dataset.getLocation("nn"));
        HashMap hashMap = new HashMap();
        FileSetArguments.setInputPath(hashMap, "nn");
        HashMap hashMap2 = new HashMap();
        FileSetArguments.setOutputPath(hashMap, "xx");
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "fs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "fs", hashMap2));
        hashMap3.put(AppWithTimePartitionedFileSet.INPUT, "fs");
        hashMap3.put(AppWithTimePartitionedFileSet.OUTPUT, "fs");
        runProgram(deployApplicationWithManager, cls2, hashMap3);
        validateFileOutput(dataset.getLocation("xx"));
    }

    @Test
    public void testSparkWithPartitionedFileSet() throws Exception {
        testSparkWithPartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    @Test
    public void testSparkScalaWithPartitionedFileSet() throws Exception {
        testSparkWithPartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    private void testSparkWithPartitionedFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        final PartitionedFileSet dataset = datasetInstantiator.getDataset("pfs");
        final PartitionOutput partitionOutput = dataset.getPartitionOutput(PartitionKey.builder().addStringField("x", "nn").build());
        prepareFileInput(partitionOutput.getLocation());
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.2
            public void apply() throws Exception {
                partitionOutput.addPartition();
            }
        });
        HashMap hashMap = new HashMap();
        PartitionedFileSetArguments.setInputPartitionFilter(hashMap, PartitionFilter.builder().addRangeCondition("x", "na", "nx").build());
        HashMap hashMap2 = new HashMap();
        final PartitionKey build = PartitionKey.builder().addStringField("x", "xx").build();
        PartitionedFileSetArguments.setOutputPartitionKey(hashMap2, build);
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "pfs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "pfs", hashMap2));
        hashMap3.put(AppWithTimePartitionedFileSet.INPUT, "pfs");
        hashMap3.put(AppWithTimePartitionedFileSet.OUTPUT, "pfs");
        runProgram(deployApplicationWithManager, cls2, hashMap3);
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.3
            public void apply() throws Exception {
                PartitionDetail partition = dataset.getPartition(build);
                Assert.assertNotNull(partition);
                SparkProgramRunnerTest.this.validateFileOutput(partition.getLocation());
            }
        });
    }

    @Test
    public void testSparkWithTimePartitionedFileSet() throws Exception {
        testSparkWithTimePartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    @Test
    public void testSparkScalaWithTimePartitionedFileSet() throws Exception {
        testSparkWithTimePartitionedFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    private void testSparkWithTimePartitionedFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        final TimePartitionedFileSet timePartitionedFileSet = (TimePartitionedFileSet) datasetInstantiator.getDataset("tpfs");
        long currentTimeMillis = System.currentTimeMillis();
        final long millis = currentTimeMillis + TimeUnit.HOURS.toMillis(1L);
        addTimePartition(timePartitionedFileSet, currentTimeMillis);
        addTimePartition(timePartitionedFileSet, 987654321L);
        HashMap hashMap = new HashMap();
        TimePartitionedFileSetArguments.setInputStartTime(hashMap, currentTimeMillis - 100);
        TimePartitionedFileSetArguments.setInputEndTime(hashMap, currentTimeMillis + 100);
        HashMap hashMap2 = new HashMap();
        TimePartitionedFileSetArguments.setOutputPartitionTime(hashMap2, millis);
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "tpfs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "tpfs", hashMap2));
        hashMap3.put(AppWithTimePartitionedFileSet.INPUT, "tpfs");
        hashMap3.put(AppWithTimePartitionedFileSet.OUTPUT, "tpfs");
        hashMap3.put("outputKey", String.valueOf(123456789L));
        hashMap3.put("inputKey", String.valueOf(987654321L));
        runProgram(deployApplicationWithManager, cls2, hashMap3);
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.4
            public void apply() throws Exception {
                TimePartitionDetail partitionByTime = timePartitionedFileSet.getPartitionByTime(millis);
                Assert.assertNotNull("Output partition is null while for running without custom dataset arguments", partitionByTime);
                SparkProgramRunnerTest.this.validateFileOutput(partitionByTime.getLocation());
                TimePartitionDetail partitionByTime2 = timePartitionedFileSet.getPartitionByTime(123456789L);
                Assert.assertNotNull("Output partition is null while for running with custom dataset arguments", partitionByTime2);
                SparkProgramRunnerTest.this.validateFileOutput(partitionByTime2.getLocation());
            }
        });
    }

    private void addTimePartition(TimePartitionedFileSet timePartitionedFileSet, long j) throws IOException, TransactionFailureException, InterruptedException {
        final TimePartitionOutput partitionOutput = timePartitionedFileSet.getPartitionOutput(j);
        prepareFileInput(partitionOutput.getLocation());
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.5
            public void apply() throws Exception {
                partitionOutput.addPartition();
            }
        });
    }

    @Test
    public void testSparkWithCustomFileSet() throws Exception {
        testSparkWithCustomFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.JavaCharCount.class);
    }

    @Test
    public void testSparkScalaWithCustomFileSet() throws Exception {
        testSparkWithCustomFileSet(SparkAppUsingFileSet.class, SparkAppUsingFileSet.ScalaCharCount.class);
    }

    private void testSparkWithCustomFileSet(Class<?> cls, Class<?> cls2) throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(cls, TEMP_FOLDER_SUPPLIER);
        SparkAppUsingFileSet.MyFileSet dataset = datasetInstantiator.getDataset("myfs");
        FileSet embeddedFileSet = dataset.getEmbeddedFileSet();
        prepareFileInput(embeddedFileSet.getLocation("nn"));
        HashMap hashMap = new HashMap();
        FileSetArguments.setInputPath(hashMap, "nn");
        HashMap hashMap2 = new HashMap();
        FileSetArguments.setOutputPath(hashMap, "xx");
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "myfs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "myfs", hashMap2));
        hashMap3.put(AppWithTimePartitionedFileSet.INPUT, "myfs");
        hashMap3.put(AppWithTimePartitionedFileSet.OUTPUT, "myfs");
        runProgram(deployApplicationWithManager, cls2, hashMap3);
        validateFileOutput(embeddedFileSet.getLocation("xx"));
        Assert.assertTrue(dataset.getSuccessLocation().exists());
        Assert.assertFalse(dataset.getFailureLocation().exists());
        dataset.getSuccessLocation().delete();
        expectProgramError(deployApplicationWithManager, cls2, hashMap3, FileAlreadyExistsException.class);
        Assert.assertFalse(dataset.getSuccessLocation().exists());
        Assert.assertTrue(dataset.getFailureLocation().exists());
        dataset.getSuccessLocation().delete();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void validateFileOutput(Location location) throws Exception {
        Assert.assertTrue(location.isDirectory());
        for (Location location2 : location.list()) {
            if (location2.getName().startsWith("part-r-")) {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(location2.getInputStream()));
                Throwable th = null;
                try {
                    try {
                        String readLine = bufferedReader.readLine();
                        Assert.assertNotNull(readLine);
                        Assert.assertEquals("13 characters:13", readLine);
                        String readLine2 = bufferedReader.readLine();
                        Assert.assertNotNull(readLine2);
                        Assert.assertEquals("7 chars:7", readLine2);
                        Assert.assertNull(bufferedReader.readLine());
                        if (bufferedReader != null) {
                            if (0 == 0) {
                                bufferedReader.close();
                                return;
                            }
                            try {
                                bufferedReader.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (bufferedReader != null) {
                        if (th != null) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    throw th4;
                }
            }
        }
        Assert.fail("Output directory does not contain any part file: " + location.list());
    }

    private void prepareFileInput(Location location) throws IOException {
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(location.getOutputStream());
        Throwable th = null;
        try {
            try {
                outputStreamWriter.write("13 characters\n");
                outputStreamWriter.write("7 chars\n");
                if (outputStreamWriter != null) {
                    if (0 == 0) {
                        outputStreamWriter.close();
                        return;
                    }
                    try {
                        outputStreamWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (outputStreamWriter != null) {
                if (th != null) {
                    try {
                        outputStreamWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    outputStreamWriter.close();
                }
            }
            throw th4;
        }
    }

    private void prepareInputData() throws TransactionFailureException, InterruptedException {
        final ObjectStore dataset = datasetInstantiator.getDataset("keys");
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.6
            public void apply() {
                dataset.write(Bytes.toBytes("persisted data"), "persisted data");
                dataset.write(Bytes.toBytes("distributed systems"), "distributed systems");
            }
        });
    }

    private void checkOutputData() throws TransactionFailureException, InterruptedException {
        final KeyValueTable dataset = datasetInstantiator.getDataset("count");
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.7
            public void apply() {
                Assert.assertTrue(dataset.read(Bytes.toBytes("persisted data")) != null);
                Assert.assertEquals(Bytes.toInt(r0), "persisted data".length());
                Assert.assertTrue(dataset.read(Bytes.toBytes("distributed systems")) != null);
                Assert.assertEquals(Bytes.toInt(r0), "distributed systems".length());
            }
        });
    }

    private void runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws Exception {
        runProgram(applicationWithPrograms, cls, RuntimeArguments.NO_ARGUMENTS);
    }

    private void runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map) throws Exception {
        waitForCompletion(submit(applicationWithPrograms, cls, map));
    }

    private void expectProgramError(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map, Class<? extends Throwable> cls2) throws Exception {
        runProgram(applicationWithPrograms, cls, map);
    }

    private Throwable waitForCompletion(ProgramController programController) throws InterruptedException {
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        programController.addListener(new AbstractListener() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.8
            public void completed() {
                countDownLatch.countDown();
            }

            public void error(Throwable th) {
                countDownLatch.countDown();
                atomicReference.set(th);
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        countDownLatch.await(10L, TimeUnit.MINUTES);
        return (Throwable) atomicReference.get();
    }

    private ProgramController submit(ApplicationWithPrograms applicationWithPrograms, Class<?> cls, Map<String, String> map) throws ClassNotFoundException {
        ProgramRunnerFactory programRunnerFactory = (ProgramRunnerFactory) injector.getInstance(ProgramRunnerFactory.class);
        Program program = getProgram(applicationWithPrograms, cls);
        Assert.assertNotNull(program);
        return programRunnerFactory.create(ProgramRunnerFactory.Type.valueOf(program.getType().name())).run(program, new SimpleProgramOptions(program.getName(), new BasicArguments(ImmutableMap.of("runId", RunIds.generate().getId())), new BasicArguments(map)));
    }

    private Program getProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws ClassNotFoundException {
        for (Program program : applicationWithPrograms.getPrograms()) {
            if (cls.getCanonicalName().equals(program.getMainClass().getCanonicalName())) {
                return program;
            }
        }
        return null;
    }
}
