package co.cask.cdap.spark.stream;

import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.StreamManager;
import co.cask.cdap.test.XSlowTests;
import co.cask.cdap.test.app.WorkflowAppWithLocalDatasets;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/spark/stream/SparkStreamIntegrationTestRun.class */
public class SparkStreamIntegrationTestRun extends TestFrameworkTestBase {
    @Test
    public void testSparkWithStream() throws Exception {
        ApplicationManager deployApplication = deployApplication(TestSparkStreamIntegrationApp.class, new File[0]);
        StreamManager streamManager = getStreamManager("testStream");
        for (int i = 0; i < 50; i++) {
            streamManager.send(String.valueOf(i));
        }
        deployApplication.getSparkManager("SparkStreamProgram").start().waitForRun(ProgramRunStatus.COMPLETED, 120L, TimeUnit.SECONDS);
        verifyDatasetResult(getDataset(WorkflowAppWithLocalDatasets.RESULT_DATASET));
    }

    @Test
    public void testSparkCrossNS() throws Exception {
        NamespaceMeta build = new NamespaceMeta.Builder().setName("streamNS").build();
        NamespaceMeta build2 = new NamespaceMeta.Builder().setName("crossNSDatasetAppNS").build();
        NamespaceMeta build3 = new NamespaceMeta.Builder().setName("outputDatasetNS").build();
        getNamespaceAdmin().create(build);
        getNamespaceAdmin().create(build2);
        getNamespaceAdmin().create(build3);
        addDatasetInstance(build3.getNamespaceId().dataset("finalDataset"), "keyValueTable");
        StreamManager streamManager = getStreamManager(build.getNamespaceId().stream("testStream"));
        streamManager.createStream();
        for (int i = 0; i < 50; i++) {
            streamManager.send(String.valueOf(i));
        }
        deployApplication(TestSparkStreamIntegrationApp.class, new File[0]).getSparkManager("SparkStreamProgram").start(ImmutableMap.of("stream.namespace", build.getNamespaceId().getNamespace(), "stream.name", "testStream")).waitForRun(ProgramRunStatus.COMPLETED, 120L, TimeUnit.SECONDS);
        verifyDatasetResult(getDataset(WorkflowAppWithLocalDatasets.RESULT_DATASET));
        deployApplication(build2.getNamespaceId(), TestSparkCrossNSDatasetApp.class, new File[0]).getSparkManager("SparkCrossNSDatasetProgram").start(ImmutableMap.of("input.dataset.namespace", NamespaceId.DEFAULT.getNamespace(), "input.dataset.name", WorkflowAppWithLocalDatasets.RESULT_DATASET, "output.dataset.namespace", build3.getNamespaceId().getNamespace(), "output.dataset.name", "finalDataset")).waitForRun(ProgramRunStatus.COMPLETED, 120L, TimeUnit.SECONDS);
        verifyDatasetResult(getDataset(build3.getNamespaceId().dataset("finalDataset")));
    }

    private void verifyDatasetResult(DataSetManager<KeyValueTable> dataSetManager) {
        KeyValueTable keyValueTable = (KeyValueTable) dataSetManager.get();
        for (int i = 0; i < 50; i++) {
            byte[] bytes = String.valueOf(i).getBytes(Charsets.UTF_8);
            Assert.assertArrayEquals(bytes, keyValueTable.read(bytes));
        }
    }
}
