package akka.stream.alpakka.cassandra;

import akka.ConfigurationException;
import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
import akka.discovery.Discovery$;
import akka.util.JavaDurationConverters$;
import akka.util.JavaDurationConverters$JavaDurationOps$;
import com.datastax.oss.driver.api.core.CqlSession;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Seq$;
import scala.compat.java8.FutureConverters$;
import scala.compat.java8.FutureConverters$CompletionStageOps$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* compiled from: AkkaDiscoverySessionProvider.scala */
/* loaded from: input_file:akka/stream/alpakka/cassandra/AkkaDiscoverySessionProvider$.class */
public final class AkkaDiscoverySessionProvider$ {
    public static AkkaDiscoverySessionProvider$ MODULE$;

    static {
        new AkkaDiscoverySessionProvider$();
    }

    public Future<CqlSession> connect(ActorSystem actorSystem, Config config, ExecutionContext executionContext) {
        return readNodes(config, actorSystem, executionContext).flatMap(seq -> {
            return FutureConverters$CompletionStageOps$.MODULE$.toScala$extension(FutureConverters$.MODULE$.CompletionStageOps(CqlSession.builder().withConfigLoader(DriverConfigLoaderFromConfig$.MODULE$.fromConfig(ConfigFactory.parseString(new StringBuilder(43).append("\n        basic.contact-points = [").append(seq.mkString("\"", "\", \"", "\"")).append("]\n        ").toString()).withFallback(CqlSessionProvider$.MODULE$.driverConfig(actorSystem, config)))).buildAsync()));
        }, executionContext);
    }

    public Future<CqlSession> connect(ClassicActorSystemProvider classicActorSystemProvider, Config config, ExecutionContext executionContext) {
        return connect(classicActorSystemProvider.classicSystem(), config, executionContext);
    }

    private Future<Seq<String>> readNodes(Config config, ActorSystem actorSystem, ExecutionContext executionContext) {
        Config config2 = config.getConfig("service-discovery");
        return readNodes(config2.getString("name"), JavaDurationConverters$JavaDurationOps$.MODULE$.asScala$extension(JavaDurationConverters$.MODULE$.JavaDurationOps(config2.getDuration("lookup-timeout"))), actorSystem, executionContext);
    }

    private Future<Seq<String>> readNodes(String str, FiniteDuration finiteDuration, ActorSystem actorSystem, ExecutionContext executionContext) {
        return Discovery$.MODULE$.apply(actorSystem).discovery().lookup(str, finiteDuration).map(resolved -> {
            return (Seq) resolved.addresses().map(resolvedTarget -> {
                return new StringBuilder(1).append(resolvedTarget.host()).append(":").append(resolvedTarget.port().getOrElse(() -> {
                    throw new ConfigurationException(new StringBuilder(66).append("Akka Discovery for Cassandra service [").append(str).append("] must provide a port for [").append(resolvedTarget.host()).append("]").toString());
                })).toString();
            }, Seq$.MODULE$.canBuildFrom());
        }, executionContext);
    }

    private AkkaDiscoverySessionProvider$() {
        MODULE$ = this;
    }
}
