package co.cask.cdap.partitioned;

import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.partitioned.AppWithPartitionConsumers;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.ProgramManager;
import co.cask.cdap.test.ServiceManager;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/partitioned/PartitionConsumingTestRun.class */
public class PartitionConsumingTestRun extends TestFrameworkTestBase {
    private static final String LINE1 = "a b a";
    private static final String LINE2 = "b a b";
    private static final String LINE3 = "c c c";

    @Test
    public void testMapReduceConsumer() throws Exception {
        testWordCountOnFileSet(new Function<ApplicationManager, ProgramManager>() { // from class: co.cask.cdap.partitioned.PartitionConsumingTestRun.1
            public ProgramManager apply(ApplicationManager applicationManager) {
                return applicationManager.getMapReduceManager(AppWithPartitionConsumers.WordCountMapReduce.NAME).start();
            }
        }, true);
        ImmutableMap of = ImmutableMap.of("ns", "default", "app", "AppWithPartitionConsumers", "mr", AppWithPartitionConsumers.WordCountMapReduce.NAME, "mrt", "r");
        long totalMetric = getMetricsManager().getTotalMetric(of, "system.process.entries.in");
        long totalMetric2 = getMetricsManager().getTotalMetric(of, "system.process.entries.out");
        Assert.assertEquals(9L, totalMetric);
        Assert.assertEquals(10L, totalMetric2);
    }

    @Test
    public void testWorkerConsumer() throws Exception {
        testWordCountOnFileSet(new Function<ApplicationManager, ProgramManager>() { // from class: co.cask.cdap.partitioned.PartitionConsumingTestRun.2
            public ProgramManager apply(ApplicationManager applicationManager) {
                return applicationManager.getWorkerManager(AppWithPartitionConsumers.WordCountWorker.NAME).start();
            }
        }, false);
    }

    private void testWordCountOnFileSet(Function<ApplicationManager, ProgramManager> function, boolean z) throws Exception {
        ApplicationManager deployApplication = deployApplication(AppWithPartitionConsumers.class, new File[0]);
        ServiceManager start = deployApplication.getServiceManager("DatasetService").start();
        start.waitForStatus(true);
        URL serviceURL = start.getServiceURL();
        createPartition(serviceURL, LINE1, "1");
        ((ProgramManager) function.apply(deployApplication)).waitForRun(ProgramRunStatus.COMPLETED, 5L, TimeUnit.MINUTES);
        Assert.assertEquals(new Long(2L), getCount(serviceURL, "a"));
        Assert.assertEquals(new Long(1L), getCount(serviceURL, "b"));
        Assert.assertEquals(new Long(0L), getCount(serviceURL, "c"));
        createPartition(serviceURL, LINE2, "2");
        createPartition(serviceURL, LINE3, "3");
        ((ProgramManager) function.apply(deployApplication)).waitForRuns(ProgramRunStatus.COMPLETED, 2, 5L, TimeUnit.MINUTES);
        Assert.assertEquals(new Long(3L), getCount(serviceURL, "a"));
        Assert.assertEquals(new Long(3L), getCount(serviceURL, "b"));
        Assert.assertEquals(new Long(3L), getCount(serviceURL, "c"));
        ((ProgramManager) function.apply(deployApplication)).waitForRuns(ProgramRunStatus.COMPLETED, 3, 5L, TimeUnit.MINUTES);
        Assert.assertEquals(new Long(3L), getCount(serviceURL, "a"));
        Assert.assertEquals(new Long(3L), getCount(serviceURL, "b"));
        Assert.assertEquals(new Long(3L), getCount(serviceURL, "c"));
        Assert.assertEquals(z ? 3L : 2L, ((PartitionedFileSet) getDataset("outputLines").get()).getPartitions(PartitionFilter.ALWAYS_MATCH).size());
        ArrayList newArrayList = Lists.newArrayList(new String[]{"1", "1", "2", "2", "3"});
        List<String> dataFromExplore = getDataFromExplore("outputLines");
        Collections.sort(dataFromExplore);
        Assert.assertEquals(newArrayList, dataFromExplore);
    }

    private List<String> getDataFromExplore(String str) throws Exception {
        Connection queryClient = getQueryClient();
        Throwable th = null;
        try {
            try {
                ResultSet executeQuery = queryClient.prepareStatement("SELECT * FROM dataset_" + str).executeQuery();
                ArrayList arrayList = new ArrayList();
                while (executeQuery.next()) {
                    arrayList.add(executeQuery.getString(1));
                }
                if (queryClient != null) {
                    if (0 != 0) {
                        try {
                            queryClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        queryClient.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            if (queryClient != null) {
                if (th != null) {
                    try {
                        queryClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    queryClient.close();
                }
            }
            throw th3;
        }
    }

    private void createPartition(URL url, String str, String str2) throws IOException {
        Assert.assertEquals(200L, HttpRequests.execute(HttpRequest.put(new URL(url, "lines?time=" + str2)).withBody(str).build()).getResponseCode());
    }

    private Long getCount(URL url, String str) throws IOException {
        HttpResponse execute = HttpRequests.execute(HttpRequest.get(new URL(url, "counts?word=" + str)).build());
        Assert.assertEquals(200L, execute.getResponseCode());
        return Long.valueOf(execute.getResponseBodyAsString());
    }
}
