package co.cask.cdap.spark;

import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.common.Scope;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.dataset.lib.PartitionDetail;
import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionOutput;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.api.dataset.lib.TimePartitionDetail;
import co.cask.cdap.api.dataset.lib.TimePartitionOutput;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.spark.app.ScalaFileCountSparkProgram;
import co.cask.cdap.spark.app.SparkAppUsingFileSet;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.TestConfiguration;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.tephra.TransactionFailureException;
import org.apache.twill.filesystem.Location;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/spark/SparkFileSetTestRun.class */
public class SparkFileSetTestRun extends TestFrameworkTestBase {

    @ClassRule
    public static final TestConfiguration CONFIG = new TestConfiguration(new Object[]{"explore.enabled", false});
    private static File artifactJar;
    private ApplicationManager applicationManager;

    @BeforeClass
    public static void init() throws IOException {
        artifactJar = createArtifactJar(SparkAppUsingFileSet.class);
    }

    @Before
    public void deploy() throws Exception {
        this.applicationManager = deployWithArtifact(SparkAppUsingFileSet.class, artifactJar);
    }

    @Test
    public void testSparkWithFileSet() throws Exception {
        testSparkWithFileSet(this.applicationManager, FileCountSparkProgram.class.getSimpleName());
        testSparkWithFileSet(this.applicationManager, ScalaFileCountSparkProgram.class.getSimpleName());
    }

    @Test
    public void testSparkWithCustomFileSet() throws Exception {
        testSparkWithCustomFileSet(this.applicationManager, FileCountSparkProgram.class.getSimpleName());
        testSparkWithCustomFileSet(this.applicationManager, ScalaFileCountSparkProgram.class.getSimpleName());
    }

    @Test
    public void testSparkWithTimePartitionedFileSet() throws Exception {
        testSparkWithTimePartitionedFileSet(this.applicationManager, FileCountSparkProgram.class.getSimpleName());
        testSparkWithTimePartitionedFileSet(this.applicationManager, ScalaFileCountSparkProgram.class.getSimpleName());
    }

    @Test
    public void testSparkWithPartitionedFileSet() throws Exception {
        testSparkWithPartitionedFileSet(this.applicationManager, FileCountSparkProgram.class.getSimpleName());
        testSparkWithPartitionedFileSet(this.applicationManager, ScalaFileCountSparkProgram.class.getSimpleName());
    }

    private void testSparkWithFileSet(ApplicationManager applicationManager, String str) throws Exception {
        FileSet fileSet = (FileSet) getDataset("fs").get();
        prepareFileInput(fileSet.getLocation("nn"));
        HashMap hashMap = new HashMap();
        FileSetArguments.setInputPath(hashMap, "nn");
        HashMap hashMap2 = new HashMap();
        FileSetArguments.setOutputPath(hashMap, "xx");
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "fs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "fs", hashMap2));
        hashMap3.put("input", "fs");
        hashMap3.put("output", "fs");
        applicationManager.getSparkManager(str).start(hashMap3).waitForRun(ProgramRunStatus.COMPLETED, 1L, TimeUnit.MINUTES);
        validateFileOutput(fileSet.getLocation("xx"), "custom:");
        fileSet.getLocation("nn").delete(true);
        fileSet.getLocation("xx").delete(true);
    }

    private void testSparkWithCustomFileSet(ApplicationManager applicationManager, String str) throws Exception {
        SparkAppUsingFileSet.MyFileSet myFileSet = (SparkAppUsingFileSet.MyFileSet) getDataset("myfs").get();
        FileSet embeddedFileSet = myFileSet.getEmbeddedFileSet();
        prepareFileInput(embeddedFileSet.getLocation("nn"));
        HashMap hashMap = new HashMap();
        FileSetArguments.setInputPath(hashMap, "nn");
        HashMap hashMap2 = new HashMap();
        FileSetArguments.setOutputPath(hashMap, "xx");
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "myfs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "myfs", hashMap2));
        hashMap3.put("input", "myfs");
        hashMap3.put("output", "myfs");
        applicationManager.getSparkManager(str).start(hashMap3).waitForRun(ProgramRunStatus.COMPLETED, 2L, TimeUnit.MINUTES);
        Assert.assertEquals(1L, r0.getHistory(ProgramRunStatus.COMPLETED).size());
        validateFileOutput(embeddedFileSet.getLocation("xx"));
        Assert.assertTrue(myFileSet.getSuccessLocation().exists());
        Assert.assertFalse(myFileSet.getFailureLocation().exists());
        myFileSet.getSuccessLocation().delete();
        applicationManager.getSparkManager(str).start(hashMap3).waitForRun(ProgramRunStatus.FAILED, 2L, TimeUnit.MINUTES);
        Assert.assertFalse(myFileSet.getSuccessLocation().exists());
        Assert.assertTrue(myFileSet.getFailureLocation().exists());
        embeddedFileSet.getLocation("nn").delete(true);
        embeddedFileSet.getLocation("xx").delete(true);
        myFileSet.getSuccessLocation().delete(true);
        myFileSet.getFailureLocation().delete(true);
    }

    private void testSparkWithTimePartitionedFileSet(ApplicationManager applicationManager, String str) throws Exception {
        DataSetManager dataset = getDataset("tpfs");
        long currentTimeMillis = System.currentTimeMillis();
        long millis = currentTimeMillis + TimeUnit.HOURS.toMillis(1L);
        addTimePartition(dataset, currentTimeMillis);
        addTimePartition(dataset, 987654321L);
        HashMap hashMap = new HashMap();
        TimePartitionedFileSetArguments.setInputStartTime(hashMap, currentTimeMillis - 100);
        TimePartitionedFileSetArguments.setInputEndTime(hashMap, currentTimeMillis + 100);
        HashMap hashMap2 = new HashMap();
        TimePartitionedFileSetArguments.setOutputPartitionTime(hashMap2, millis);
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "tpfs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "tpfs", hashMap2));
        hashMap3.put("input", "tpfs");
        hashMap3.put("output", "tpfs");
        hashMap3.put("outputKey", String.valueOf(123456789L));
        hashMap3.put("inputKey", String.valueOf(987654321L));
        applicationManager.getSparkManager(str).start(hashMap3).waitForRun(ProgramRunStatus.COMPLETED, 10L, TimeUnit.MINUTES);
        dataset.flush();
        TimePartitionedFileSet timePartitionedFileSet = (TimePartitionedFileSet) dataset.get();
        TimePartitionDetail partitionByTime = timePartitionedFileSet.getPartitionByTime(millis);
        Assert.assertNotNull("Output partition is null while for running without custom dataset arguments", partitionByTime);
        validateFileOutput(partitionByTime.getLocation());
        TimePartitionDetail partitionByTime2 = timePartitionedFileSet.getPartitionByTime(123456789L);
        Assert.assertNotNull("Output partition is null while for running with custom dataset arguments", partitionByTime2);
        validateFileOutput(partitionByTime2.getLocation());
        timePartitionedFileSet.dropPartition(currentTimeMillis);
        timePartitionedFileSet.dropPartition(987654321L);
        timePartitionedFileSet.dropPartition(partitionByTime.getPartitionKey());
        timePartitionedFileSet.dropPartition(partitionByTime2.getPartitionKey());
        dataset.flush();
    }

    private void testSparkWithPartitionedFileSet(ApplicationManager applicationManager, String str) throws Exception {
        DataSetManager dataset = getDataset("pfs");
        PartitionedFileSet partitionedFileSet = (PartitionedFileSet) dataset.get();
        PartitionOutput partitionOutput = partitionedFileSet.getPartitionOutput(PartitionKey.builder().addStringField("x", "nn").build());
        prepareFileInput(partitionOutput.getLocation());
        partitionOutput.addPartition();
        dataset.flush();
        HashMap hashMap = new HashMap();
        PartitionedFileSetArguments.setInputPartitionFilter(hashMap, PartitionFilter.builder().addRangeCondition("x", "na", "nx").build());
        HashMap hashMap2 = new HashMap();
        PartitionKey build = PartitionKey.builder().addStringField("x", "xx").build();
        PartitionedFileSetArguments.setOutputPartitionKey(hashMap2, build);
        HashMap hashMap3 = new HashMap();
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "pfs", hashMap));
        hashMap3.putAll(RuntimeArguments.addScope(Scope.DATASET, "pfs", hashMap2));
        hashMap3.put("input", "pfs");
        hashMap3.put("output", "pfs");
        applicationManager.getSparkManager(str).start(hashMap3).waitForRun(ProgramRunStatus.COMPLETED, 10L, TimeUnit.MINUTES);
        dataset.flush();
        PartitionDetail partition = partitionedFileSet.getPartition(build);
        Assert.assertNotNull(partition);
        validateFileOutput(partition.getLocation());
        partitionedFileSet.dropPartition(partitionOutput.getPartitionKey());
        partitionedFileSet.dropPartition(partition.getPartitionKey());
        dataset.flush();
    }

    private void prepareFileInput(Location location) throws IOException {
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(location.getOutputStream());
        Throwable th = null;
        try {
            try {
                outputStreamWriter.write("13 characters\n");
                outputStreamWriter.write("7 chars\n");
                if (outputStreamWriter != null) {
                    if (0 == 0) {
                        outputStreamWriter.close();
                        return;
                    }
                    try {
                        outputStreamWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (outputStreamWriter != null) {
                if (th != null) {
                    try {
                        outputStreamWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    outputStreamWriter.close();
                }
            }
            throw th4;
        }
    }

    private void validateFileOutput(Location location) throws Exception {
        validateFileOutput(location, "");
    }

    private void validateFileOutput(Location location, String str) throws Exception {
        Assert.assertTrue(location.isDirectory());
        for (Location location2 : location.list()) {
            if (location2.getName().startsWith("part-r-")) {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(location2.getInputStream()));
                Throwable th = null;
                try {
                    try {
                        String readLine = bufferedReader.readLine();
                        Assert.assertNotNull(readLine);
                        Assert.assertEquals(str + "13 characters:13", readLine);
                        String readLine2 = bufferedReader.readLine();
                        Assert.assertNotNull(readLine2);
                        Assert.assertEquals(str + "7 chars:7", readLine2);
                        Assert.assertNull(bufferedReader.readLine());
                        if (bufferedReader != null) {
                            if (0 == 0) {
                                bufferedReader.close();
                                return;
                            }
                            try {
                                bufferedReader.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (bufferedReader != null) {
                        if (th != null) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    throw th4;
                }
            }
        }
        Assert.fail("Output directory does not contain any part file: " + location.list());
    }

    private void addTimePartition(DataSetManager<TimePartitionedFileSet> dataSetManager, long j) throws IOException, TransactionFailureException, InterruptedException {
        TimePartitionOutput partitionOutput = ((TimePartitionedFileSet) dataSetManager.get()).getPartitionOutput(j);
        prepareFileInput(partitionOutput.getLocation());
        partitionOutput.addPartition();
        dataSetManager.flush();
    }
}
