package co.cask.cdap.test.app;

import co.cask.cdap.AppUsingNamespace;
import co.cask.cdap.ConfigTestApp;
import co.cask.cdap.api.app.Application;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.dataset.table.Get;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.api.metrics.RuntimeMetrics;
import co.cask.cdap.api.schedule.ScheduleSpecification;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.internal.app.runtime.schedule.Scheduler;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.RunRecord;
import co.cask.cdap.proto.WorkflowNodeStateDetail;
import co.cask.cdap.proto.WorkflowTokenDetail;
import co.cask.cdap.proto.artifact.AppRequest;
import co.cask.cdap.proto.artifact.ArtifactSummary;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.FlowManager;
import co.cask.cdap.test.ServiceManager;
import co.cask.cdap.test.SlowTests;
import co.cask.cdap.test.StreamManager;
import co.cask.cdap.test.TestConfiguration;
import co.cask.cdap.test.WorkerManager;
import co.cask.cdap.test.WorkflowManager;
import co.cask.cdap.test.XSlowTests;
import co.cask.cdap.test.app.AppWithDuplicateData;
import co.cask.cdap.test.app.AppWithSchedule;
import co.cask.cdap.test.app.AppsWithDataset;
import co.cask.cdap.test.app.MyKeyValueTableDefinition;
import co.cask.cdap.test.app.WorkflowAppWithLocalDatasets;
import co.cask.cdap.test.artifacts.AppWithPlugin;
import co.cask.cdap.test.artifacts.plugins.ToStringPlugin;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({SlowTests.class})
/* loaded from: input_file:co/cask/cdap/test/app/TestFrameworkTestRun.class */
public class TestFrameworkTestRun extends TestFrameworkTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestFrameworkTestRun.class);

    @ClassRule
    public static final TestConfiguration CONFIG = new TestConfiguration(new Object[]{"explore.enabled", false});

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private final Id.Namespace testSpace = Id.Namespace.from("testspace");

    @Before
    public void setUp() throws Exception {
        getNamespaceAdmin().create(new NamespaceMeta.Builder().setName(this.testSpace).build());
    }

    @Test
    public void testInvalidAppWithDuplicateDatasets() throws Exception {
        Id.Artifact from = Id.Artifact.from(Id.Namespace.DEFAULT, "invalid-app", "1.0.0-SNAPSHOT");
        addAppArtifact(from, AppWithDuplicateData.class);
        addPluginArtifact(Id.Artifact.from(Id.Namespace.DEFAULT, "test-plugin", "1.0.0-SNAPSHOT"), from, ToStringPlugin.class, new Class[0]);
        Id.Application from2 = Id.Application.from(Id.Namespace.DEFAULT, "InvalidApp");
        int i = 4;
        while (true) {
            int i2 = i;
            if (i2 <= 0) {
                deployApplication(from2, new AppRequest(new ArtifactSummary(from.getName(), from.getVersion().getVersion()), new AppWithDuplicateData.ConfigClass(false, false, false)));
                return;
            } else {
                try {
                    deployApplication(from2, new AppRequest(new ArtifactSummary(from.getName(), from.getVersion().getVersion()), new AppWithDuplicateData.ConfigClass(i2 == 4, i2 == 2, i2 == 1)));
                    Assert.fail();
                } catch (Exception e) {
                }
                i = i2 / 2;
            }
        }
    }

    @Test
    public void testFlowRuntimeArguments() throws Exception {
        ApplicationManager deployApplication = deployApplication(FilterApp.class, new File[0]);
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("threshold", "10");
        deployApplication.getFlowManager("FilterFlow").start(newHashMap);
        StreamManager streamManager = getStreamManager(DatasetWithMRApp.INPUT_KEY);
        streamManager.send("1");
        streamManager.send("11");
        ServiceManager start = deployApplication.getServiceManager("CountService").start();
        start.waitForStatus(true, 2, 1);
        Assert.assertEquals("1", new Gson().fromJson(callServiceGet(start.getServiceURL(), WorkflowAppWithLocalDatasets.RESULT_DATASET), String.class));
    }

    @Test
    public void testNewFlowRuntimeArguments() throws Exception {
        ApplicationManager deployApplication = deployApplication(FilterAppWithNewFlowAPI.class, new File[0]);
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("threshold", "10");
        deployApplication.getFlowManager("FilterFlow").start(newHashMap);
        StreamManager streamManager = getStreamManager(DatasetWithMRApp.INPUT_KEY);
        streamManager.send("2");
        streamManager.send("21");
        ServiceManager start = deployApplication.getServiceManager("CountService").start();
        start.waitForStatus(true, 2, 1);
        Assert.assertEquals("1", new Gson().fromJson(callServiceGet(start.getServiceURL(), WorkflowAppWithLocalDatasets.RESULT_DATASET), String.class));
    }

    @Test
    public void testWorkerThrowingException() throws Exception {
        final WorkerManager workerManager = deployApplication(AppWithExceptionThrowingWorker.class, new File[0]).getWorkerManager(AppWithExceptionThrowingWorker.WORKER_NAME);
        testExceptionWorker(workerManager);
        Tasks.waitFor(3, new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(workerManager.getHistory(ProgramRunStatus.FAILED).size());
            }
        }, 5L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
        workerManager.start();
        workerManager.waitForFinish(3L, TimeUnit.SECONDS);
        Tasks.waitFor(1, new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(workerManager.getHistory(ProgramRunStatus.COMPLETED).size());
            }
        }, 5L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
        workerManager.setInstances(9);
        testExceptionWorker(workerManager);
        Tasks.waitFor(6, new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(workerManager.getHistory(ProgramRunStatus.FAILED).size());
            }
        }, 5L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
        Tasks.waitFor(7, new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(workerManager.getHistory().size());
            }
        }, 5L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
        workerManager.start();
        workerManager.waitForFinish(3L, TimeUnit.SECONDS);
        Tasks.waitFor(2, new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.5
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(workerManager.getHistory(ProgramRunStatus.COMPLETED).size());
            }
        }, 5L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
    }

    private void testExceptionWorker(WorkerManager workerManager) throws Exception {
        workerManager.start(ImmutableMap.of(AppWithExceptionThrowingWorker.INITIALIZE, ""));
        workerManager.waitForFinish(3L, TimeUnit.SECONDS);
        workerManager.start(ImmutableMap.of("run", ""));
        workerManager.waitForFinish(3L, TimeUnit.SECONDS);
        workerManager.start(ImmutableMap.of("destroy", ""));
        workerManager.waitForFinish(3L, TimeUnit.SECONDS);
    }

    @Test
    public void testServiceManager() throws Exception {
        final ServiceManager serviceManager = deployApplication(FilterApp.class, new File[0]).getServiceManager("CountService");
        serviceManager.setInstances(2);
        Assert.assertEquals(0L, serviceManager.getProvisionedInstances());
        Assert.assertEquals(2L, serviceManager.getRequestedInstances());
        Assert.assertFalse(serviceManager.isRunning());
        Assert.assertEquals(0L, serviceManager.getHistory().size());
        serviceManager.start();
        Assert.assertTrue(serviceManager.isRunning());
        Assert.assertEquals(2L, serviceManager.getProvisionedInstances());
        Assert.assertEquals(0L, serviceManager.getHistory(ProgramRunStatus.KILLED).size());
        Tasks.waitFor(1, new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.6
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(serviceManager.getHistory(ProgramRunStatus.RUNNING).size());
            }
        }, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(ProgramRunStatus.RUNNING, ((RunRecord) serviceManager.getHistory(ProgramRunStatus.RUNNING).get(0)).getStatus());
        List history = serviceManager.getHistory(ProgramRunStatus.ALL);
        Assert.assertEquals(1L, history.size());
        Assert.assertEquals(ProgramRunStatus.RUNNING, ((RunRecord) history.get(0)).getStatus());
    }

    @Test
    public void testNamespaceAvailableAtRuntime() throws Exception {
        ServiceManager serviceManager = deployApplication(this.testSpace, AppUsingNamespace.class, new File[0]).getServiceManager("nsService");
        serviceManager.start();
        serviceManager.waitForStatus(true, 1, 10);
        Assert.assertEquals(this.testSpace.getId(), callServiceGet(serviceManager.getServiceURL(10L, TimeUnit.SECONDS), "ns"));
        serviceManager.stop();
        serviceManager.waitForStatus(false, 1, 10);
    }

    @Test
    public void testAppConfigWithNull() throws Exception {
        testAppConfig(ConfigTestApp.NAME, deployApplication(ConfigTestApp.class, new File[0]), null);
    }

    @Test
    public void testAppConfig() throws Exception {
        ConfigTestApp.ConfigClass configClass = new ConfigTestApp.ConfigClass("testStream", "testDataset");
        testAppConfig(ConfigTestApp.NAME, deployApplication(ConfigTestApp.class, configClass, new File[0]), configClass);
    }

    @Test
    public void testAppWithPlugin() throws Exception {
        Id.Artifact from = Id.Artifact.from(Id.Namespace.DEFAULT, "app-with-plugin", "1.0.0-SNAPSHOT");
        addAppArtifact(from, AppWithPlugin.class);
        addPluginArtifact(Id.Artifact.from(Id.Namespace.DEFAULT, "test-plugin", "1.0.0-SNAPSHOT"), from, ToStringPlugin.class, new Class[0]);
        ApplicationManager deployApplication = deployApplication(Id.Application.from(Id.Namespace.DEFAULT, "AppWithPlugin"), new AppRequest(new ArtifactSummary(from.getName(), from.getVersion().getVersion())));
        final WorkerManager workerManager = deployApplication.getWorkerManager(AppWithPlugin.WORKER);
        workerManager.start();
        workerManager.waitForStatus(false, 5, 1);
        Tasks.waitFor(false, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.7
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(workerManager.getHistory(ProgramRunStatus.COMPLETED).isEmpty());
            }
        }, 5L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
        final ServiceManager serviceManager = deployApplication.getServiceManager(AppWithPlugin.SERVICE);
        serviceManager.start();
        serviceManager.waitForStatus(true, 1, 10);
        callServiceGet(serviceManager.getServiceURL(5L, TimeUnit.SECONDS), "dummy");
        serviceManager.stop();
        serviceManager.waitForStatus(false, 1, 10);
        Tasks.waitFor(false, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.8
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(serviceManager.getHistory(ProgramRunStatus.KILLED).isEmpty());
            }
        }, 5L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
        WorkflowManager workflowManager = deployApplication.getWorkflowManager(AppWithPlugin.WORKFLOW);
        workflowManager.start();
        workflowManager.waitForFinish(10L, TimeUnit.MINUTES);
        Assert.assertNotEquals(ProgramRunStatus.FAILED, ((RunRecord) workflowManager.getHistory().get(0)).getStatus());
        Assert.assertEquals(AppWithPlugin.TEST, Bytes.toString(((KeyValueTable) getDataset(AppWithPlugin.WORKFLOW_TABLE).get()).read("val")));
        StreamManager streamManager = getStreamManager(AppWithPlugin.SPARK_STREAM);
        for (int i = 0; i < 5; i++) {
            streamManager.send("Message " + i);
        }
        deployApplication.getSparkManager(AppWithPlugin.SPARK).start().waitForFinish(2L, TimeUnit.MINUTES);
        Scanner scan = ((Table) getDataset(AppWithPlugin.SPARK_TABLE).get()).scan((byte[]) null, (byte[]) null);
        Throwable th = null;
        try {
            for (int i2 = 0; i2 < 5; i2++) {
                Row next = scan.next();
                Assert.assertNotNull(next);
                String str = "Message " + i2 + " " + AppWithPlugin.TEST;
                Assert.assertEquals(str, Bytes.toString(next.getRow()));
                Assert.assertEquals(str, Bytes.toString(next.get(str)));
            }
            Assert.assertNull(scan.next());
            if (scan != null) {
                if (0 == 0) {
                    scan.close();
                    return;
                }
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (scan != null) {
                if (0 != 0) {
                    try {
                        scan.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scan.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAppFromArtifact() throws Exception {
        Id.Artifact from = Id.Artifact.from(Id.Namespace.DEFAULT, "cfg-app", "1.0.0-SNAPSHOT");
        addAppArtifact(from, ConfigTestApp.class);
        Id.Application from2 = Id.Application.from(Id.Namespace.DEFAULT, "AppFromArtifact");
        AppRequest appRequest = new AppRequest(new ArtifactSummary(from.getName(), from.getVersion().getVersion()), new ConfigTestApp.ConfigClass("testStream", "testDataset"));
        testAppConfig(from2.getId(), deployApplication(from2, appRequest), (ConfigTestApp.ConfigClass) appRequest.getConfig());
    }

    private void testAppConfig(String str, ApplicationManager applicationManager, ConfigTestApp.ConfigClass configClass) throws Exception {
        String streamName = configClass == null ? "defaultStream" : configClass.getStreamName();
        String tableName = configClass == null ? "defaultTable" : configClass.getTableName();
        FlowManager start = applicationManager.getFlowManager("simpleFlow").start();
        StreamManager streamManager = getStreamManager(streamName);
        streamManager.send("abcd");
        streamManager.send("xyz");
        start.getFlowletMetrics("simpleFlowlet").waitForProcessed(2L, 1L, TimeUnit.MINUTES);
        start.stop();
        KeyValueTable keyValueTable = (KeyValueTable) getDataset(tableName).get();
        Assert.assertEquals("abcd", Bytes.toString(keyValueTable.read(str + ".abcd")));
        Assert.assertEquals("xyz", Bytes.toString(keyValueTable.read(str + ".xyz")));
    }

    @Test
    @Category({SlowTests.class})
    public void testMapperDatasetAccess() throws Exception {
        addDatasetInstance("keyValueTable", "table1").create();
        addDatasetInstance("keyValueTable", "table2").create();
        DataSetManager dataset = getDataset("table1");
        ((KeyValueTable) dataset.get()).write("hello", "world");
        dataset.flush();
        ApplicationManager deployApplication = deployApplication(DatasetWithMRApp.class, new File[0]);
        deployApplication.getMapReduceManager("copymr").start(ImmutableMap.of(DatasetWithMRApp.INPUT_KEY, "table1", "output", "table2")).waitForFinish(5L, TimeUnit.MINUTES);
        deployApplication.stopAll();
        verifyMapperJobOutput(DatasetWithMRApp.class, getDataset("table2"));
    }

    @Test
    @Category({SlowTests.class})
    public void testCrossNSMapperDatasetAccess() throws Exception {
        NamespaceMeta build = new NamespaceMeta.Builder().setName("inputNS").build();
        NamespaceMeta build2 = new NamespaceMeta.Builder().setName("outputNS").build();
        getNamespaceAdmin().create(build);
        getNamespaceAdmin().create(build2);
        addDatasetInstance(build.getNamespaceId().toId(), "keyValueTable", "table1").create();
        addDatasetInstance(build2.getNamespaceId().toId(), "keyValueTable", "table2").create();
        DataSetManager dataset = getDataset(build.getNamespaceId().toId(), "table1");
        ((KeyValueTable) dataset.get()).write("hello", "world");
        dataset.flush();
        ApplicationManager deployApplication = deployApplication(DatasetCrossNSAccessWithMAPApp.class, new File[0]);
        deployApplication.getMapReduceManager("copymr").start(ImmutableMap.of("input.dataset.namespace", build.getName(), "input.dataset.name", "table1", "output.dataset.namespace", build2.getName(), "output.dataset.name", "table2")).waitForFinish(5L, TimeUnit.MINUTES);
        deployApplication.stopAll();
        verifyMapperJobOutput(DatasetCrossNSAccessWithMAPApp.class, getDataset(build2.getNamespaceId().toId(), "table2"));
    }

    private void verifyMapperJobOutput(Class<?> cls, DataSetManager<KeyValueTable> dataSetManager) throws Exception {
        Assert.assertEquals("world", Bytes.toString(((KeyValueTable) dataSetManager.get()).read("hello")));
        Map transformEntries = Maps.transformEntries(Maps.uniqueIndex(getMetricsManager().query(new MetricDataQuery(0L, System.currentTimeMillis() / 1000, Integer.MAX_VALUE, ImmutableMap.of("system.dataset.store.reads", AggregationFunction.SUM, "system.dataset.store.writes", AggregationFunction.SUM), ImmutableMap.of("ns", DefaultId.NAMESPACE.getId(), "app", cls.getSimpleName(), "mr", "copymr"), ImmutableList.of())), new Function<MetricTimeSeries, String>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.9
            public String apply(MetricTimeSeries metricTimeSeries) {
                return metricTimeSeries.getMetricName();
            }
        }), new Maps.EntryTransformer<String, MetricTimeSeries, Long>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.10
            public Long transformEntry(String str, MetricTimeSeries metricTimeSeries) {
                Preconditions.checkArgument(metricTimeSeries.getTimeValues().size() == 1, "Expected one value for aggregated sum for metrics %s", new Object[]{str});
                return Long.valueOf(((TimeValue) metricTimeSeries.getTimeValues().get(0)).getValue());
            }
        });
        Assert.assertEquals(1L, transformEntries.get("system.dataset.store.reads"));
        Assert.assertEquals(1L, transformEntries.get("system.dataset.store.writes"));
    }

    @Test
    public void testWorkflowStatus() throws Exception {
        ApplicationManager deployApplication = deployApplication(WorkflowStatusTestApp.class, new File[0]);
        File file = new File(TMP_FOLDER.newFolder() + "/workflow.success");
        File file2 = new File(TMP_FOLDER.newFolder() + "/action.success");
        File file3 = new File(TMP_FOLDER.newFolder() + "/workflow.killed");
        File file4 = new File(TMP_FOLDER.newFolder() + "/first");
        File file5 = new File(TMP_FOLDER.newFolder() + "/first.done");
        WorkflowManager workflowManager = deployApplication.getWorkflowManager(WorkflowStatusTestApp.WORKFLOW_NAME);
        workflowManager.start(ImmutableMap.of("workflow.success.file", file.getAbsolutePath(), "action.success.file", file2.getAbsolutePath(), "throw.exception", AppWithServices.VALUE));
        workflowManager.waitForFinish(1L, TimeUnit.MINUTES);
        Assert.assertFalse(file.exists());
        Assert.assertFalse(file2.exists());
        workflowManager.start(ImmutableMap.of("workflow.success.file", file.getAbsolutePath(), "action.success.file", file2.getAbsolutePath()));
        workflowManager.waitForFinish(1L, TimeUnit.MINUTES);
        Assert.assertTrue(file.exists());
        Assert.assertTrue(file2.exists());
        workflowManager.start(ImmutableMap.of("workflow.killed.file", file3.getAbsolutePath(), "first.file", file4.getAbsolutePath(), "first.done.file", file5.getAbsolutePath(), "test.killed", AppWithServices.VALUE));
        verifyFileExists(Lists.newArrayList(new File[]{file4}));
        workflowManager.stop();
        workflowManager.waitForStatus(false);
        Assert.assertTrue(file3.exists());
    }

    @Test
    @Category({SlowTests.class})
    public void testCustomActionDatasetAccess() throws Exception {
        addDatasetInstance("keyValueTable", "customtable").create();
        addDatasetInstance("fileSet", "customfs").create();
        ApplicationManager deployApplication = deployApplication(DatasetWithCustomActionApp.class, new File[0]);
        deployApplication.getServiceManager("CustomService").start().waitForStatus(true);
        deployApplication.getWorkflowManager("CustomWorkflow").start().waitForFinish(2L, TimeUnit.MINUTES);
        deployApplication.stopAll();
        KeyValueTable keyValueTable = (KeyValueTable) getDataset("customtable").get();
        Assert.assertEquals("world", Bytes.toString(keyValueTable.read("hello")));
        Assert.assertEquals("service", Bytes.toString(keyValueTable.read("hi")));
        Assert.assertEquals("another.world", Bytes.toString(keyValueTable.read("another.hello")));
        InputStream inputStream = ((FileSet) getDataset("customfs").get()).getLocation("test").getInputStream();
        Throwable th = null;
        try {
            try {
                Assert.assertEquals(42L, inputStream.read());
                if (inputStream != null) {
                    if (0 == 0) {
                        inputStream.close();
                        return;
                    }
                    try {
                        inputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (inputStream != null) {
                if (th != null) {
                    try {
                        inputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    inputStream.close();
                }
            }
            throw th4;
        }
    }

    @Test
    @Category({XSlowTests.class})
    public void testWorkflowLocalDatasets() throws Exception {
        ApplicationManager deployApplication = deployApplication(this.testSpace, WorkflowAppWithLocalDatasets.class, new File[0]);
        HashMap hashMap = new HashMap();
        verifyWorkflowRun(executeWorkflow(deployApplication, hashMap), false, false, "COMPLETED");
        hashMap.put("dataset.wordcount.keep.local", AppWithServices.VALUE);
        verifyWorkflowRun(executeWorkflow(deployApplication, hashMap), true, false, "COMPLETED");
        hashMap.clear();
        hashMap.put("dataset.*.keep.local", AppWithServices.VALUE);
        verifyWorkflowRun(executeWorkflow(deployApplication, hashMap), true, true, "COMPLETED");
        hashMap.clear();
        hashMap.put("dataset.*.keep.local", AppWithServices.VALUE);
        hashMap.put("destroy.throw.exception", AppWithServices.VALUE);
        verifyWorkflowRun(executeWorkflow(deployApplication, hashMap), true, true, "STARTED");
        List history = deployApplication.getWorkflowManager(WorkflowAppWithLocalDatasets.WORKFLOW_NAME).getHistory();
        Assert.assertEquals(4L, history.size());
        Iterator it = history.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(ProgramRunStatus.COMPLETED, ((RunRecord) it.next()).getStatus());
        }
    }

    private void verifyFileExists(final List<File> list) throws Exception {
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.11
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    if (!((File) it.next()).exists()) {
                        return false;
                    }
                }
                return true;
            }
        }, 100L, TimeUnit.SECONDS);
    }

    private void verifyWorkflowRun(String str, boolean z, boolean z2, String str2) throws Exception {
        DataSetManager dataset = getDataset(this.testSpace, "wordcount." + str);
        if (z) {
            Assert.assertNotNull(dataset.get());
        } else {
            Assert.assertNull(dataset.get());
        }
        DataSetManager dataset2 = getDataset(this.testSpace, "csvfileset." + str);
        if (z2) {
            Assert.assertNotNull(dataset2.get());
        } else {
            Assert.assertNull(dataset2.get());
        }
        Assert.assertEquals("6", Bytes.toString(((KeyValueTable) getDataset(this.testSpace, WorkflowAppWithLocalDatasets.RESULT_DATASET).get()).read("UniqueWordCount")));
        Assert.assertNull(getDataset(this.testSpace, "result." + str).get());
        Assert.assertEquals(str2, Bytes.toString(((KeyValueTable) getDataset(this.testSpace, WorkflowAppWithLocalDatasets.WORKFLOW_RUNS_DATASET).get()).read(str)));
    }

    private String executeWorkflow(ApplicationManager applicationManager, Map<String, String> map) throws Exception {
        WorkflowManager workflowManager = applicationManager.getWorkflowManager(WorkflowAppWithLocalDatasets.WORKFLOW_NAME);
        HashMap hashMap = new HashMap();
        File file = new File(TMP_FOLDER.newFolder(), "/wait.file");
        File file2 = new File(TMP_FOLDER.newFolder(), "/done.file");
        hashMap.put("input.path", DatasetWithMRApp.INPUT_KEY);
        hashMap.put("output.path", "output");
        hashMap.put("wait.file", file.getAbsolutePath());
        hashMap.put("done.file", file2.getAbsolutePath());
        hashMap.putAll(map);
        workflowManager.start(hashMap);
        while (!file.exists()) {
            TimeUnit.MILLISECONDS.sleep(50L);
        }
        List history = workflowManager.getHistory(ProgramRunStatus.RUNNING);
        Assert.assertEquals(1L, history.size());
        String pid = ((RunRecord) history.get(0)).getPid();
        Assert.assertEquals("2", Bytes.toString(((KeyValueTable) getDataset(this.testSpace, "wordcount." + pid).get()).read("text")));
        Assert.assertNotNull(getDataset(this.testSpace, "csvfileset." + pid).get());
        Assert.assertNull(getDataset(this.testSpace, WorkflowAppWithLocalDatasets.WORDCOUNT_DATASET).get());
        Assert.assertNull(getDataset(this.testSpace, WorkflowAppWithLocalDatasets.CSV_FILESET_DATASET).get());
        file2.createNewFile();
        workflowManager.waitForFinish(1L, TimeUnit.MINUTES);
        Map workflowNodeStates = workflowManager.getWorkflowNodeStates(pid);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("ns", this.testSpace.getId());
        hashMap2.put("app", applicationManager.getInfo().getName());
        hashMap2.put("wf", WorkflowAppWithLocalDatasets.WORKFLOW_NAME);
        hashMap2.put("run", pid);
        HashMap hashMap3 = new HashMap(hashMap2);
        hashMap3.put("nd", WorkflowAppWithLocalDatasets.LocalDatasetWriter.class.getSimpleName());
        Assert.assertEquals(2L, getMetricsManager().getTotalMetric(hashMap3, "user.num.lines"));
        HashMap hashMap4 = new HashMap(hashMap2);
        hashMap4.put("nd", "JavaSparkCSVToSpaceConverter");
        Assert.assertEquals(2L, getMetricsManager().getTotalMetric(hashMap4, "user.num.lines"));
        HashMap hashMap5 = new HashMap();
        hashMap5.put("ns", this.testSpace.getId());
        hashMap5.put("app", applicationManager.getInfo().getName());
        hashMap5.put("sp", "JavaSparkCSVToSpaceConverter");
        hashMap5.put("run", ((WorkflowNodeStateDetail) workflowNodeStates.get("JavaSparkCSVToSpaceConverter")).getRunId());
        Assert.assertEquals(2L, getMetricsManager().getTotalMetric(hashMap5, "user.num.lines"));
        HashMap hashMap6 = new HashMap();
        hashMap6.put("ns", this.testSpace.getId());
        hashMap6.put("app", applicationManager.getInfo().getName());
        Assert.assertEquals(4L, getMetricsManager().getTotalMetric(hashMap6, "user.num.lines"));
        HashMap hashMap7 = new HashMap(hashMap2);
        hashMap7.put("nd", "WordCount");
        Assert.assertEquals(7L, getMetricsManager().getTotalMetric(hashMap7, "user.num.words"));
        HashMap hashMap8 = new HashMap();
        hashMap8.put("ns", this.testSpace.getId());
        hashMap8.put("app", applicationManager.getInfo().getName());
        hashMap8.put("mr", "WordCount");
        hashMap8.put("run", ((WorkflowNodeStateDetail) workflowNodeStates.get("WordCount")).getRunId());
        Assert.assertEquals(7L, getMetricsManager().getTotalMetric(hashMap8, "user.num.words"));
        HashMap hashMap9 = new HashMap(hashMap2);
        hashMap9.put("nd", "readerAction");
        Assert.assertEquals(6L, getMetricsManager().getTotalMetric(hashMap9, "user.unique.words"));
        return pid;
    }

    @Test
    @Category({XSlowTests.class})
    public void testDeployWorkflowApp() throws Exception {
        final WorkflowManager workflowManager = deployApplication(this.testSpace, AppWithSchedule.class, new File[0]).getWorkflowManager("SampleWorkflow");
        List schedules = workflowManager.getSchedules();
        Assert.assertEquals(1L, schedules.size());
        String name = ((ScheduleSpecification) schedules.get(0)).getSchedule().getName();
        Assert.assertNotNull(name);
        Assert.assertFalse(name.isEmpty());
        workflowManager.getSchedule(name).resume();
        Assert.assertEquals("SCHEDULED", workflowManager.getSchedule(name).status(200));
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.12
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(!workflowManager.getHistory().isEmpty());
            }
        }, 10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        workflowManager.getSchedule(name).suspend();
        waitForScheduleState(name, workflowManager, Scheduler.ScheduleState.SUSPENDED);
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.13
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                Iterator it = workflowManager.getHistory().iterator();
                while (it.hasNext()) {
                    if (((RunRecord) it.next()).getStatus() != ProgramRunStatus.COMPLETED) {
                        return false;
                    }
                }
                return true;
            }
        }, 10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        List history = workflowManager.getHistory();
        int size = history.size();
        Assert.assertTrue(size > 0);
        TimeUnit.SECONDS.sleep(5L);
        final int size2 = workflowManager.getHistory().size();
        Assert.assertEquals(size, size2);
        workflowManager.getSchedule(name).resume();
        waitForScheduleState(name, workflowManager, Scheduler.ScheduleState.SCHEDULED);
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.14
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(workflowManager.getHistory().size() > size2);
            }
        }, 10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        Assert.assertEquals("SCHEDULED", workflowManager.getSchedule(name).status(200));
        Assert.assertEquals("NOT_FOUND", workflowManager.getSchedule("doesnt exist").status(404));
        workflowManager.getSchedule(name).suspend();
        waitForScheduleState(name, workflowManager, Scheduler.ScheduleState.SUSPENDED);
        String pid = ((RunRecord) history.get(0)).getPid();
        Assert.assertEquals(0L, workflowManager.getToken(pid, WorkflowToken.Scope.SYSTEM, (String) null).getTokenData().size());
        Assert.assertEquals(2L, workflowManager.getToken(pid, (WorkflowToken.Scope) null, (String) null).getTokenData().size());
        waitForWorkflowStatus(workflowManager, ProgramRunStatus.COMPLETED);
        Assert.assertEquals(true, Boolean.valueOf(Boolean.parseBoolean((String) workflowManager.getTokenAtNode(pid, AppWithSchedule.DummyAction.class.getSimpleName(), WorkflowToken.Scope.USER, "finished").getTokenDataAtNode().get("finished"))));
        Assert.assertEquals(false, Boolean.valueOf(Boolean.parseBoolean(((WorkflowTokenDetail.NodeValueDetail) ((List) workflowManager.getToken(pid, (WorkflowToken.Scope) null, (String) null).getTokenData().get("running")).get(0)).getValue())));
    }

    @Test
    public void testBatchStreamUpload() throws Exception {
        StreamManager streamManager = getStreamManager("batchStream");
        streamManager.createStream();
        File newFile = TEMP_FOLDER.newFile("test-stream-data.txt");
        FileWriter fileWriter = new FileWriter(newFile);
        Throwable th = null;
        try {
            BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);
            Throwable th2 = null;
            try {
                try {
                    bufferedWriter.write(String.format("%s\n", "this,is,some"));
                    bufferedWriter.write(String.format("%s\n", "test,csv,data"));
                    bufferedWriter.write(String.format("%s\n", "that,can,be,used,to,test"));
                    bufferedWriter.write(String.format("%s\n", "batch,upload,capability"));
                    bufferedWriter.write(String.format("%s\n", "for,streams in testbase"));
                    if (bufferedWriter != null) {
                        if (0 != 0) {
                            try {
                                bufferedWriter.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            bufferedWriter.close();
                        }
                    }
                    streamManager.send(newFile, "text/csv");
                    List events = streamManager.getEvents(0L, System.currentTimeMillis(), 100);
                    Assert.assertEquals(5L, events.size());
                    Assert.assertEquals("this,is,some", Bytes.toString((ByteBuffer) ((StreamEvent) events.get(0)).getBody()));
                    Assert.assertEquals("test,csv,data", Bytes.toString((ByteBuffer) ((StreamEvent) events.get(1)).getBody()));
                    Assert.assertEquals("that,can,be,used,to,test", Bytes.toString((ByteBuffer) ((StreamEvent) events.get(2)).getBody()));
                    Assert.assertEquals("batch,upload,capability", Bytes.toString((ByteBuffer) ((StreamEvent) events.get(3)).getBody()));
                    Assert.assertEquals("for,streams in testbase", Bytes.toString((ByteBuffer) ((StreamEvent) events.get(4)).getBody()));
                } finally {
                }
            } catch (Throwable th4) {
                if (bufferedWriter != null) {
                    if (th2 != null) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (fileWriter != null) {
                if (0 != 0) {
                    try {
                        fileWriter.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    fileWriter.close();
                }
            }
        }
    }

    private void waitForWorkflowStatus(final WorkflowManager workflowManager, ProgramRunStatus programRunStatus) throws Exception {
        Tasks.waitFor(programRunStatus, new Callable<ProgramRunStatus>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.15
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public ProgramRunStatus call() throws Exception {
                List history = workflowManager.getHistory();
                return ((RunRecord) history.get(history.size() - 1)).getStatus();
            }
        }, 5L, TimeUnit.SECONDS, 30L, TimeUnit.MILLISECONDS);
    }

    private void waitForWorkflowRuns(final WorkflowManager workflowManager, int i) throws Exception {
        Tasks.waitFor(Integer.valueOf(i), new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.16
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(workflowManager.getHistory().size());
            }
        }, 5L, TimeUnit.SECONDS, 30L, TimeUnit.MILLISECONDS);
    }

    private void waitForScheduleState(final String str, final WorkflowManager workflowManager, Scheduler.ScheduleState scheduleState) throws Exception {
        Tasks.waitFor(scheduleState, new Callable<Scheduler.ScheduleState>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.17
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Scheduler.ScheduleState call() throws Exception {
                return Scheduler.ScheduleState.valueOf(workflowManager.getSchedule(str).status(200));
            }
        }, 5L, TimeUnit.SECONDS, 30L, TimeUnit.MILLISECONDS);
    }

    @Test(timeout = 240000)
    @Category({XSlowTests.class})
    public void testMultiInput() throws Exception {
        ApplicationManager deployApplication = deployApplication(JoinMultiStreamApp.class, new File[0]);
        FlowManager start = deployApplication.getFlowManager("JoinMultiFlow").start();
        StreamManager streamManager = getStreamManager("s1");
        StreamManager streamManager2 = getStreamManager("s2");
        StreamManager streamManager3 = getStreamManager("s3");
        streamManager.send("testing 1");
        streamManager2.send("testing 2");
        streamManager3.send("testing 3");
        start.getFlowletMetrics("Terminal").waitForProcessed(3L, 60L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(1L);
        ServiceManager start2 = deployApplication.getServiceManager("QueryService").start();
        start2.waitForStatus(true, 2, 1);
        URL serviceURL = start2.getServiceURL();
        Gson gson = new Gson();
        Assert.assertEquals("testing 1", gson.fromJson(callServiceGet(serviceURL, "input1"), String.class));
        Assert.assertEquals("testing 2", gson.fromJson(callServiceGet(serviceURL, "input2"), String.class));
        Assert.assertEquals("testing 3", gson.fromJson(callServiceGet(serviceURL, "input3"), String.class));
    }

    @Test(timeout = 360000)
    @Category({XSlowTests.class})
    public void testApp() throws Exception {
        testApp(WordCountApp.class, "text");
    }

    @Test
    @Category({SlowTests.class})
    public void testGetServiceURL() throws Exception {
        ApplicationManager deployApplication = deployApplication(AppUsingGetServiceURL.class, new File[0]);
        ServiceManager start = deployApplication.getServiceManager(AppUsingGetServiceURL.CENTRAL_SERVICE).start();
        start.waitForStatus(true);
        WorkerManager start2 = deployApplication.getWorkerManager(AppUsingGetServiceURL.PINGING_WORKER).start();
        start2.waitForStatus(true);
        ServiceManager start3 = deployApplication.getServiceManager(AppUsingGetServiceURL.FORWARDING).start();
        Assert.assertEquals(AppUsingGetServiceURL.ANSWER, (String) new Gson().fromJson(callServiceGet(start3.getServiceURL(), "ping"), String.class));
        Assert.assertEquals(AppUsingGetServiceURL.ANSWER, (String) new Gson().fromJson(callServiceGet(start3.getServiceURL(), "read/Key"), String.class));
        start3.stop();
        try {
            start2.stop();
        } catch (Throwable th) {
            LOG.error("Got exception while stopping pinging worker", th);
        }
        start2.waitForStatus(false);
        start.stop();
        start.waitForStatus(false);
    }

    private void workerInstancesCheck(final WorkerManager workerManager, int i) throws Exception {
        Tasks.waitFor(Integer.valueOf(i), new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.18
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(workerManager.getInstances());
            }
        }, 15L, TimeUnit.SECONDS);
    }

    private void kvTableKeyCheck(final Id.Namespace namespace, final String str, final byte[] bArr) throws Exception {
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.19
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(((KeyValueTable) TestFrameworkTestRun.this.getDataset(namespace, str).get()).read(bArr) != null);
            }
        }, 15L, TimeUnit.SECONDS);
    }

    @Test
    @Category({SlowTests.class})
    public void testWorkerInstances() throws Exception {
        ApplicationManager deployApplication = deployApplication(this.testSpace, AppUsingGetServiceURL.class, new File[0]);
        WorkerManager workerManager = (WorkerManager) deployApplication.getWorkerManager(AppUsingGetServiceURL.PINGING_WORKER).start();
        workerManager.waitForStatus(true);
        workerInstancesCheck(workerManager, 5);
        workerManager.setInstances(10);
        workerInstancesCheck(workerManager, 10);
        workerManager.setInstances(2);
        workerInstancesCheck(workerManager, 2);
        workerManager.setInstances(2);
        workerInstancesCheck(workerManager, 2);
        WorkerManager workerManager2 = deployApplication.getWorkerManager(AppUsingGetServiceURL.LIFECYCLE_WORKER);
        workerManager2.setInstances(3);
        workerManager2.start().waitForStatus(true);
        workerInstancesCheck(workerManager2, 3);
        for (int i = 0; i < 3; i++) {
            kvTableKeyCheck(this.testSpace, AppUsingGetServiceURL.WORKER_INSTANCES_DATASET, Bytes.toBytes(String.format("init.%d", Integer.valueOf(i))));
        }
        workerManager2.setInstances(5);
        workerInstancesCheck(workerManager2, 5);
        for (int i2 = 0; i2 < 5; i2++) {
            kvTableKeyCheck(this.testSpace, AppUsingGetServiceURL.WORKER_INSTANCES_DATASET, Bytes.toBytes(String.format("init.%d", Integer.valueOf(i2))));
        }
        workerManager2.stop();
        workerManager2.waitForStatus(false);
        if (workerManager.isRunning()) {
            workerManager.stop();
        }
        workerManager.waitForStatus(false);
        workerInstancesCheck(workerManager2, 5);
        workerInstancesCheck(workerManager, 2);
        assertWorkerDatasetWrites(Bytes.toBytes(AppWithExceptionThrowingWorker.INITIALIZE), Bytes.stopKeyForPrefix(Bytes.toBytes("init.2")), 3, 3);
        assertWorkerDatasetWrites(Bytes.toBytes("init.3"), Bytes.stopKeyForPrefix(Bytes.toBytes(AppWithExceptionThrowingWorker.INITIALIZE)), 2, 5);
        byte[] bytes = Bytes.toBytes(AppWithWorker.STOP);
        assertWorkerDatasetWrites(bytes, Bytes.stopKeyForPrefix(bytes), 5, 5);
    }

    private void assertWorkerDatasetWrites(byte[] bArr, byte[] bArr2, int i, int i2) throws Exception {
        CloseableIterator scan = ((KeyValueTable) getDataset(this.testSpace, AppUsingGetServiceURL.WORKER_INSTANCES_DATASET).get()).scan(bArr, bArr2);
        Throwable th = null;
        try {
            ArrayList newArrayList = Lists.newArrayList(scan);
            Assert.assertEquals(i, newArrayList.size());
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(i2, Bytes.toInt((byte[]) ((KeyValue) it.next()).getValue()));
            }
            if (scan != null) {
                if (0 == 0) {
                    scan.close();
                    return;
                }
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (scan != null) {
                if (0 != 0) {
                    try {
                        scan.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scan.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalStateException.class)
    @Category({SlowTests.class})
    public void testServiceWithInvalidHandler() throws Exception {
        deployApplication(AppWithInvalidHandler.class, new File[0]);
    }

    @Test
    @Category({SlowTests.class})
    public void testAppWithWorker() throws Exception {
        ApplicationManager deployApplication = deployApplication(this.testSpace, AppWithWorker.class, new File[0]);
        LOG.info("Deployed.");
        WorkerManager start = deployApplication.getWorkerManager(AppWithWorker.WORKER).start();
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.20
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                KeyValueTable keyValueTable = (KeyValueTable) TestFrameworkTestRun.this.getDataset(TestFrameworkTestRun.this.testSpace, AppWithWorker.DATASET).get();
                return Boolean.valueOf(AppWithWorker.INITIALIZE.equals(Bytes.toString(keyValueTable.read(AppWithWorker.INITIALIZE))) && "run".equals(Bytes.toString(keyValueTable.read("run"))));
            }
        }, 10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        start.stop();
        deployApplication.stopAll();
        Tasks.waitFor(true, new Callable<Boolean>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.21
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(AppWithWorker.STOP.equals(Bytes.toString(((KeyValueTable) TestFrameworkTestRun.this.getDataset(TestFrameworkTestRun.this.testSpace, AppWithWorker.DATASET).get()).read(AppWithWorker.STOP))));
            }
        }, 10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testWorkerStop() throws Exception {
        WorkerManager workerManager = deployApplication(NoOpWorkerApp.class, new File[0]).getWorkerManager("NoOpWorker");
        workerManager.start();
        workerManager.waitForStatus(false, 5, 1);
    }

    @Test
    @Category({SlowTests.class})
    public void testAppWithServices() throws Exception {
        ApplicationManager deployApplication = deployApplication(AppWithServices.class, new File[0]);
        LOG.info("Deployed.");
        ServiceManager start = deployApplication.getServiceManager(AppWithServices.SERVICE_NAME).start();
        start.waitForStatus(true);
        LOG.info("Service Started");
        URL serviceURL = start.getServiceURL(15L, TimeUnit.SECONDS);
        Assert.assertNotNull(serviceURL);
        Assert.assertEquals(200L, HttpRequests.execute(HttpRequest.get(new URL(serviceURL, "ping2")).build()).getResponseCode());
        HttpResponse execute = HttpRequests.execute(HttpRequest.get(new URL(serviceURL, "failure")).build());
        Assert.assertEquals(500L, execute.getResponseCode());
        Assert.assertTrue(execute.getResponseBodyAsString().contains("Exception"));
        Assert.assertEquals(200L, HttpRequests.execute(HttpRequest.get(new URL(serviceURL, "verifyClassLoader")).build()).getResponseCode());
        RuntimeMetrics metrics = start.getMetrics();
        metrics.waitForinput(3L, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(3L, metrics.getInput());
        Assert.assertEquals(2L, metrics.getProcessed());
        Assert.assertEquals(1L, metrics.getException());
        RuntimeMetrics serviceHandlerMetrics = getMetricsManager().getServiceHandlerMetrics(Id.Namespace.DEFAULT.getId(), AppWithServices.APP_NAME, AppWithServices.SERVICE_NAME, AppWithServices.SERVICE_NAME);
        serviceHandlerMetrics.waitForinput(3L, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(3L, serviceHandlerMetrics.getInput());
        Assert.assertEquals(2L, serviceHandlerMetrics.getProcessed());
        Assert.assertEquals(1L, serviceHandlerMetrics.getException());
        LOG.info("DatasetUpdateService Started");
        ImmutableMap of = ImmutableMap.of(AppWithServices.WRITE_VALUE_RUN_KEY, AppWithServices.DATASET_TEST_VALUE, AppWithServices.WRITE_VALUE_STOP_KEY, AppWithServices.DATASET_TEST_VALUE_STOP);
        ServiceManager start2 = deployApplication.getServiceManager(AppWithServices.DATASET_WORKER_SERVICE_NAME).start(of);
        WorkerManager start3 = deployApplication.getWorkerManager(AppWithServices.DATASET_UPDATE_WORKER).start(of);
        start2.waitForStatus(true);
        ServiceManager start4 = deployApplication.getServiceManager("NoOpService").start();
        start.waitForStatus(true, 2, 1);
        Assert.assertEquals(AppWithServices.DATASET_TEST_VALUE, (String) new Gson().fromJson(callServiceGet(start4.getServiceURL(), "ping/testKey"), String.class));
        RuntimeMetrics serviceHandlerMetrics2 = getMetricsManager().getServiceHandlerMetrics(Id.Namespace.DEFAULT.getId(), AppWithServices.APP_NAME, "NoOpService", "NoOpHandler");
        serviceHandlerMetrics2.waitForinput(1L, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(1L, serviceHandlerMetrics2.getInput());
        Assert.assertEquals(1L, serviceHandlerMetrics2.getProcessed());
        Assert.assertEquals(0L, serviceHandlerMetrics2.getException());
        Assert.assertEquals(200L, HttpRequests.execute(HttpRequest.get(new URL(serviceURL, String.format("discover/%s/%s", AppWithServices.APP_NAME, AppWithServices.DATASET_WORKER_SERVICE_NAME))).build()).getResponseCode());
        start3.stop();
        start2.stop();
        start2.waitForStatus(false);
        LOG.info("DatasetUpdateService Stopped");
        start.stop();
        start.waitForStatus(false);
        LOG.info("ServerService Stopped");
        Assert.assertEquals(AppWithServices.DATASET_TEST_VALUE_STOP, (String) new Gson().fromJson(callServiceGet(start4.getServiceURL(), "ping/testKeyStop"), String.class));
        Assert.assertEquals(AppWithServices.DATASET_TEST_VALUE_STOP_2, (String) new Gson().fromJson(callServiceGet(start4.getServiceURL(), "ping/testKeyStop2"), String.class));
    }

    @Test
    public void testTransactionHandlerService() throws Exception {
        ApplicationManager deployApplication = deployApplication(this.testSpace, AppWithServices.class, new File[0]);
        LOG.info("Deployed.");
        ServiceManager start = deployApplication.getServiceManager(AppWithServices.TRANSACTIONS_SERVICE_NAME).start();
        start.waitForStatus(true);
        LOG.info("Service Started");
        final URL serviceURL = start.getServiceURL(15L, TimeUnit.SECONDS);
        Assert.assertNotNull(serviceURL);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        Future submit = newSingleThreadExecutor.submit(new Callable<Integer>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.22
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                try {
                    return Integer.valueOf(HttpRequests.execute(HttpRequest.get(new URL(String.format("%s/write/%s/%s/%d", serviceURL, AppWithServices.DATASET_TEST_KEY, AppWithServices.DATASET_TEST_VALUE, 10000))).build()).getResponseCode());
                } catch (Exception e) {
                    TestFrameworkTestRun.LOG.error("Request thread got exception.", e);
                    throw Throwables.propagate(e);
                }
            }
        });
        URL url = new URL(String.format("%s/read/%s", serviceURL, AppWithServices.DATASET_TEST_KEY));
        Assert.assertEquals(204L, HttpRequests.execute(HttpRequest.get(url).build()).getResponseCode());
        Assert.assertEquals(200L, ((Integer) submit.get()).intValue());
        HttpResponse execute = HttpRequests.execute(HttpRequest.get(url).build());
        Assert.assertEquals(200L, execute.getResponseCode());
        Assert.assertEquals(AppWithServices.DATASET_TEST_VALUE, new Gson().fromJson(execute.getResponseBodyAsString(), String.class));
        newSingleThreadExecutor.shutdown();
        start.stop();
        start.waitForStatus(false);
        Assert.assertEquals(AppWithServices.VALUE, Bytes.toString(((KeyValueTable) getDataset(this.testSpace, AppWithServices.TRANSACTIONS_DATASET_NAME).get()).read("destroy")));
    }

    /* JADX WARN: Type inference failed for: r0v19, types: [co.cask.cdap.test.app.TestFrameworkTestRun$23] */
    private void testApp(Class<? extends Application> cls, String str) throws Exception {
        ApplicationManager deployApplication = deployApplication(cls, new File[0]);
        FlowManager start = deployApplication.getFlowManager("WordCountFlow").start();
        StreamManager streamManager = getStreamManager(str);
        for (int i = 0; i < 100; i++) {
            streamManager.send(ImmutableMap.of("title", "title " + i), "testing message " + i);
        }
        RuntimeMetrics flowletMetrics = start.getFlowletMetrics("CountByField");
        flowletMetrics.waitForProcessed(500L, 10L, TimeUnit.SECONDS);
        Assert.assertEquals(0L, flowletMetrics.getException());
        ServiceManager start2 = deployApplication.getServiceManager("WordFrequency").start();
        start2.waitForStatus(true, 2, 1);
        Map map = (Map) new Gson().fromJson(callServiceGet(start2.getServiceURL(), "wordfreq/" + str + ":testing"), new TypeToken<Map<String, Long>>() { // from class: co.cask.cdap.test.app.TestFrameworkTestRun.23
        }.getType());
        Assert.assertNotNull(map);
        Assert.assertEquals(100L, ((Long) map.get(str + ":testing")).longValue());
        RuntimeMetrics metrics = start2.getMetrics();
        metrics.waitForProcessed(1L, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(0L, metrics.getException());
        deployApplication.getMapReduceManager("countTotal").start().waitForFinish(1800L, TimeUnit.SECONDS);
        Assert.assertEquals(500L, Long.valueOf(callServiceGet(start2.getServiceURL(), "total")).longValue());
        deployApplication.getMapReduceManager("countFromStream").start().waitForFinish(120L, TimeUnit.SECONDS);
        Assert.assertEquals(300L, Long.valueOf(callServiceGet(start2.getServiceURL(), "stream_total")).longValue());
        Assert.assertEquals(100L, Long.valueOf(((MyKeyValueTableDefinition.KeyValueTable) getDataset("mydataset").get()).get("title:title")).longValue());
        Assert.assertEquals(100L, Long.valueOf(((MyKeyValueTableDefinition.KeyValueTable) getDataset("mydataset").get()).get("title:title")).longValue());
    }

    @Test
    @Category({SlowTests.class})
    public void testGenerator() throws InterruptedException, IOException, TimeoutException {
        FlowManager start = deployApplication(this.testSpace, GenSinkApp2.class, new File[0]).getFlowManager("GenSinkFlow").start();
        RuntimeMetrics flowletMetrics = start.getFlowletMetrics("GenFlowlet");
        RuntimeMetrics flowletMetrics2 = start.getFlowletMetrics("SinkFlowlet");
        RuntimeMetrics flowletMetrics3 = start.getFlowletMetrics("BatchSinkFlowlet");
        flowletMetrics2.waitFor("system.process.events.in", 198L, 5L, TimeUnit.SECONDS);
        flowletMetrics2.waitForProcessed(198L, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(0L, flowletMetrics2.getException());
        flowletMetrics3.waitFor("system.process.events.in", 99L, 5L, TimeUnit.SECONDS);
        flowletMetrics3.waitForProcessed(99L, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(0L, flowletMetrics3.getException());
        Assert.assertEquals(1L, flowletMetrics.getException());
    }

    @Test
    @Category({SlowTests.class})
    public void testDynamicBatchSize() throws Exception {
        ApplicationManager deployApplication = deployApplication(this.testSpace, GenSinkApp2.class, new File[0]);
        DataSetManager dataset = getDataset(this.testSpace, "table");
        FlowManager start = deployApplication.getFlowManager("GenSinkFlow").start(Collections.singletonMap("flowlet.BatchSinkFlowlet.batch.size", "1"));
        start.getFlowletMetrics("BatchSinkFlowlet").waitForProcessed(99L, 5L, TimeUnit.SECONDS);
        start.stop();
        start.waitForFinish(10L, TimeUnit.SECONDS);
        CloseableIterator scan = ((KeyValueTable) dataset.get()).scan((byte[]) null, (byte[]) null);
        Throwable th = null;
        while (scan.hasNext()) {
            try {
                try {
                    Assert.assertEquals(1L, Bytes.toInt((byte[]) ((KeyValue) scan.next()).getKey()));
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (scan != null) {
                    if (th != null) {
                        try {
                            scan.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        scan.close();
                    }
                }
                throw th3;
            }
        }
        if (scan != null) {
            if (0 == 0) {
                scan.close();
                return;
            }
            try {
                scan.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    @Test
    public void testAppRedeployKeepsData() throws Exception {
        deployApplication(this.testSpace, AppWithTable.class, new File[0]);
        DataSetManager dataset = getDataset(this.testSpace, "my_table");
        ((Table) dataset.get()).put(new Put("key1", "column1", "value1"));
        dataset.flush();
        Assert.assertEquals("value1", ((Table) getDataset(this.testSpace, "my_table").get()).get(new Get("key1", new String[]{"column1"})).getString("column1"));
        deployApplication(AppWithTable.class, new File[0]);
        DataSetManager dataset2 = getDataset(this.testSpace, "my_table");
        Assert.assertEquals("value1", ((Table) dataset2.get()).get(new Get("key1", new String[]{"column1"})).getString("column1"));
        ((Table) dataset.get()).put(new Put("key1", "column1", "value2"));
        dataset.flush();
        Assert.assertEquals("value1", ((Table) dataset2.get()).get(new Get("key1", new String[]{"column1"})).getString("column1"));
    }

    @Test(timeout = 60000)
    public void testFlowletMetricsReset() throws Exception {
        FlowManager start = deployApplication(DataSetInitApp.class, new File[0]).getFlowManager("DataSetFlow").start();
        RuntimeMetrics flowletMetrics = start.getFlowletMetrics("Consumer");
        flowletMetrics.waitForProcessed(1L, 5L, TimeUnit.SECONDS);
        start.stop();
        Assert.assertEquals(1L, flowletMetrics.getProcessed());
        getMetricsManager().resetAll();
        Assert.assertEquals(0L, flowletMetrics.getProcessed());
    }

    @Test(timeout = 60000)
    public void testFlowletInitAndSetInstances() throws Exception {
        FlowManager start = deployApplication(this.testSpace, DataSetInitApp.class, new File[0]).getFlowManager("DataSetFlow").start();
        RuntimeMetrics flowletMetrics = start.getFlowletMetrics("Consumer");
        flowletMetrics.waitForProcessed(1L, 5L, TimeUnit.SECONDS);
        Assert.assertEquals(1L, start.getFlowletInstances("Generator"));
        start.setFlowletInstances("Generator", 3);
        Assert.assertEquals(3L, start.getFlowletInstances("Generator"));
        flowletMetrics.waitForProcessed(3L, 10L, TimeUnit.SECONDS);
        start.setFlowletInstances("Generator", 1);
        Assert.assertEquals(1L, start.getFlowletInstances("Generator"));
        TimeUnit.SECONDS.sleep(3L);
        Assert.assertEquals(3L, flowletMetrics.getProcessed());
        start.setFlowletInstances("Generator", 2);
        Assert.assertEquals(2L, start.getFlowletInstances("Generator"));
        flowletMetrics.waitForProcessed(4L, 10L, TimeUnit.SECONDS);
        start.stop();
        DataSetManager dataset = getDataset(this.testSpace, "conf");
        Assert.assertEquals("generator", ((Table) dataset.get()).get(new Get("key", new String[]{"column"})).getString("column"));
        dataset.flush();
    }

    @Test(timeout = 60000)
    public void testAppWithAutoDeployDatasetModule() throws Exception {
        testAppWithDataset(AppsWithDataset.AppWithAutoDeploy.class, "MyService");
    }

    @Test(timeout = 60000)
    public void testAppWithAutoDeployDataset() throws Exception {
        deployDatasetModule("my-kv", AppsWithDataset.KeyValueTableDefinition.Module.class);
        testAppWithDataset(AppsWithDataset.AppWithAutoDeploy.class, "MyService");
    }

    @Test(timeout = 60000)
    public void testAppWithAutoCreateDataset() throws Exception {
        deployDatasetModule("my-kv", AppsWithDataset.KeyValueTableDefinition.Module.class);
        testAppWithDataset(AppsWithDataset.AppWithAutoCreate.class, "MyService");
    }

    @Test(timeout = 60000)
    public void testAppWithExistingDataset() throws Exception {
        deployDatasetModule("my-kv", AppsWithDataset.KeyValueTableDefinition.Module.class);
        addDatasetInstance("myKeyValueTable", "myTable", DatasetProperties.EMPTY).create();
        testAppWithDataset(AppsWithDataset.AppWithExisting.class, "MyService");
    }

    @Test(timeout = 60000)
    public void testAppWithExistingDatasetInjectedByAnnotation() throws Exception {
        deployDatasetModule("my-kv", AppsWithDataset.KeyValueTableDefinition.Module.class);
        addDatasetInstance("myKeyValueTable", "myTable", DatasetProperties.EMPTY).create();
        testAppWithDataset(AppsWithDataset.AppUsesAnnotation.class, "MyServiceWithUseDataSetAnnotation");
    }

    @Test(timeout = 60000)
    public void testDatasetWithoutApp() throws Exception {
        deployApplication(this.testSpace, DummyApp.class, new File[0]);
        deployDatasetModule(this.testSpace, "my-kv", AppsWithDataset.KeyValueTableDefinition.Module.class);
        addDatasetInstance(this.testSpace, "myKeyValueTable", "myTable", DatasetProperties.EMPTY).create();
        DataSetManager dataset = getDataset(this.testSpace, "myTable");
        ((AppsWithDataset.KeyValueTableDefinition.KeyValueTable) dataset.get()).put("test", "hello");
        dataset.flush();
        Assert.assertEquals("hello", ((AppsWithDataset.KeyValueTableDefinition.KeyValueTable) dataset.get()).get("test"));
    }

    @Test(timeout = 60000)
    public void testAppWithAutoDeployDatasetType() throws Exception {
        testAppWithDataset(AppsWithDataset.AppWithAutoDeployType.class, "MyService");
    }

    @Test(timeout = 60000)
    public void testAppWithAutoDeployDatasetTypeShortcut() throws Exception {
        testAppWithDataset(AppsWithDataset.AppWithAutoDeployTypeShortcut.class, "MyService");
    }

    private void testAppWithDataset(Class<? extends Application> cls, String str) throws Exception {
        ServiceManager start = deployApplication(cls, new File[0]).getServiceManager(str).start();
        start.waitForStatus(true, 2, 1);
        callServicePut(start.getServiceURL(), "key1", "value1");
        Assert.assertEquals("value1", new Gson().fromJson(callServiceGet(start.getServiceURL(), "key1"), String.class));
    }

    @Test
    @Category({XSlowTests.class})
    public void testByteCodeClassLoader() throws Exception {
        ApplicationManager deployApplication = deployApplication(this.testSpace, ClassLoaderTestApp.class, new File[0]);
        FlowManager start = deployApplication.getFlowManager("BasicFlow").start();
        start.getFlowletMetrics("Sink").waitForProcessed(10L, 5000L, TimeUnit.MILLISECONDS);
        start.stop();
        ServiceManager start2 = deployApplication.getServiceManager("RecordQuery").start();
        URL serviceURL = start2.getServiceURL(15L, TimeUnit.SECONDS);
        Assert.assertNotNull(serviceURL);
        HttpResponse execute = HttpRequests.execute(HttpRequest.get(new URL(serviceURL, "query?type=public")).build());
        Assert.assertEquals(200L, execute.getResponseCode());
        long parseLong = Long.parseLong(execute.getResponseBodyAsString());
        start2.stop();
        Assert.assertTrue(parseLong == Bytes.toLong(((KeyValueTable) getDataset(this.testSpace, "records").get()).read("PUBLIC")));
    }

    private String callServiceGet(URL url, String str) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(((HttpURLConnection) new URL(url.toString() + str).openConnection()).getInputStream(), Charsets.UTF_8));
        Throwable th = null;
        try {
            try {
                Assert.assertEquals(200L, r0.getResponseCode());
                String readLine = bufferedReader.readLine();
                if (bufferedReader != null) {
                    if (0 != 0) {
                        try {
                            bufferedReader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedReader.close();
                    }
                }
                return readLine;
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedReader != null) {
                if (th != null) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            throw th3;
        }
    }

    private String callServicePut(URL url, String str, String str2) throws IOException {
        BufferedReader bufferedReader;
        Throwable th;
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(url.toString() + str).openConnection();
        httpURLConnection.setDoOutput(true);
        httpURLConnection.setRequestMethod("PUT");
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(httpURLConnection.getOutputStream());
        Throwable th2 = null;
        try {
            try {
                outputStreamWriter.write(str2);
                if (outputStreamWriter != null) {
                    if (0 != 0) {
                        try {
                            outputStreamWriter.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        outputStreamWriter.close();
                    }
                }
                bufferedReader = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), Charsets.UTF_8));
                th = null;
            } finally {
            }
            try {
                try {
                    Assert.assertEquals(200L, httpURLConnection.getResponseCode());
                    String readLine = bufferedReader.readLine();
                    if (bufferedReader != null) {
                        if (0 != 0) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    return readLine;
                } finally {
                }
            } catch (Throwable th5) {
                if (bufferedReader != null) {
                    if (th != null) {
                        try {
                            bufferedReader.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        bufferedReader.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (outputStreamWriter != null) {
                if (th2 != null) {
                    try {
                        outputStreamWriter.close();
                    } catch (Throwable th8) {
                        th2.addSuppressed(th8);
                    }
                } else {
                    outputStreamWriter.close();
                }
            }
            throw th7;
        }
    }
}
