package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DatasetSpecification;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.metrics.MetricsCollector;
import co.cask.cdap.data.Namespace;
import co.cask.cdap.data.dataset.DatasetInstantiator;
import co.cask.cdap.data2.datafabric.DefaultDatasetNamespace;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.NamespacedDatasetFramework;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.spark.ScalaSparkAppUsingObjectStore;
import co.cask.cdap.internal.app.runtime.spark.SparkAppUsingObjectStore;
import co.cask.cdap.test.XSlowTests;
import co.cask.cdap.test.internal.AppFabricTestHelper;
import co.cask.cdap.test.internal.TempFolder;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionManager;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkProgramRunnerTest.class */
public class SparkProgramRunnerTest {
    private static Injector injector;
    private static TransactionExecutorFactory txExecutorFactory;
    private static TransactionManager txService;
    private static DatasetFramework dsFramework;
    private static DatasetInstantiator datasetInstantiator;
    final String testString1 = "persisted data";
    final String testString2 = "distributed systems";
    private static final TempFolder TEMP_FOLDER = new TempFolder();

    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    private static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.1
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public File m18get() {
            try {
                return SparkProgramRunnerTest.tmpFolder.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    @BeforeClass
    public static void beforeClass() {
        CConfiguration create = CConfiguration.create();
        create.set("local.data.dir", TEMP_FOLDER.newFolder("data").getAbsolutePath());
        create.setInt("data.tx.timeout", 1);
        create.setInt("data.tx.cleanup.interval", 2);
        injector = AppFabricTestHelper.getInjector(create);
        txService = (TransactionManager) injector.getInstance(TransactionManager.class);
        txExecutorFactory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
        dsFramework = new NamespacedDatasetFramework((DatasetFramework) injector.getInstance(DatasetFramework.class), new DefaultDatasetNamespace(create, Namespace.USER));
        datasetInstantiator = new DatasetInstantiator((DatasetFramework) injector.getInstance(DatasetFramework.class), (CConfiguration) injector.getInstance(CConfiguration.class), SparkProgramRunnerTest.class.getClassLoader(), (MetricsCollector) null, (MetricsCollector) null);
        txService.startAndWait();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        txService.stopAndWait();
    }

    @After
    public void after() throws Exception {
        Iterator it = dsFramework.getInstances().iterator();
        while (it.hasNext()) {
            dsFramework.deleteInstance(((DatasetSpecification) it.next()).getName());
        }
    }

    @Test
    public void testSparkWithObjectStore() throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(SparkAppUsingObjectStore.class, TEMP_FOLDER_SUPPLIER);
        prepareInputData();
        runProgram(deployApplicationWithManager, SparkAppUsingObjectStore.CharCountSpecification.class);
        checkOutputData();
    }

    @Test
    public void testScalaSparkWithObjectStore() throws Exception {
        ApplicationWithPrograms deployApplicationWithManager = AppFabricTestHelper.deployApplicationWithManager(ScalaSparkAppUsingObjectStore.class, TEMP_FOLDER_SUPPLIER);
        prepareInputData();
        runProgram(deployApplicationWithManager, ScalaSparkAppUsingObjectStore.CharCountSpecification.class);
        checkOutputData();
    }

    private void prepareInputData() throws TransactionFailureException, InterruptedException {
        final ObjectStore dataset = datasetInstantiator.getDataset("keys");
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.2
            public void apply() {
                dataset.write(Bytes.toBytes("persisted data"), "persisted data");
                dataset.write(Bytes.toBytes("distributed systems"), "distributed systems");
            }
        });
    }

    private void checkOutputData() throws TransactionFailureException, InterruptedException {
        final KeyValueTable dataset = datasetInstantiator.getDataset("count");
        txExecutorFactory.createExecutor(datasetInstantiator.getTransactionAware()).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.3
            public void apply() {
                Assert.assertTrue(dataset.read(Bytes.toBytes("persisted data")) != null);
                Assert.assertEquals(Bytes.toInt(r0), "persisted data".length());
                Assert.assertTrue(dataset.read(Bytes.toBytes("distributed systems")) != null);
                Assert.assertEquals(Bytes.toInt(r0), "distributed systems".length());
            }
        });
    }

    private void runProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws Exception {
        waitForCompletion(submit(applicationWithPrograms, cls));
    }

    private void waitForCompletion(ProgramController programController) throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        programController.addListener(new AbstractListener() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkProgramRunnerTest.4
            public void init(ProgramController.State state) {
                if (state == ProgramController.State.STOPPED || state == ProgramController.State.ERROR) {
                    countDownLatch.countDown();
                }
            }

            public void stopped() {
                countDownLatch.countDown();
            }

            public void error(Throwable th) {
                countDownLatch.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        countDownLatch.await(10L, TimeUnit.MINUTES);
    }

    private ProgramController submit(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws ClassNotFoundException {
        ProgramRunnerFactory programRunnerFactory = (ProgramRunnerFactory) injector.getInstance(ProgramRunnerFactory.class);
        Program program = getProgram(applicationWithPrograms, cls);
        return programRunnerFactory.create(ProgramRunnerFactory.Type.valueOf(program.getType().name())).run(program, new SimpleProgramOptions(program.getName(), new BasicArguments(), new BasicArguments(Maps.newHashMap())));
    }

    private Program getProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws ClassNotFoundException {
        for (Program program : applicationWithPrograms.getPrograms()) {
            if (cls.getCanonicalName().equals(program.getMainClass().getCanonicalName())) {
                return program;
            }
        }
        return null;
    }
}
