package co.cask.cdap.examples.sparkpagerank;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.ObjectStores;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.workflow.AbstractWorkflow;
import com.google.common.base.Charsets;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.Iterator;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

/* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankApp.class */
public class SparkPageRankApp extends AbstractApplication {
    public static final String SERVICE_HANDLERS = "SparkPageRankService";
    public static final String BACKLINK_URL_STREAM = "backlinkURLStream";

    /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankApp$PageRankSpark.class */
    public static final class PageRankSpark extends AbstractSpark {
        public void configure() {
            setDescription("Spark Page Rank Program");
            setMainClass(SparkPageRankProgram.class);
            setDriverResources(new Resources(1024));
            setExecutorResources(new Resources(1024));
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankApp$PageRankWorkflow.class */
    public static class PageRankWorkflow extends AbstractWorkflow {
        public void configure() {
            setDescription("Runs SparkPageRankProgram followed by RanksCounter MapReduce");
            addSpark(PageRankSpark.class.getSimpleName());
            addMapReduce(RanksCounter.class.getSimpleName());
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankApp$RanksCounter.class */
    public static class RanksCounter extends AbstractMapReduce {

        /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankApp$RanksCounter$Counter.class */
        public static class Counter extends Reducer<IntWritable, IntWritable, byte[], Integer> {
            public void reduce(IntWritable intWritable, Iterable<IntWritable> iterable, Reducer<IntWritable, IntWritable, byte[], Integer>.Context context) throws IOException, InterruptedException {
                int i = 0;
                Iterator<IntWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    i += it.next().get();
                }
                context.write(Bytes.toBytes(intWritable.get()), Integer.valueOf(i));
            }

            public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
                reduce((IntWritable) obj, (Iterable<IntWritable>) iterable, (Reducer<IntWritable, IntWritable, byte[], Integer>.Context) context);
            }
        }

        /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankApp$RanksCounter$Emitter.class */
        public static class Emitter extends Mapper<byte[], Integer, IntWritable, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);

            protected void map(byte[] bArr, Integer num, Mapper<byte[], Integer, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException {
                context.write(new IntWritable(num.intValue()), ONE);
            }

            protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
                map((byte[]) obj, (Integer) obj2, (Mapper<byte[], Integer, IntWritable, IntWritable>.Context) context);
            }
        }

        public void configure() {
            setInputDataset("ranks");
        }

        public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
            Job job = (Job) mapReduceContext.getHadoopJob();
            job.setMapperClass(Emitter.class);
            job.setReducerClass(Counter.class);
            job.setNumReduceTasks(1);
            mapReduceContext.addOutput("rankscount");
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankApp$SparkPageRankServiceHandler.class */
    public static final class SparkPageRankServiceHandler extends AbstractHttpServiceHandler {
        private static final Gson GSON = new Gson();
        public static final String URL_KEY = "url";
        public static final String RANKS_PATH = "rank";
        public static final String TOTAL_PAGES_PATH = "total";
        public static final String TRANSFORM_PATH = "transform";

        @UseDataSet("rankscount")
        private ObjectStore<Integer> store;

        @UseDataSet("ranks")
        private ObjectStore<Integer> ranks;

        @POST
        @Path(RANKS_PATH)
        public void getRank(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder) {
            String asString = ((JsonObject) GSON.fromJson(Charsets.UTF_8.decode(httpServiceRequest.getContent()).toString(), JsonObject.class)).get(URL_KEY).getAsString();
            if (asString == null) {
                httpServiceResponder.sendError(400, "The url must be specified with \"url\" as key in JSON.");
                return;
            }
            Integer num = (Integer) this.ranks.read(asString.getBytes(Charsets.UTF_8));
            if (num == null) {
                httpServiceResponder.sendError(204, String.format("No rank found of %s", asString));
            } else {
                httpServiceResponder.sendString(num.toString());
            }
        }

        @GET
        @Path("total/{pr}")
        public void centers(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @PathParam("pr") Integer num) {
            Integer num2 = (Integer) this.store.read(Bytes.toBytes(num.intValue()));
            if (num2 == null) {
                httpServiceResponder.sendString(204, String.format("No pages found with pr: %s", num), Charsets.UTF_8);
            } else {
                httpServiceResponder.sendString(200, num2.toString(), Charsets.UTF_8);
            }
        }

        @GET
        @Path("transform/{pr}")
        public void transform(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @PathParam("pr") String str) {
            httpServiceResponder.sendString(String.valueOf((int) Math.round(Double.parseDouble(str) * 10.0d)));
        }
    }

    public void configure() {
        setName("SparkPageRank");
        setDescription("Spark page rank application.");
        addStream(new Stream(BACKLINK_URL_STREAM));
        addSpark(new PageRankSpark());
        addMapReduce(new RanksCounter());
        addWorkflow(new PageRankWorkflow());
        addService(SERVICE_HANDLERS, new SparkPageRankServiceHandler(), new HttpServiceHandler[0]);
        try {
            ObjectStores.createObjectStore(getConfigurer(), "ranks", Integer.class);
            ObjectStores.createObjectStore(getConfigurer(), "rankscount", Integer.class);
        } catch (UnsupportedTypeException e) {
            throw new RuntimeException((Throwable) e);
        }
    }
}
