package co.cask.cdap.spark.service;

import co.cask.cdap.api.ServiceDiscoverer;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.test.app.WorkflowAppWithLocalDatasets;
import com.google.common.io.Closeables;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Arrays;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.commons.io.Charsets;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/spark/service/TestSparkServiceIntegrationApp.class */
public class TestSparkServiceIntegrationApp extends AbstractApplication {
    /**
     * Name of the square service. Its value equals the literal {@code "SquareService"} that
     * {@code configure()} passes to {@code addService} and that the Spark program passes to
     * {@code getServiceURL}; the visible code uses the literal rather than this constant.
     */
    public static final String SERVICE_NAME = "SquareService";
    /**
     * Leading path segment of the square endpoint. Its value equals the first segment of the
     * handler's {@code @Path("SquareService/{num}")}; presumably kept separate from
     * {@link #SERVICE_NAME} so the two can diverge, although both currently hold the same string.
     */
    public static final String SERVICE_METHOD_NAME = "SquareService";

    /* loaded from: input_file:co/cask/cdap/spark/service/TestSparkServiceIntegrationApp$SparkServiceProgram.class */
    /**
     * Spark program that, for each of the numbers 1 through 5, calls the {@code SquareService}
     * HTTP endpoint and stores the pair (number, first line of the response) as UTF-8 bytes in
     * the dataset named by {@code WorkflowAppWithLocalDatasets.RESULT_DATASET}.
     */
    public static class SparkServiceProgram implements JavaSparkMain {
        /**
         * Builds the input RDD, resolves every element through the service and saves the result.
         *
         * @param javaSparkExecutionContext execution context supplying the service discoverer and
         *                                  the dataset sink
         * @throws Exception if the Spark job or any service call fails
         */
        public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
            JavaRDD<Integer> numbers = new JavaSparkContext().parallelize(Arrays.asList(1, 2, 3, 4, 5));
            // Kept from the original: runs an action on the RDD before the service-backed mapping.
            numbers.count();
            final ServiceDiscoverer serviceDiscoverer = javaSparkExecutionContext.getServiceDiscoverer();
            javaSparkExecutionContext.saveAsDataset(numbers.mapToPair(new PairFunction<Integer, byte[], byte[]>() {
                /**
                 * Asks the service for the square of {@code num}.
                 *
                 * @param num number to square
                 * @return tuple of (decimal string of {@code num}, first response line), both as bytes
                 * @throws IllegalStateException if the service cannot be discovered or returns no body
                 */
                public Tuple2<byte[], byte[]> call(Integer num) throws Exception {
                    URL serviceUrl = serviceDiscoverer.getServiceURL("SquareService");
                    if (serviceUrl == null) {
                        // Fail with a clear message instead of a MalformedURLException from new URL(null, ...).
                        throw new IllegalStateException("Service 'SquareService' could not be discovered");
                    }
                    URL squareUrl = new URL(serviceUrl, String.format("SquareService/%s", String.valueOf(num)));
                    // Decode explicitly as UTF-8: the handler writes its response with Charsets.UTF_8.
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(squareUrl.openConnection().getInputStream(), Charsets.UTF_8));
                    String squared;
                    try {
                        squared = reader.readLine();
                    } finally {
                        // Close even when readLine() throws, so the connection stream is not leaked.
                        Closeables.closeQuietly(reader);
                    }
                    if (squared == null) {
                        throw new IllegalStateException("Empty response from " + squareUrl);
                    }
                    return new Tuple2<>(Bytes.toBytes(String.valueOf(num)), Bytes.toBytes(squared));
                }
            }), WorkflowAppWithLocalDatasets.RESULT_DATASET);
        }
    }

    /* loaded from: input_file:co/cask/cdap/spark/service/TestSparkServiceIntegrationApp$SparkServiceProgramSpec.class */
    /**
     * Spark specification that registers {@link SparkServiceProgram} as the main class under the
     * name {@code "SparkServiceProgram"}.
     */
    public static class SparkServiceProgramSpec extends AbstractSpark {
        /** Sets the program's name, description and main class. */
        public void configure() {
            final Class<SparkServiceProgram> mainClass = SparkServiceProgram.class;
            this.setName("SparkServiceProgram");
            this.setDescription("Test Spark with Service");
            this.setMainClass(mainClass);
        }
    }

    /* loaded from: input_file:co/cask/cdap/spark/service/TestSparkServiceIntegrationApp$SquareHandler.class */
    /**
     * HTTP handler exposing {@code GET SquareService/{num}}, which responds with the square of
     * the integer given in the path.
     *
     * NOTE(review): this is a non-static inner class and therefore carries a reference to the
     * enclosing application instance; it is left that way to keep the class's shape unchanged.
     */
    public class SquareHandler extends AbstractHttpServiceHandler {
        public SquareHandler() {
        }

        /**
         * Responds with {@code num * num} as a UTF-8 decimal string.
         *
         * @param httpServiceRequest   the incoming request (unused)
         * @param httpServiceResponder responder used to send the result or an error
         * @param str                  the {@code num} path parameter; must be a decimal {@code int}
         */
        @GET
        @Path("SquareService/{num}")
        public void square(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @PathParam("num") String str) {
            if (str.isEmpty()) {
                // Status code kept as in the original so existing callers see the same response.
                httpServiceResponder.sendError(HttpResponseStatus.NO_CONTENT.getCode(), "No number provided");
                return;
            }
            final long value;
            try {
                // Parse once; still only int-range input is accepted, as before.
                value = Integer.parseInt(str);
            } catch (NumberFormatException e) {
                // Report malformed input as a client error instead of letting the exception escape.
                httpServiceResponder.sendError(HttpResponseStatus.BAD_REQUEST.getCode(), "Not a valid integer: " + str);
                return;
            }
            // Multiply as long: the square of any int fits in a long, so large inputs no longer overflow.
            httpServiceResponder.sendString(HttpResponseStatus.OK.getCode(), String.valueOf(value * value), Charsets.UTF_8);
        }
    }

    /**
     * Declares the application: its name and description, the key-value result dataset, the
     * Spark program and the {@code SquareService} HTTP service.
     */
    public void configure() {
        setName("TestSparkServiceIntegrationApp");
        setDescription("App to test Spark with Service");

        final String resultDataset = WorkflowAppWithLocalDatasets.RESULT_DATASET;
        createDataset(resultDataset, KeyValueTable.class);

        final SparkServiceProgramSpec sparkSpec = new SparkServiceProgramSpec();
        addSpark(sparkSpec);

        // No additional handlers: the trailing varargs parameter is left empty.
        final SquareHandler squareHandler = new SquareHandler();
        addService("SquareService", squareHandler);
    }
}
