package zipkin2.dependencies.cassandra3;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import java.lang.invoke.SerializedLambda;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import zipkin2.DependencyLink;
import zipkin2.internal.DateUtil;

/* loaded from: input_file:zipkin2/dependencies/cassandra3/CassandraDependenciesJob.class */
/**
 * Spark job that derives {@link DependencyLink}s from the {@code span} table of a Cassandra
 * keyspace and stores the aggregated links in that keyspace's {@code dependency} table.
 *
 * <p>Instances are created via {@link #builder()}; {@link #run()} processes the single day
 * configured on the builder.
 */
public final class CassandraDependenciesJob {
    static final Logger log = LoggerFactory.getLogger(CassandraDependenciesJob.class);
    /** Keyspace that holds both the {@code span} (input) and {@code dependency} (output) tables. */
    final String keyspace;
    /** Day to process, in milliseconds since the epoch; the builder normalises it with {@code DateUtil.midnightUTC}. */
    final long day;
    /** Selects which grouping strategy {@code flatMapToLinksByTraceId} uses. */
    final boolean strictTraceId;
    /** Test-only flag that is forwarded to the row converters. */
    final boolean inTest;
    /** {@link #day} rendered as {@code yyyy-MM-dd} in UTC; used in log messages. */
    final String dateStamp;
    /** Spark configuration assembled by the constructor, including the Cassandra connection settings. */
    final SparkConf conf;

    /** Optional hook handed to the link-producing functions; {@code null} when none was configured. */
    @Nullable
    final Runnable logInitializer;

    /* loaded from: input_file:zipkin2/dependencies/cassandra3/CassandraDependenciesJob$Builder.class */
    /**
     * Fluent builder for {@link CassandraDependenciesJob}.
     *
     * <p>Defaults are taken from the environment: {@code CASSANDRA_KEYSPACE} (default
     * {@code zipkin2}), {@code CASSANDRA_CONTACT_POINTS} (default {@code localhost}),
     * {@code CASSANDRA_LOCAL_DC} (no default), {@code SPARK_MASTER} (default {@code local[*]}),
     * {@code STRICT_TRACE_ID} (default {@code true}), {@code CASSANDRA_USE_SSL},
     * {@code CASSANDRA_USERNAME} and {@code CASSANDRA_PASSWORD}. The day defaults to the current
     * day as computed by {@code DateUtil.midnightUTC}.
     */
    public static final class Builder {
        String[] jars;
        Runnable logInitializer;
        /** Extra Spark properties, applied to the {@code SparkConf} in insertion order. */
        final Map<String, String> sparkProperties = new LinkedHashMap<>();
        String keyspace = CassandraDependenciesJob.getEnv("CASSANDRA_KEYSPACE", "zipkin2");
        String contactPoints = CassandraDependenciesJob.getEnv("CASSANDRA_CONTACT_POINTS", "localhost");
        String localDc = CassandraDependenciesJob.getEnv("CASSANDRA_LOCAL_DC", null);
        String sparkMaster = CassandraDependenciesJob.getEnv("SPARK_MASTER", "local[*]");
        long day = DateUtil.midnightUTC(System.currentTimeMillis());
        boolean strictTraceId = Boolean.parseBoolean(CassandraDependenciesJob.getEnv("STRICT_TRACE_ID", "true"));
        boolean inTest = false;

        /** Seeds {@link #sparkProperties} with UI, SSL and authentication settings. */
        Builder() {
            this.sparkProperties.put("spark.ui.enabled", "false");
            this.sparkProperties.put("spark.cassandra.connection.ssl.enabled", CassandraDependenciesJob.getEnv("CASSANDRA_USE_SSL", "false"));
            this.sparkProperties.put("spark.cassandra.connection.ssl.trustStore.password", System.getProperty("javax.net.ssl.trustStorePassword", ""));
            this.sparkProperties.put("spark.cassandra.connection.ssl.trustStore.path", System.getProperty("javax.net.ssl.trustStore", ""));
            this.sparkProperties.put("spark.cassandra.auth.username", CassandraDependenciesJob.getEnv("CASSANDRA_USERNAME", ""));
            this.sparkProperties.put("spark.cassandra.auth.password", CassandraDependenciesJob.getEnv("CASSANDRA_PASSWORD", ""));
        }

        /** Sets the jars registered on the Spark configuration; when never set, no jars are registered. */
        public Builder jars(String... strArr) {
            this.jars = strArr;
            return this;
        }

        /**
         * Sets the keyspace that contains the {@code span} and {@code dependency} tables.
         *
         * @throws NullPointerException if {@code str} is null
         */
        public Builder keyspace(String str) {
            this.keyspace = (String) Preconditions.checkNotNull(str, "keyspace");
            return this;
        }

        /** Sets the day to process; the value is normalised with {@code DateUtil.midnightUTC}. */
        public Builder day(long j) {
            this.day = DateUtil.midnightUTC(j);
            return this;
        }

        /**
         * Sets the hook passed to the link-producing functions.
         *
         * @throws NullPointerException if {@code runnable} is null
         */
        public Builder logInitializer(Runnable runnable) {
            this.logInitializer = (Runnable) Preconditions.checkNotNull(runnable, "logInitializer");
            return this;
        }

        /**
         * Sets the comma-separated list of {@code host[:port]} Cassandra contact points.
         *
         * @throws NullPointerException if {@code str} is null
         */
        public Builder contactPoints(String str) {
            // Fail here rather than with an unlabelled NPE when build() later splits the value.
            this.contactPoints = (String) Preconditions.checkNotNull(str, "contactPoints");
            return this;
        }

        /** Sets the local data centre name, or {@code null} to leave it unconfigured. */
        public Builder localDc(@Nullable String str) {
            this.localDc = str;
            return this;
        }

        /** Chooses the trace grouping strategy used by the job. */
        public Builder strictTraceId(boolean z) {
            this.strictTraceId = z;
            return this;
        }

        /** Internal flag forwarded to the row converters. */
        public Builder internalInTest(boolean z) {
            this.inTest = z;
            return this;
        }

        /** Creates the job from the current builder state. */
        public CassandraDependenciesJob build() {
            return new CassandraDependenciesJob(this);
        }
    }

    /**
     * Entry point for configuring a job.
     *
     * @return a new {@link Builder} pre-populated with its environment-derived defaults
     */
    public static Builder builder() {
        Builder fresh = new Builder();
        return fresh;
    }

    /**
     * Copies the builder's settings and assembles the {@link SparkConf} used by {@link #run()}.
     *
     * <p>Entries of {@code builder.sparkProperties} are applied last, so they override any
     * setting made here under the same key.
     *
     * @param builder source of all settings
     */
    CassandraDependenciesJob(Builder builder) {
        this.keyspace = builder.keyspace;
        this.day = builder.day;
        this.strictTraceId = builder.strictTraceId;
        this.inTest = builder.inTest;
        this.logInitializer = builder.logInitializer;

        // Render the day as yyyy-MM-dd in UTC, independent of the JVM's default time zone.
        SimpleDateFormat utcDayFormat = new SimpleDateFormat("yyyy-MM-dd");
        utcDayFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.dateStamp = utcDayFormat.format(new Date(builder.day));

        this.conf = new SparkConf(true).setMaster(builder.sparkMaster).setAppName(getClass().getName());
        this.conf.set("spark.cassandra.connection.host", parseHosts(builder.contactPoints));
        this.conf.set("spark.cassandra.connection.port", parsePort(builder.contactPoints));
        if (builder.localDc != null) {
            // The connector reads its settings under the "spark.cassandra." prefix; the bare
            // "connection.local_dc" key used previously was not picked up, so the configured
            // data centre was silently ignored.
            this.conf.set("spark.cassandra.connection.local_dc", builder.localDc);
        }
        if (builder.jars != null) {
            this.conf.setJars(builder.jars);
        }
        for (Map.Entry<String, String> property : builder.sparkProperties.entrySet()) {
            this.conf.set(property.getKey(), property.getValue());
        }
    }

    /**
     * Runs the job for the configured day.
     *
     * <p>Reads the {@code span} table of {@link #keyspace}, converts spans into
     * {@link DependencyLink}s, merges links that share the same parent and child by summing
     * their call and error counts, and inserts the result into the {@code dependency} table.
     * When no links are produced nothing is written. The Spark context is always stopped,
     * whether the job succeeds or fails.
     */
    public void run() {
        // Span timestamps are compared in microseconds; the window is the full day, inclusive.
        long microsLower = this.day * 1000;
        long microsUpper = (microsLower + TimeUnit.DAYS.toMicros(1L)) - 1;
        log.info("Running Dependencies job for {}: {} ≤ Span.timestamp {}", this.dateStamp, microsLower, microsUpper);

        SparkContext sparkContext = new SparkContext(this.conf);
        try {
            final JavaRDD<DependencyLink> links =
                flatMapToLinksByTraceId(CassandraJavaUtil.javaFunctions(sparkContext).cassandraTable(this.keyspace, "span"), microsUpper, microsLower, this.inTest)
                    .values()
                    // Key each link by (parent, child) so links for the same edge can be merged.
                    .mapToPair(link -> Tuple2.apply(Tuple2.apply(link.parent(), link.child()), link))
                    .reduceByKey((left, right) -> DependencyLink.newBuilder()
                        .parent(left.parent())
                        .child(left.child())
                        .callCount(left.callCount() + right.callCount())
                        .errorCount(left.errorCount() + right.errorCount())
                        .build())
                    .values();

            if (links.isEmpty()) {
                log.info("No dependency links could be processed from spans in table {}/span", this.keyspace);
                return;
            }

            log.info("Saving dependency links for {} to {}.dependency", this.dateStamp, this.keyspace);
            CassandraConnector.apply(this.conf).withSessionDo(new AbstractFunction1<Session, Void>() {
                @Override
                public Void apply(Session session) {
                    PreparedStatement insert = session.prepare(QueryBuilder.insertInto(CassandraDependenciesJob.this.keyspace, "dependency")
                        .value("day", QueryBuilder.bindMarker("day"))
                        .value("parent", QueryBuilder.bindMarker("parent"))
                        .value("child", QueryBuilder.bindMarker("child"))
                        .value("calls", QueryBuilder.bindMarker("calls"))
                        .value("errors", QueryBuilder.bindMarker("errors")));
                    for (DependencyLink link : links.collect()) {
                        BoundStatement bound = insert.bind()
                            .setDate("day", LocalDate.fromMillisSinceEpoch(CassandraDependenciesJob.this.day))
                            .setString("parent", link.parent())
                            .setString("child", link.child())
                            .setLong("calls", link.callCount());
                        // "errors" is only bound when there is something to record.
                        if (link.errorCount() > 0) {
                            bound.setLong("errors", link.errorCount());
                        }
                        session.execute(bound);
                    }
                    return null;
                }
            });
        } finally {
            sparkContext.stop();
        }
    }

    /**
     * Turns the rows of the span table into dependency links keyed by trace ID.
     *
     * <p>With {@link #strictTraceId} set, rows are grouped with {@code spanBy} on the
     * {@code trace_id} column and converted by {@code CassandraRowsToDependencyLinks}.
     * Otherwise each row is first converted by {@code CassandraRowToSpan}, the spans are
     * grouped by {@code traceId()} and converted by {@code SpansToDependencyLinks}.
     *
     * @param cassandraTableScanJavaRDD scan of the span table
     * @param j value {@link #run()} passes as the end of the day's window, in microseconds
     * @param j2 value {@link #run()} passes as the start of the day's window, in microseconds
     * @param z the in-test flag, forwarded to the row converters
     * @return dependency links paired with the trace ID they were derived from
     */
    JavaPairRDD<String, DependencyLink> flatMapToLinksByTraceId(CassandraTableScanJavaRDD<CassandraRow> cassandraTableScanJavaRDD, long j, long j2, boolean z) {
        if (!this.strictTraceId) {
            return cassandraTableScanJavaRDD
                .map(new CassandraRowToSpan(z))
                .groupBy(span -> span.traceId())
                .flatMapValues(new SpansToDependencyLinks(this.logInitializer, j2, j));
        }
        return cassandraTableScanJavaRDD
            .spanBy(row -> row.getString("trace_id"), String.class)
            .flatMapValues(new CassandraRowsToDependencyLinks(this.logInitializer, j2, j, z));
    }

    /**
     * Looks up an environment variable, falling back to a default.
     *
     * @param str name of the environment variable
     * @param str2 value to return when the variable is not set; may be {@code null}
     * @return the variable's value, or {@code str2} when it is unset
     */
    static String getEnv(String str, String str2) {
        String fromEnvironment = System.getenv(str);
        if (fromEnvironment == null) {
            return str2;
        }
        return fromEnvironment;
    }

    /**
     * Extracts the host part of every contact point.
     *
     * @param str comma-separated {@code host[:port]} contact points
     * @return the hosts, in input order, joined with commas
     */
    static String parseHosts(String str) {
        ArrayList<String> hosts = new ArrayList<>();
        // A limit of -1 keeps trailing empty segments, so they are parsed like any other entry.
        for (String contactPoint : str.split(",", -1)) {
            hosts.add(HostAndPort.fromString(contactPoint).getHostText());
        }
        return Joiner.on(',').join(hosts);
    }

    /**
     * Determines the single port to use for all contact points.
     *
     * <p>Contact points without an explicit port count as 9042. When every contact point
     * resolves to the same port, that port is returned; when they disagree, 9042 is returned.
     *
     * @param str comma-separated {@code host[:port]} contact points
     * @return the port, as a string
     */
    static String parsePort(String str) {
        final int defaultPort = 9042;
        LinkedHashSet<Integer> distinctPorts = new LinkedHashSet<>();
        for (String contactPoint : str.split(",", -1)) {
            distinctPorts.add(HostAndPort.fromString(contactPoint).getPortOrDefault(defaultPort));
        }
        if (distinctPorts.size() == 1) {
            return String.valueOf(distinctPorts.iterator().next());
        }
        return String.valueOf(defaultPort);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reads the {@code trace_id} column and keeps at most its last 16 characters.
     *
     * @param cassandraRow row to read from
     * @return the trace ID unchanged when it has 16 characters or fewer, otherwise its
     *     trailing 16 characters
     */
    public static String traceId(CassandraRow cassandraRow) {
        final String fullId = cassandraRow.getString("trace_id");
        final int excess = fullId.length() - 16;
        return excess > 0 ? fullId.substring(excess) : fullId;
    }

    /**
     * Synthetic hook that rebuilds one of this class's serializable lambdas from its
     * {@link SerializedLambda} form.
     *
     * <p>The implementation method name selects a candidate; the candidate is returned only
     * when the method kind, functional interface, interface method, implementing class and
     * signatures all match what is recorded in {@code serializedLambda}.
     *
     * <p>NOTE(review): the selector {@code z} is declared {@code boolean} but is assigned
     * integer values and the second switch repeats {@code case true}; this is presumably a
     * decompiler rendering of an integer index (0..3) and is not valid Java as written.
     *
     * @param serializedLambda description of the lambda being deserialized
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if no candidate matches
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name to a selector, using the hash
        // code as a fast path and equals() to confirm the match.
        switch (implMethodName.hashCode()) {
            case -1744935303:
                if (implMethodName.equals("lambda$run$8fe782ea$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1251801661:
                if (implMethodName.equals("lambda$flatMapToLinksByTraceId$673bad6f$1")) {
                    z = false;
                    break;
                }
                break;
            case -1067401920:
                if (implMethodName.equals("traceId")) {
                    z = true;
                    break;
                }
                break;
            case 514787095:
                if (implMethodName.equals("lambda$run$80bff2f2$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Second switch: verify the remaining metadata and recreate the matching lambda.
        switch (z) {
            case false:
                // Row -> trace_id column, as used by flatMapToLinksByTraceId.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("zipkin2/dependencies/cassandra3/CassandraDependenciesJob") && serializedLambda.getImplMethodSignature().equals("(Lcom/datastax/spark/connector/japi/CassandraRow;)Ljava/lang/String;")) {
                    return cassandraRow -> {
                        return cassandraRow.getString("trace_id");
                    };
                }
                break;
            case true:
                // Call to traceId() on zipkin2.Span, as used by flatMapToLinksByTraceId.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("zipkin2/Span") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.traceId();
                    };
                }
                break;
            case true:
                // Merge of two links for the same parent/child, as used by run().
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("zipkin2/dependencies/cassandra3/CassandraDependenciesJob") && serializedLambda.getImplMethodSignature().equals("(Lzipkin2/DependencyLink;Lzipkin2/DependencyLink;)Lzipkin2/DependencyLink;")) {
                    return (dependencyLink2, dependencyLink3) -> {
                        return DependencyLink.newBuilder().parent(dependencyLink2.parent()).child(dependencyLink2.child()).callCount(dependencyLink2.callCount() + dependencyLink3.callCount()).errorCount(dependencyLink2.errorCount() + dependencyLink3.errorCount()).build();
                    };
                }
                break;
            case true:
                // Keying of a link by its (parent, child) pair, as used by run().
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("zipkin2/dependencies/cassandra3/CassandraDependenciesJob") && serializedLambda.getImplMethodSignature().equals("(Lzipkin2/DependencyLink;)Lscala/Tuple2;")) {
                    return dependencyLink -> {
                        return Tuple2.apply(Tuple2.apply(dependencyLink.parent(), dependencyLink.child()), dependencyLink);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
