package co.cask.cdap.examples.sparkpagerank;

import co.cask.cdap.api.ServiceDiscoverer;
import co.cask.cdap.api.metadata.Metadata;
import co.cask.cdap.api.metadata.MetadataEntity;
import co.cask.cdap.api.metadata.MetadataScope;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.examples.sparkpagerank.SparkPageRankApp;
import com.google.common.base.Ascii;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram.class */
public class SparkPageRankProgram implements JavaSparkMain {
    static final String ITERATIONS_COUNT = "iteration_count_";
    static final int ITERATIONS_COUNT_VALUE = 10;
    private static final String POPULAR_PAGES = "total.popular.pages";
    private static final String UNPOPULAR_PAGES = "total.unpopular.pages";
    private static final String REGULAR_PAGES = "total.regular.pages";
    private static final int POPULAR_PAGE_THRESHOLD = 10;
    private static final int UNPOPULAR_PAGE_THRESHOLD = 3;
    private static final Logger LOG = LoggerFactory.getLogger(SparkPageRankProgram.class);
    private static final Pattern SPACES = Pattern.compile("\\s+");

    /* loaded from: input_file:co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram$Sum.class */
    private static class Sum implements Function2<Double, Double, Double> {
        private Sum() {
        }

        public Double call(Double d, Double d2) {
            return Double.valueOf(d.doubleValue() + d2.doubleValue());
        }
    }

    public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        new JavaSparkContext();
        LOG.info("Processing backlinkURLs data");
        JavaPairRDD fromStream = javaSparkExecutionContext.fromStream(SparkPageRankApp.BACKLINK_URL_STREAM, String.class);
        int iterationCount = getIterationCount(javaSparkExecutionContext);
        LOG.info("Grouping data by key");
        JavaPairRDD cache = fromStream.values().mapToPair(str -> {
            String[] split = SPACES.split(str);
            return new Tuple2(split[0], split[1]);
        }).distinct().groupByKey().cache();
        JavaPairRDD mapValues = cache.mapValues(iterable -> {
            return Double.valueOf(1.0d);
        });
        for (int i = 0; i < iterationCount; i++) {
            LOG.debug("Processing data with PageRank algorithm. Iteration {}/{}", Integer.valueOf(i + 1), Integer.valueOf(iterationCount));
            mapValues = cache.join(mapValues).values().flatMapToPair(tuple2 -> {
                LOG.debug("Processing {} with rank {}", tuple2._1(), tuple2._2());
                int size = Iterables.size((Iterable) tuple2._1());
                ArrayList arrayList = new ArrayList();
                Iterator it = ((Iterable) tuple2._1()).iterator();
                while (it.hasNext()) {
                    arrayList.add(new Tuple2((String) it.next(), Double.valueOf(((Double) tuple2._2()).doubleValue() / size)));
                }
                return arrayList.iterator();
            }).reduceByKey(new Sum()).mapValues(d -> {
                return Double.valueOf(0.15d + (d.doubleValue() * 0.85d));
            });
        }
        LOG.info("Writing ranks data");
        ServiceDiscoverer serviceDiscoverer = javaSparkExecutionContext.getServiceDiscoverer();
        Metrics metrics = javaSparkExecutionContext.getMetrics();
        javaSparkExecutionContext.saveAsDataset(mapValues.mapToPair(tuple22 -> {
            LOG.debug("URL {} has rank {}", Arrays.toString(((String) tuple22._1()).getBytes(Charsets.UTF_8)), tuple22._2());
            URL serviceURL = serviceDiscoverer.getServiceURL(SparkPageRankApp.SERVICE_HANDLERS);
            if (serviceURL == null) {
                throw new RuntimeException("Failed to discover service: SparkPageRankService");
            }
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new URL(serviceURL, String.format("%s/%s", SparkPageRankApp.SparkPageRankServiceHandler.TRANSFORM_PATH, ((Double) tuple22._2()).toString())).openConnection().getInputStream(), Charsets.UTF_8));
                Throwable th = null;
                try {
                    try {
                        int parseInt = Integer.parseInt(bufferedReader.readLine());
                        if (parseInt == 10) {
                            metrics.count(POPULAR_PAGES, 1);
                        } else if (parseInt <= 3) {
                            metrics.count(UNPOPULAR_PAGES, 1);
                        } else {
                            metrics.count(REGULAR_PAGES, 1);
                        }
                        Tuple2 tuple22 = new Tuple2(((String) tuple22._1()).getBytes(Charsets.UTF_8), Integer.valueOf(parseInt));
                        if (bufferedReader != null) {
                            if (0 != 0) {
                                try {
                                    bufferedReader.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                bufferedReader.close();
                            }
                        }
                        return tuple22;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                LOG.warn("Failed to read the Stream for service {}", SparkPageRankApp.SERVICE_HANDLERS, e);
                throw Throwables.propagate(e);
            }
        }), "ranks");
        for (String str2 : ((Metadata) javaSparkExecutionContext.getMetadata(MetadataEntity.ofDataset(javaSparkExecutionContext.getNamespace(), "ranks")).get(MetadataScope.USER)).getTags()) {
            if (str2.startsWith(ITERATIONS_COUNT)) {
                try {
                    Integer.parseInt(str2.substring(str2.indexOf(ITERATIONS_COUNT) + ITERATIONS_COUNT.length()));
                    javaSparkExecutionContext.removeTags(MetadataEntity.ofDataset(javaSparkExecutionContext.getNamespace(), "ranks"), new String[]{str2});
                } catch (NumberFormatException e) {
                }
            }
        }
        javaSparkExecutionContext.addTags(MetadataEntity.ofDataset(javaSparkExecutionContext.getNamespace(), "ranks"), new String[]{ITERATIONS_COUNT + iterationCount});
        LOG.info("PageRanks successfuly computed and written to \"ranks\" dataset");
    }

    private int getIterationCount(JavaSparkExecutionContext javaSparkExecutionContext) {
        String str = (String) javaSparkExecutionContext.getRuntimeArguments().get("args");
        if (str == null) {
            return 10;
        }
        String[] split = str.split("\\s");
        if (split.length > 0) {
            return Integer.parseInt(split[0]);
        }
        return 10;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2065948147:
                if (implMethodName.equals("lambda$run$ed8cee11$1")) {
                    z = true;
                    break;
                }
                break;
            case -1423251059:
                if (implMethodName.equals("lambda$run$85656fb$1")) {
                    z = 3;
                    break;
                }
                break;
            case -265567493:
                if (implMethodName.equals("lambda$run$81710137$1")) {
                    z = false;
                    break;
                }
                break;
            case -147328112:
                if (implMethodName.equals("lambda$run$b913aaa5$1")) {
                    z = 2;
                    break;
                }
                break;
            case 983334214:
                if (implMethodName.equals("lambda$run$d0389061$1")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Ljava/util/Iterator;")) {
                    return tuple2 -> {
                        LOG.debug("Processing {} with rank {}", tuple2._1(), tuple2._2());
                        int size = Iterables.size((Iterable) tuple2._1());
                        ArrayList arrayList = new ArrayList();
                        Iterator it = ((Iterable) tuple2._1()).iterator();
                        while (it.hasNext()) {
                            arrayList.add(new Tuple2((String) it.next(), Double.valueOf(((Double) tuple2._2()).doubleValue() / size)));
                        }
                        return arrayList.iterator();
                    };
                }
                break;
            case Ascii.SOH /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Double;")) {
                    return iterable -> {
                        return Double.valueOf(1.0d);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lscala/Tuple2;")) {
                    return str -> {
                        String[] split = SPACES.split(str);
                        return new Tuple2(split[0], split[1]);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Double;)Ljava/lang/Double;")) {
                    return d -> {
                        return Double.valueOf(0.15d + (d.doubleValue() * 0.85d));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("co/cask/cdap/examples/sparkpagerank/SparkPageRankProgram") && serializedLambda.getImplMethodSignature().equals("(Lco/cask/cdap/api/ServiceDiscoverer;Lco/cask/cdap/api/metrics/Metrics;Lscala/Tuple2;)Lscala/Tuple2;")) {
                    ServiceDiscoverer serviceDiscoverer = (ServiceDiscoverer) serializedLambda.getCapturedArg(0);
                    Metrics metrics = (Metrics) serializedLambda.getCapturedArg(1);
                    return tuple22 -> {
                        LOG.debug("URL {} has rank {}", Arrays.toString(((String) tuple22._1()).getBytes(Charsets.UTF_8)), tuple22._2());
                        URL serviceURL = serviceDiscoverer.getServiceURL(SparkPageRankApp.SERVICE_HANDLERS);
                        if (serviceURL == null) {
                            throw new RuntimeException("Failed to discover service: SparkPageRankService");
                        }
                        try {
                            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new URL(serviceURL, String.format("%s/%s", SparkPageRankApp.SparkPageRankServiceHandler.TRANSFORM_PATH, ((Double) tuple22._2()).toString())).openConnection().getInputStream(), Charsets.UTF_8));
                            Throwable th = null;
                            try {
                                try {
                                    int parseInt = Integer.parseInt(bufferedReader.readLine());
                                    if (parseInt == 10) {
                                        metrics.count(POPULAR_PAGES, 1);
                                    } else if (parseInt <= 3) {
                                        metrics.count(UNPOPULAR_PAGES, 1);
                                    } else {
                                        metrics.count(REGULAR_PAGES, 1);
                                    }
                                    Tuple2 tuple22 = new Tuple2(((String) tuple22._1()).getBytes(Charsets.UTF_8), Integer.valueOf(parseInt));
                                    if (bufferedReader != null) {
                                        if (0 != 0) {
                                            try {
                                                bufferedReader.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        } else {
                                            bufferedReader.close();
                                        }
                                    }
                                    return tuple22;
                                } finally {
                                }
                            } finally {
                            }
                        } catch (Exception e) {
                            LOG.warn("Failed to read the Stream for service {}", SparkPageRankApp.SERVICE_HANDLERS, e);
                            throw Throwables.propagate(e);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
