package co.cask.cdap.spark;

import co.cask.cdap.api.app.Application;
import co.cask.cdap.api.artifact.ArtifactSummary;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.common.test.AppJarHelper;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.artifact.AppRequest;
import co.cask.cdap.proto.id.ArtifactId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.spark.app.CharCountProgram;
import co.cask.cdap.spark.app.ScalaCharCountProgram;
import co.cask.cdap.spark.app.ScalaCrossNSProgram;
import co.cask.cdap.spark.app.ScalaSparkServiceProgram;
import co.cask.cdap.spark.app.Spark2TestApp;
import co.cask.cdap.spark.app.SparkAppUsingLocalFiles;
import co.cask.cdap.spark.app.SparkAppUsingObjectStore;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.SparkManager;
import co.cask.cdap.test.StreamManager;
import co.cask.cdap.test.TestBaseWithSpark2;
import co.cask.cdap.test.TestConfiguration;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.twill.filesystem.LocalLocationFactory;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/spark/Spark2Test.class */
public class Spark2Test extends TestBaseWithSpark2 {
    /** First sample string; {@code prepareInputData} writes it as both key and value. */
    private static final String TEST_STRING_1 = "persisted data";
    /** Second sample string; {@code prepareInputData} writes it as both key and value. */
    private static final String TEST_STRING_2 = "distributed systems";

    /** Class-level test configuration that sets {@code explore.enabled} to {@code false}. */
    @ClassRule
    public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false);

    /**
     * Deployment jars keyed by application class. Populated once in {@code init()} and looked up
     * by {@code deploy(...)}. Keys are compared by identity, which is sufficient for class objects.
     */
    private static final Map<Class<? extends Application>, File> ARTIFACTS = new IdentityHashMap<>();

    /**
     * Builds the deployment jar of every application used by this test class, once, and
     * registers each jar in {@code ARTIFACTS} under its application class.
     *
     * @throws IOException if creating a jar fails
     */
    @BeforeClass
    public static void init() throws IOException {
        File objectStoreAppJar = createArtifactJar(SparkAppUsingObjectStore.class);
        ARTIFACTS.put(SparkAppUsingObjectStore.class, objectStoreAppJar);

        File localFilesAppJar = createArtifactJar(SparkAppUsingLocalFiles.class);
        ARTIFACTS.put(SparkAppUsingLocalFiles.class, localFilesAppJar);

        File spark2AppJar = createArtifactJar(Spark2TestApp.class);
        ARTIFACTS.put(Spark2TestApp.class, spark2AppJar);
    }

    /**
     * Deploys {@link Spark2TestApp}, starts the {@link ScalaSparkServiceProgram} Spark program and
     * checks that a GET on {@code sum?n=1&n=2...&n=10} of its service URL answers with HTTP 200
     * and the body {@code 55}.
     *
     * @throws Exception if deployment, service discovery or the HTTP call fails
     */
    @Test
    public void testSpark2Service() throws Exception {
        ApplicationManager appManager = deploy(NamespaceId.DEFAULT, Spark2TestApp.class);
        SparkManager sparkManager = appManager.getSparkManager(ScalaSparkServiceProgram.class.getSimpleName()).start();

        URL serviceURL = sparkManager.getServiceURL(5L, TimeUnit.MINUTES);
        Assert.assertNotNull(serviceURL);

        String sumPath = "sum?n=" + Joiner.on("&n=").join(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        HttpURLConnection urlConn = (HttpURLConnection) serviceURL.toURI().resolve(sumPath).toURL().openConnection();
        try {
            Assert.assertEquals(HttpURLConnection.HTTP_OK, urlConn.getResponseCode());
            // try-with-resources closes the body stream and keeps a close failure as a suppressed exception
            try (InputStream body = urlConn.getInputStream()) {
                String response = new String(ByteStreams.toByteArray(body), StandardCharsets.UTF_8);
                Assert.assertEquals(55, Integer.parseInt(response));
            }
        } finally {
            // release the connection even when one of the assertions above fails
            urlConn.disconnect();
        }
    }

    /**
     * Runs {@link CharCountProgram} from {@link SparkAppUsingObjectStore} against the sample input,
     * verifies the {@code count} dataset, then waits until the summed
     * {@code system.dataset.store.ops} metric recorded for dataset {@code totals} equals 4.
     *
     * @throws Exception if deployment, the program run or the metrics wait fails
     */
    @Test
    public void testSparkWithObjectStore() throws Exception {
        ApplicationManager appManager = deploy(NamespaceId.DEFAULT, SparkAppUsingObjectStore.class);

        DataSetManager<ObjectStore<String>> keysManager = getDataset("keys");
        prepareInputData(keysManager);

        SparkManager sparkManager = appManager.getSparkManager(CharCountProgram.class.getSimpleName()).start();
        sparkManager.waitForRun(ProgramRunStatus.RUNNING, 10L, TimeUnit.SECONDS);
        sparkManager.waitForStopped(60L, TimeUnit.SECONDS);

        DataSetManager<KeyValueTable> countManager = getDataset("count");
        checkOutputData(countManager);

        // Metrics may show up after the run has stopped, so poll instead of asserting once.
        Tasks.waitFor(4L, new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                Map<String, String> tags = ImmutableMap.of(
                    "ns", DefaultId.NAMESPACE.getNamespace(),
                    "app", SparkAppUsingObjectStore.class.getSimpleName(),
                    "sp", CharCountProgram.class.getSimpleName(),
                    "ds", "totals");
                MetricDataQuery metricQuery = new MetricDataQuery(
                    0L, System.currentTimeMillis() / 1000, Integer.MAX_VALUE,
                    "system.dataset.store.ops", AggregationFunction.SUM,
                    tags, Collections.<String>emptyList());

                Collection<MetricTimeSeries> result = getMetricsManager().query(metricQuery);
                if (result.isEmpty()) {
                    // nothing reported yet
                    return 0L;
                }
                Assert.assertEquals(1, result.size());
                MetricTimeSeries series = result.iterator().next();
                Assert.assertEquals(1, series.getTimeValues().size());
                return series.getTimeValues().get(0).getValue();
            }
        }, 10L, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs {@link ScalaCharCountProgram} from {@link SparkAppUsingObjectStore} against the sample
     * input and verifies the contents of the {@code count} dataset.
     *
     * @throws Exception if deployment or the program run fails
     */
    @Test
    public void testScalaSparkWithObjectStore() throws Exception {
        ApplicationManager appManager = deploy(NamespaceId.DEFAULT, SparkAppUsingObjectStore.class);

        DataSetManager<ObjectStore<String>> keysManager = getDataset("keys");
        prepareInputData(keysManager);

        String programName = ScalaCharCountProgram.class.getSimpleName();
        SparkManager sparkManager = appManager.getSparkManager(programName).start();
        sparkManager.waitForRun(ProgramRunStatus.RUNNING, 10L, TimeUnit.SECONDS);
        sparkManager.waitForStopped(60L, TimeUnit.SECONDS);

        DataSetManager<KeyValueTable> countManager = getDataset("count");
        checkOutputData(countManager);
    }

    /**
     * Runs {@link ScalaCrossNSProgram} from the default namespace while its input stream and its
     * output dataset live in two other namespaces. Sends the numbers 0..49 to the stream and then
     * expects the {@code count} table to hold each number, as UTF-8 bytes, stored under itself.
     *
     * @throws Exception if namespace setup, deployment or the program run fails
     */
    @Test
    public void testScalaSparkCrossNSStream() throws Exception {
        final int eventCount = 50;

        // namespace that owns the input stream
        NamespaceMeta streamSpace = new NamespaceMeta.Builder().setName("streamSpaceForSpark").build();
        getNamespaceAdmin().create(streamSpace);
        StreamManager streamManager = getStreamManager(streamSpace.getNamespaceId().stream("testStream"));

        // namespace that owns the output dataset
        NamespaceMeta datasetSpace = new NamespaceMeta.Builder().setName("crossNSDataset").build();
        getNamespaceAdmin().create(datasetSpace);
        addDatasetInstance(datasetSpace.getNamespaceId().dataset("count"), "keyValueTable");

        streamManager.createStream();
        int sent = 0;
        while (sent < eventCount) {
            streamManager.send(String.valueOf(sent));
            sent++;
        }

        ApplicationManager appManager = deploy(NamespaceId.DEFAULT, SparkAppUsingObjectStore.class);
        Map<String, String> runtimeArgs = ImmutableMap.of(
            ScalaCrossNSProgram.STREAM_NAMESPACE(), streamSpace.getNamespaceId().getNamespace(),
            ScalaCrossNSProgram.DATASET_NAMESPACE(), datasetSpace.getNamespaceId().getNamespace(),
            ScalaCrossNSProgram.DATASET_NAME(), "count");
        SparkManager sparkManager = appManager.getSparkManager(ScalaCrossNSProgram.class.getSimpleName()).start(runtimeArgs);
        sparkManager.waitForRun(ProgramRunStatus.RUNNING, 10L, TimeUnit.SECONDS);
        sparkManager.waitForStopped(60L, TimeUnit.SECONDS);

        DataSetManager<KeyValueTable> countManager = getDataset(datasetSpace.getNamespaceId().dataset("count"));
        KeyValueTable countTable = countManager.get();
        for (int event = 0; event < eventCount; event++) {
            byte[] expected = String.valueOf(event).getBytes(Charsets.UTF_8);
            Assert.assertArrayEquals(expected, countTable.read(expected));
        }
    }

    /**
     * Runs {@link ScalaCharCountProgram} from the default namespace while reading its input from
     * the {@code keys} dataset of another namespace, then verifies the {@code count} dataset in
     * the default namespace.
     *
     * @throws Exception if namespace setup, deployment or the program run fails
     */
    @Test
    public void testScalaSparkCrossNSDataset() throws Exception {
        // Deploying the app in the extra namespace precedes the lookup of its "keys" dataset below.
        NamespaceMeta inputSpace = new NamespaceMeta.Builder().setName("datasetSpaceForSpark").build();
        getNamespaceAdmin().create(inputSpace);
        deploy(inputSpace.getNamespaceId(), SparkAppUsingObjectStore.class);

        DataSetManager<ObjectStore<String>> keysManager = getDataset(inputSpace.getNamespaceId().dataset("keys"));
        prepareInputData(keysManager);

        ApplicationManager appManager = deploy(NamespaceId.DEFAULT, SparkAppUsingObjectStore.class);
        Map<String, String> runtimeArgs = ImmutableMap.of(
            ScalaCharCountProgram.INPUT_DATASET_NAMESPACE(), inputSpace.getNamespaceId().getNamespace(),
            ScalaCharCountProgram.INPUT_DATASET_NAME(), "keys");
        SparkManager sparkManager = appManager.getSparkManager(ScalaCharCountProgram.class.getSimpleName()).start(runtimeArgs);
        sparkManager.waitForRun(ProgramRunStatus.RUNNING, 10L, TimeUnit.SECONDS);
        sparkManager.waitForStopped(60L, TimeUnit.SECONDS);

        DataSetManager<KeyValueTable> countManager = getDataset("count");
        checkOutputData(countManager);
    }

    /**
     * Exercises local-file support with both Spark programs of {@link SparkAppUsingLocalFiles}:
     * first the Java one, then the Scala one.
     *
     * @throws Exception if either run fails
     */
    @Test
    public void testSparkWithLocalFiles() throws Exception {
        Class<? extends Application> appClass = SparkAppUsingLocalFiles.class;

        String javaProgram = SparkAppUsingLocalFiles.JavaSparkUsingLocalFiles.class.getSimpleName();
        testSparkWithLocalFiles(appClass, javaProgram, "java");

        String scalaProgram = SparkAppUsingLocalFiles.ScalaSparkUsingLocalFiles.class.getSimpleName();
        testSparkWithLocalFiles(appClass, scalaProgram, "scala");
    }

    /**
     * Writes the two sample strings into the given object store, each stored under its own bytes
     * as the key, and flushes the dataset manager.
     *
     * @param dataSetManager manager of the {@code ObjectStore<String>} that receives the input
     */
    private void prepareInputData(DataSetManager<ObjectStore<String>> dataSetManager) {
        ObjectStore<String> keys = dataSetManager.get();
        keys.write(Bytes.toBytes(TEST_STRING_1), TEST_STRING_1);
        keys.write(Bytes.toBytes(TEST_STRING_2), TEST_STRING_2);
        dataSetManager.flush();
    }

    /**
     * Verifies that the given table holds, for each of the two sample strings, an int value equal
     * to the length of that string.
     *
     * @param dataSetManager manager of the {@code KeyValueTable} to check
     */
    private void checkOutputData(DataSetManager<KeyValueTable> dataSetManager) {
        KeyValueTable counts = dataSetManager.get();
        assertCharCount(counts, TEST_STRING_1);
        assertCharCount(counts, TEST_STRING_2);
    }

    /** Asserts that {@code counts} stores {@code text.length()} as an int under the bytes of {@code text}. */
    private static void assertCharCount(KeyValueTable counts, String text) {
        // Keep the read result: it is needed for both the null check and the value check.
        byte[] stored = counts.read(Bytes.toBytes(text));
        Assert.assertNotNull(stored);
        Assert.assertEquals(text.length(), Bytes.toInt(stored));
    }

    /**
     * Builds a deployment jar for the given application class inside a fresh folder created from
     * {@code TMP_FOLDER}.
     *
     * @param cls application class to package
     * @return the created jar as a local file
     * @throws IOException if the folder or the jar cannot be created
     */
    private static File createArtifactJar(Class<? extends Application> cls) throws IOException {
        LocalLocationFactory locationFactory = new LocalLocationFactory(TMP_FOLDER.newFolder());
        URI jarUri = AppJarHelper.createDeploymentJar(locationFactory, cls, new File[0]).toURI();
        return new File(jarUri);
    }

    /**
     * Registers the pre-built jar of {@code cls} as artifact version {@code 1.0-SNAPSHOT} in the
     * given namespace and deploys an application from it. Artifact and application are both named
     * after the simple name of {@code cls}; the application is created with a null config.
     *
     * @param namespaceId namespace to deploy into
     * @param cls application class; its jar is looked up in {@code ARTIFACTS}
     * @return manager of the deployed application
     * @throws Exception if adding the artifact or deploying the application fails
     */
    private ApplicationManager deploy(NamespaceId namespaceId, Class<? extends Application> cls) throws Exception {
        String appName = cls.getSimpleName();

        ArtifactId artifactId = new ArtifactId(namespaceId.getNamespace(), appName, "1.0-SNAPSHOT");
        File artifactJar = ARTIFACTS.get(cls);
        addArtifact(artifactId, artifactJar);

        ArtifactSummary artifactSummary = new ArtifactSummary(artifactId.getArtifact(), artifactId.getVersion());
        return deployApplication(namespaceId.app(appName), new AppRequest(artifactSummary, (Object) null));
    }

    /**
     * Deploys the given application in the default namespace and runs one of its Spark programs
     * with a generated properties file passed through the
     * {@code SparkAppUsingLocalFiles.LOCAL_FILE_RUNTIME_ARG} runtime argument. It then expects the
     * output table to contain exactly the three entries {@code a=1}, {@code b=2}, {@code c=3}, and
     * finally deletes those entries so the next invocation starts from an empty table.
     *
     * @param cls application class to deploy
     * @param str name of the Spark program to run
     * @param str2 prefix for the name of the generated properties file
     * @throws Exception if deployment, the program run or a dataset operation fails
     */
    private void testSparkWithLocalFiles(Class<? extends Application> cls, String str, String str2) throws Exception {
        ApplicationManager appManager = deploy(NamespaceId.DEFAULT, cls);
        URI propertiesUri = createLocalPropertiesFile(str2);

        SparkManager sparkManager = appManager.getSparkManager(str)
            .start(Collections.singletonMap(SparkAppUsingLocalFiles.LOCAL_FILE_RUNTIME_ARG, propertiesUri.toString()));
        sparkManager.waitForRun(ProgramRunStatus.RUNNING, 10L, TimeUnit.SECONDS);
        sparkManager.waitForStopped(120L, TimeUnit.SECONDS);

        DataSetManager<KeyValueTable> outputManager = getDataset(SparkAppUsingLocalFiles.OUTPUT_DATASET_NAME);
        KeyValueTable outputTable = outputManager.get();
        Map<String, String> expected = ImmutableMap.of("a", "1", "b", "2", "c", "3");
        Collection<byte[]> keysToDelete = new ArrayList<>();

        // try-with-resources closes the scanner on every exit path, including a failed assertion
        try (CloseableIterator<KeyValue<byte[], byte[]>> scan = outputTable.scan(null, null)) {
            for (int i = 0; i < 3; i++) {
                KeyValue<byte[], byte[]> entry = scan.next();
                Assert.assertEquals(expected.get(Bytes.toString(entry.getKey())), Bytes.toString(entry.getValue()));
                keysToDelete.add(entry.getKey());
            }
            Assert.assertFalse(scan.hasNext());
        }
        outputManager.flush();

        // remove what this run produced
        for (byte[] key : keysToDelete) {
            outputTable.delete(key);
        }
        outputManager.flush();
    }

    /**
     * Creates {@code <str>-local.properties} under {@code TMP_FOLDER} and writes three entries for
     * the keys {@code a}, {@code b} and {@code c} into it. Each line spaces its {@code =}
     * separator differently (presumably to exercise lenient properties parsing; verify against
     * the Spark programs that read the file).
     *
     * @param str prefix of the file name
     * @return URI of the written file
     * @throws IOException if the file cannot be created or written
     */
    private URI createLocalPropertiesFile(String str) throws IOException {
        File propertiesFile = TMP_FOLDER.newFile(str + "-local.properties");
        // Explicit charset: the no-charset constructor would use the platform default encoding.
        try (OutputStreamWriter writer =
                 new OutputStreamWriter(new FileOutputStream(propertiesFile), StandardCharsets.UTF_8)) {
            writer.write("a=1\n");
            writer.write("b = 2\n");
            writer.write("c= 3");
        }
        return propertiesFile.toURI();
    }
}
