package co.cask.cdap.examples.sparkpagerank;

import co.cask.cdap.api.ServiceDiscoverer;
import co.cask.cdap.api.metadata.Metadata;
import co.cask.cdap.api.metadata.MetadataEntity;
import co.cask.cdap.api.metadata.MetadataScope;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.examples.sparkpagerank.SparkPageRankApp;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram.class */
public class SparkPageRankProgram implements JavaSparkMain {
    static final String ITERATIONS_COUNT = "iteration_count_";
    static final int ITERATIONS_COUNT_VALUE = 10;
    private static final String POPULAR_PAGES = "total.popular.pages";
    private static final String UNPOPULAR_PAGES = "total.unpopular.pages";
    private static final String REGULAR_PAGES = "total.regular.pages";
    private static final int POPULAR_PAGE_THRESHOLD = 10;
    private static final int UNPOPULAR_PAGE_THRESHOLD = 3;
    private static final Logger LOG = LoggerFactory.getLogger(SparkPageRankProgram.class);
    private static final Pattern SPACES = Pattern.compile("\\s+");

    /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram$Sum.class */
    private static class Sum implements Function2<Double, Double, Double> {
        private Sum() {
        }

        public Double call(Double d, Double d2) {
            return Double.valueOf(d.doubleValue() + d2.doubleValue());
        }
    }

    public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        new JavaSparkContext();
        LOG.info("Processing backlinkURLs data");
        JavaPairRDD fromStream = javaSparkExecutionContext.fromStream(SparkPageRankApp.BACKLINK_URL_STREAM, String.class);
        int iterationCount = getIterationCount(javaSparkExecutionContext);
        LOG.info("Grouping data by key");
        JavaPairRDD cache = fromStream.values().mapToPair(new PairFunction<String, String, String>() { // from class: co.cask.cdap.examples.sparkpagerank.SparkPageRankProgram.1
            public Tuple2<String, String> call(String str) {
                String[] split = SparkPageRankProgram.SPACES.split(str);
                return new Tuple2<>(split[0], split[1]);
            }
        }).distinct().groupByKey().cache();
        JavaPairRDD mapValues = cache.mapValues(new Function<Iterable<String>, Double>() { // from class: co.cask.cdap.examples.sparkpagerank.SparkPageRankProgram.2
            public Double call(Iterable<String> iterable) {
                return Double.valueOf(1.0d);
            }
        });
        for (int i = 0; i < iterationCount; i++) {
            LOG.debug("Processing data with PageRank algorithm. Iteration {}/{}", Integer.valueOf(i + 1), Integer.valueOf(iterationCount));
            mapValues = cache.join(mapValues).values().flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() { // from class: co.cask.cdap.examples.sparkpagerank.SparkPageRankProgram.3
                public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> tuple2) {
                    SparkPageRankProgram.LOG.debug("Processing {} with rank {}", tuple2._1(), tuple2._2());
                    int size = Iterables.size((Iterable) tuple2._1());
                    ArrayList arrayList = new ArrayList();
                    Iterator it = ((Iterable) tuple2._1()).iterator();
                    while (it.hasNext()) {
                        arrayList.add(new Tuple2((String) it.next(), Double.valueOf(((Double) tuple2._2()).doubleValue() / size)));
                    }
                    return arrayList;
                }
            }).reduceByKey(new Sum()).mapValues(new Function<Double, Double>() { // from class: co.cask.cdap.examples.sparkpagerank.SparkPageRankProgram.4
                public Double call(Double d) {
                    return Double.valueOf(0.15d + (d.doubleValue() * 0.85d));
                }
            });
        }
        LOG.info("Writing ranks data");
        final ServiceDiscoverer serviceDiscoverer = javaSparkExecutionContext.getServiceDiscoverer();
        final Metrics metrics = javaSparkExecutionContext.getMetrics();
        javaSparkExecutionContext.saveAsDataset(mapValues.mapToPair(new PairFunction<Tuple2<String, Double>, byte[], Integer>() { // from class: co.cask.cdap.examples.sparkpagerank.SparkPageRankProgram.5
            public Tuple2<byte[], Integer> call(Tuple2<String, Double> tuple2) throws Exception {
                SparkPageRankProgram.LOG.debug("URL {} has rank {}", Arrays.toString(((String) tuple2._1()).getBytes(Charsets.UTF_8)), tuple2._2());
                URL serviceURL = serviceDiscoverer.getServiceURL(SparkPageRankApp.SERVICE_HANDLERS);
                if (serviceURL == null) {
                    throw new RuntimeException("Failed to discover service: SparkPageRankService");
                }
                try {
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new URL(serviceURL, String.format("%s/%s", SparkPageRankApp.SparkPageRankServiceHandler.TRANSFORM_PATH, ((Double) tuple2._2()).toString())).openConnection().getInputStream(), Charsets.UTF_8));
                    Throwable th = null;
                    try {
                        try {
                            String readLine = bufferedReader.readLine();
                            if (Integer.parseInt(readLine) == 10) {
                                metrics.count(SparkPageRankProgram.POPULAR_PAGES, 1);
                            } else if (Integer.parseInt(readLine) <= 3) {
                                metrics.count(SparkPageRankProgram.UNPOPULAR_PAGES, 1);
                            } else {
                                metrics.count(SparkPageRankProgram.REGULAR_PAGES, 1);
                            }
                            Tuple2<byte[], Integer> tuple22 = new Tuple2<>(((String) tuple2._1()).getBytes(Charsets.UTF_8), Integer.valueOf(Integer.parseInt(readLine)));
                            if (bufferedReader != null) {
                                if (0 != 0) {
                                    try {
                                        bufferedReader.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    bufferedReader.close();
                                }
                            }
                            return tuple22;
                        } finally {
                        }
                    } finally {
                    }
                } catch (Exception e) {
                    SparkPageRankProgram.LOG.warn("Failed to read the Stream for service {}", SparkPageRankApp.SERVICE_HANDLERS, e);
                    throw Throwables.propagate(e);
                }
            }
        }), "ranks");
        for (String str : ((Metadata) javaSparkExecutionContext.getMetadata(MetadataEntity.ofDataset(javaSparkExecutionContext.getNamespace(), "ranks")).get(MetadataScope.USER)).getTags()) {
            if (str.startsWith(ITERATIONS_COUNT)) {
                try {
                    Integer.parseInt(str.substring(str.indexOf(ITERATIONS_COUNT) + ITERATIONS_COUNT.length()));
                    javaSparkExecutionContext.removeTags(MetadataEntity.ofDataset(javaSparkExecutionContext.getNamespace(), "ranks"), new String[]{str});
                } catch (NumberFormatException e) {
                }
            }
        }
        javaSparkExecutionContext.addTags(MetadataEntity.ofDataset(javaSparkExecutionContext.getNamespace(), "ranks"), new String[]{ITERATIONS_COUNT + iterationCount});
        LOG.info("PageRanks successfuly computed and written to \"ranks\" dataset");
    }

    private int getIterationCount(JavaSparkExecutionContext javaSparkExecutionContext) {
        String str = (String) javaSparkExecutionContext.getRuntimeArguments().get("args");
        if (str == null) {
            return 10;
        }
        String[] split = str.split("\\s");
        if (split.length > 0) {
            return Integer.parseInt(split[0]);
        }
        return 10;
    }
}
