/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.quarkus.component.spark.it;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.spark.DataFrameCallback;
import org.apache.camel.component.spark.RddCallback;
import org.apache.camel.component.spark.Sparks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

@Path(value="/spark")
@ApplicationScoped
public class SparkResource {
    @Inject
    CamelContext context;
    String sparkUri = "spark:rdd?rdd=#testFileRdd";
    String sparkDataFrameUri = "spark:dataframe?dataFrame=#jsonCars";
    String sparkHiveUri = "spark:hive";
    private JavaSparkContext sparkContext;
    private HiveContext hiveContext;
    private java.nio.file.Path rddFilePath;
    private java.nio.file.Path carsJsonPath;
    @Inject
    ProducerTemplate template;

    @PostConstruct
    void init() {
        this.sparkContext = Sparks.createLocalSparkContext();
        this.hiveContext = new HiveContext(this.sparkContext.sc());
        try {
            java.nio.file.Path tmpDir = Paths.get("target/tmp", new String[0]);
            Files.createDirectories(tmpDir, new FileAttribute[0]);
            this.rddFilePath = this.copyResource(tmpDir, "testrdd.txt");
            this.carsJsonPath = this.copyResource(tmpDir, "cars.json");
        }
        catch (IOException e) {
            throw new RuntimeException("Could not create a temporary file", e);
        }
    }

    private java.nio.file.Path copyResource(java.nio.file.Path tmpDir, String resource) throws IOException {
        java.nio.file.Path file = tmpDir.resolve(resource);
        try (InputStream in = this.getClass().getClassLoader().getResourceAsStream(resource);){
            Files.copy(in, file, new CopyOption[0]);
        }
        return file;
    }

    @Produces
    @Named
    JavaRDD<String> testFileRdd() {
        return this.sparkContext.textFile(this.rddFilePath.toString());
    }

    @Produces
    @Named
    Dataset<Row> jsonCars() {
        Dataset jsonCars = this.hiveContext.read().json(this.carsJsonPath.toString());
        jsonCars.registerTempTable("cars");
        return jsonCars;
    }

    @Produces
    @Named
    RddCallback countLinesTransformation() {
        return new RddCallback(){

            public Object onRdd(JavaRDDLike rdd, Object ... payloads) {
                return rdd.count();
            }
        };
    }

    @Produces
    @Named
    HiveContext hiveContext() {
        return this.hiveContext;
    }

    @Path(value="/rdd/count")
    @GET
    @javax.ws.rs.Produces(value={"text/plain"})
    public Long rddCount() throws Exception {
        return (Long)this.template.requestBodyAndHeader(this.sparkUri, null, "CAMEL_SPARK_RDD_CALLBACK", (Object)new RddCallback(){

            public Long onRdd(JavaRDDLike rdd, Object ... payloads) {
                return rdd.count();
            }
        }, Long.class);
    }

    @Path(value="/dataframe/{model}/count")
    @GET
    @javax.ws.rs.Produces(value={"text/plain"})
    public Long dataframeCount(@PathParam(value="model") String model) throws Exception {
        return (Long)this.template.requestBodyAndHeader(this.sparkDataFrameUri, (Object)model, "CAMEL_SPARK_DATAFRAME_CALLBACK", (Object)new DataFrameCallback<Long>(){

            public Long onDataFrame(Dataset<Row> dataFrame, Object ... payloads) {
                String model = (String)payloads[0];
                return dataFrame.where(dataFrame.col("model").eqNullSafe((Object)model)).count();
            }
        }, Long.class);
    }

    @Path(value="/hive/count")
    @GET
    @javax.ws.rs.Produces(value={"text/plain"})
    public Long hiveCount() throws Exception {
        return (Long)this.template.requestBody(this.sparkHiveUri + "?collect=false", (Object)"SELECT * FROM cars", Long.class);
    }
}

