package org.apache.pekko.stream.connectors.couchbase.impl;

import com.couchbase.client.java.AsyncCluster;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.dispatch.MessageDispatcher;
import org.apache.pekko.event.LogSource$;
import org.apache.pekko.event.Logging$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.connectors.couchbase.CouchbaseSessionSettings;
import org.apache.pekko.stream.connectors.couchbase.scaladsl.CouchbaseSession$;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.Map;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.runtime.BoxedUnit;

/* compiled from: CouchbaseClusterRegistry.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseClusterRegistry.class */
public final class CouchbaseClusterRegistry {
    private final ActorSystem system;
    private final LoggingAdapter log;
    private final MessageDispatcher blockingDispatcher;
    private final AtomicReference<Map<CouchbaseSessionSettings, Future<AsyncCluster>>> clusters = new AtomicReference<>(Predef$.MODULE$.Map().empty());

    public CouchbaseClusterRegistry(ActorSystem actorSystem) {
        this.system = actorSystem;
        this.log = Logging$.MODULE$.apply(actorSystem, CouchbaseClusterRegistry.class, LogSource$.MODULE$.fromAnyClass());
        this.blockingDispatcher = actorSystem.dispatchers().lookup("pekko.actor.default-blocking-io-dispatcher");
    }

    public Future<AsyncCluster> clusterFor(CouchbaseSessionSettings couchbaseSessionSettings) {
        Some some = this.clusters.get().get(couchbaseSessionSettings);
        return some instanceof Some ? (Future) some.value() : createClusterClient(couchbaseSessionSettings);
    }

    private Future<AsyncCluster> createClusterClient(CouchbaseSessionSettings couchbaseSessionSettings) {
        Promise apply;
        Map<CouchbaseSessionSettings, Future<AsyncCluster>> map;
        do {
            apply = Promise$.MODULE$.apply();
            map = this.clusters.get();
        } while (!this.clusters.compareAndSet(map, map.updated(couchbaseSessionSettings, apply.future())));
        this.log.info("Starting Couchbase client for nodes [{}]", nodesAsString$1(couchbaseSessionSettings));
        apply.completeWith(CouchbaseSession$.MODULE$.createClusterClient(couchbaseSessionSettings, this.blockingDispatcher));
        Future<AsyncCluster> future = apply.future();
        this.system.registerOnTermination(() -> {
            createClusterClient$$anonfun$1(future, couchbaseSessionSettings);
            return BoxedUnit.UNIT;
        });
        return future;
    }

    private static final String nodesAsString$1(CouchbaseSessionSettings couchbaseSessionSettings) {
        return couchbaseSessionSettings.nodes().mkString("\"", "\", \"", "\"");
    }

    private final void createClusterClient$$anonfun$1(Future future, CouchbaseSessionSettings couchbaseSessionSettings) {
        future.foreach(asyncCluster -> {
            this.log.info("Shutting down Couchbase client for nodes [{}]", couchbaseSessionSettings.nodes().mkString("\"", "\", \"", "\""));
            return asyncCluster.disconnect();
        }, this.system.dispatcher());
    }
}
