package com.datastax.oss.driver.internal.core.session;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.InvalidKeyspaceException;
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.DistanceEvent;
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
import com.datastax.oss.driver.internal.core.pool.ChannelPoolFactory;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.ReplayingEventFilter;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.driver.shaded.guava.common.collect.MapMaker;
import io.netty.util.concurrent.EventExecutor;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/driver/internal/core/session/DefaultSession.class */
public class DefaultSession implements Session {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSession.class);
    private final InternalDriverContext context;
    private final DriverConfig config;
    private final EventExecutor adminExecutor;
    private final String logPrefix;
    private final SingleThreaded singleThreaded;
    private final RequestProcessorRegistry processorRegistry;
    private volatile CqlIdentifier keyspace;
    private final ConcurrentMap<Node, ChannelPool> pools = new ConcurrentHashMap(16, 0.75f, 1);
    private ConcurrentMap<ByteBuffer, RepreparePayload> repreparePayloads = new MapMaker().weakValues().makeMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/oss/driver/internal/core/session/DefaultSession$SingleThreaded.class */
    public class SingleThreaded {
        private final InternalDriverContext context;
        private final ChannelPoolFactory channelPoolFactory;
        private final CompletableFuture<Session> initFuture;
        private boolean initWasCalled;
        private final CompletableFuture<Void> closeFuture;
        private boolean closeWasCalled;
        private boolean forceCloseWasCalled;
        private final Object distanceListenerKey;
        private final ReplayingEventFilter<DistanceEvent> distanceEventFilter;
        private final Object stateListenerKey;
        private final ReplayingEventFilter<NodeStateEvent> stateEventFilter;
        private final Map<Node, CompletionStage<ChannelPool>> pending;
        private final Map<Node, DistanceEvent> pendingDistanceEvents;
        private final Map<Node, NodeStateEvent> pendingStateEvents;
        static final /* synthetic */ boolean $assertionsDisabled;

        private SingleThreaded(InternalDriverContext internalDriverContext) {
            this.initFuture = new CompletableFuture<>();
            this.closeFuture = new CompletableFuture<>();
            this.distanceEventFilter = new ReplayingEventFilter<>(this::processDistanceEvent);
            this.stateEventFilter = new ReplayingEventFilter<>(this::processStateEvent);
            this.pending = new HashMap();
            this.pendingDistanceEvents = new HashMap();
            this.pendingStateEvents = new HashMap();
            this.context = internalDriverContext;
            this.channelPoolFactory = internalDriverContext.channelPoolFactory();
            this.distanceListenerKey = internalDriverContext.eventBus().register(DistanceEvent.class, RunOrSchedule.on(DefaultSession.this.adminExecutor, this::onDistanceEvent));
            this.stateListenerKey = internalDriverContext.eventBus().register(NodeStateEvent.class, RunOrSchedule.on(DefaultSession.this.adminExecutor, this::onStateEvent));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void init() {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            if (this.initWasCalled) {
                return;
            }
            this.initWasCalled = true;
            DefaultSession.LOG.debug("[{}] Starting initialization", DefaultSession.this.logPrefix);
            this.distanceEventFilter.start();
            this.stateEventFilter.start();
            Collection<Node> values = this.context.metadataManager().getMetadata().getNodes().values();
            ArrayList arrayList = new ArrayList(values.size());
            for (Node node : values) {
                NodeDistance distance = node.getDistance();
                if (distance == NodeDistance.IGNORED) {
                    DefaultSession.LOG.debug("[{}] Skipping {} because it is IGNORED", DefaultSession.this.logPrefix, node);
                } else if (node.getState() == NodeState.FORCED_DOWN) {
                    DefaultSession.LOG.debug("[{}] Skipping {} because it is FORCED_DOWN", DefaultSession.this.logPrefix, node);
                } else {
                    DefaultSession.LOG.debug("[{}] Creating a pool for {}", DefaultSession.this.logPrefix, node);
                    arrayList.add(this.channelPoolFactory.init(node, DefaultSession.this.keyspace, distance, this.context, DefaultSession.this.logPrefix));
                }
            }
            CompletableFutures.whenAllDone(arrayList, () -> {
                onPoolsInit(arrayList);
            }, DefaultSession.this.adminExecutor);
        }

        private void onPoolsInit(List<CompletionStage<ChannelPool>> list) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            DefaultSession.LOG.debug("[{}] All pools have finished initializing", DefaultSession.this.logPrefix);
            boolean z = list.size() > 0;
            Iterator<CompletionStage<ChannelPool>> it = list.iterator();
            while (it.hasNext()) {
                ChannelPool channelPool = (ChannelPool) CompletableFutures.getCompleted(it.next().toCompletableFuture());
                boolean isInvalidKeyspace = channelPool.isInvalidKeyspace();
                if (isInvalidKeyspace) {
                    DefaultSession.LOG.debug("[{}] Pool to {} reports an invalid keyspace", DefaultSession.this.logPrefix, channelPool.getNode());
                }
                z &= isInvalidKeyspace;
                DefaultSession.this.pools.put(channelPool.getNode(), channelPool);
            }
            if (z) {
                this.initFuture.completeExceptionally(new InvalidKeyspaceException("Invalid keyspace " + DefaultSession.this.keyspace.asPrettyCql()));
                forceClose();
            } else {
                DefaultSession.LOG.debug("[{}] Initialization complete, ready", DefaultSession.this.logPrefix);
                this.initFuture.complete(DefaultSession.this);
                this.distanceEventFilter.markReady();
                this.stateEventFilter.markReady();
            }
        }

        private void onDistanceEvent(DistanceEvent distanceEvent) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            this.distanceEventFilter.accept(distanceEvent);
        }

        private void onStateEvent(NodeStateEvent nodeStateEvent) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            this.stateEventFilter.accept(nodeStateEvent);
        }

        private void processDistanceEvent(DistanceEvent distanceEvent) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            DefaultNode defaultNode = distanceEvent.node;
            NodeDistance nodeDistance = distanceEvent.distance;
            if (this.pending.containsKey(defaultNode)) {
                this.pendingDistanceEvents.put(defaultNode, distanceEvent);
                return;
            }
            if (nodeDistance == NodeDistance.IGNORED && DefaultSession.this.pools.containsKey(defaultNode)) {
                ChannelPool channelPool = (ChannelPool) DefaultSession.this.pools.remove(defaultNode);
                if (channelPool != null) {
                    DefaultSession.LOG.debug("[{}] {} became IGNORED, destroying pool", DefaultSession.this.logPrefix, defaultNode);
                    channelPool.closeAsync().exceptionally(th -> {
                        DefaultSession.LOG.warn("[{}] Error closing pool", DefaultSession.this.logPrefix, th);
                        return null;
                    });
                    return;
                }
                return;
            }
            if (defaultNode.getState() == NodeState.FORCED_DOWN) {
                DefaultSession.LOG.warn("[{}] {} became {} but it is FORCED_DOWN, ignoring", new Object[]{DefaultSession.this.logPrefix, defaultNode, nodeDistance});
                return;
            }
            ChannelPool channelPool2 = (ChannelPool) DefaultSession.this.pools.get(defaultNode);
            if (channelPool2 != null) {
                DefaultSession.LOG.debug("[{}] {} became {}, resizing it", new Object[]{DefaultSession.this.logPrefix, defaultNode, nodeDistance});
                channelPool2.resize(nodeDistance);
            } else {
                DefaultSession.LOG.debug("[{}] {} became {} and no pool found, initializing it", new Object[]{DefaultSession.this.logPrefix, defaultNode, nodeDistance});
                CompletionStage<ChannelPool> init = this.channelPoolFactory.init(defaultNode, DefaultSession.this.keyspace, nodeDistance, this.context, DefaultSession.this.logPrefix);
                this.pending.put(defaultNode, init);
                init.thenAcceptAsync(this::onPoolInitialized, DefaultSession.this.adminExecutor).exceptionally(UncaughtExceptions::log);
            }
        }

        private void processStateEvent(NodeStateEvent nodeStateEvent) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            DefaultNode defaultNode = nodeStateEvent.node;
            NodeState nodeState = nodeStateEvent.newState;
            if (this.pending.containsKey(defaultNode)) {
                this.pendingStateEvents.put(defaultNode, nodeStateEvent);
                return;
            }
            if (nodeState == NodeState.FORCED_DOWN) {
                ChannelPool channelPool = (ChannelPool) DefaultSession.this.pools.remove(defaultNode);
                if (channelPool != null) {
                    DefaultSession.LOG.debug("[{}] {} became FORCED_DOWN, destroying pool", DefaultSession.this.logPrefix, defaultNode);
                    channelPool.closeAsync().exceptionally(th -> {
                        DefaultSession.LOG.warn("[{}] Error closing pool", DefaultSession.this.logPrefix, th);
                        return null;
                    });
                    return;
                }
                return;
            }
            if (nodeState == NodeState.UP) {
                ChannelPool channelPool2 = (ChannelPool) DefaultSession.this.pools.get(defaultNode);
                if (channelPool2 != null) {
                    DefaultSession.LOG.debug("[{}] {} came back UP, triggering pool reconnection", DefaultSession.this.logPrefix, defaultNode);
                    channelPool2.reconnectNow();
                } else {
                    DefaultSession.LOG.debug("[{}] {} came back UP and no pool found, initializing it", DefaultSession.this.logPrefix, defaultNode);
                    CompletionStage<ChannelPool> init = this.channelPoolFactory.init(defaultNode, DefaultSession.this.keyspace, defaultNode.getDistance(), this.context, DefaultSession.this.logPrefix);
                    this.pending.put(defaultNode, init);
                    init.thenAcceptAsync(this::onPoolInitialized, DefaultSession.this.adminExecutor).exceptionally(UncaughtExceptions::log);
                }
            }
        }

        private void onPoolInitialized(ChannelPool channelPool) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            Node node = channelPool.getNode();
            if (this.closeWasCalled) {
                DefaultSession.LOG.debug("[{}] Session closed while a pool to {} was initializing, closing it", DefaultSession.this.logPrefix, node);
                channelPool.forceCloseAsync();
                return;
            }
            DefaultSession.LOG.debug("[{}] New pool to {} initialized", DefaultSession.this.logPrefix, node);
            if (Objects.equals(DefaultSession.this.keyspace, channelPool.getInitialKeyspaceName())) {
                reprepareStatements(channelPool);
            } else {
                channelPool.setKeyspace(DefaultSession.this.keyspace).handleAsync((r6, th) -> {
                    if (th != null) {
                        DefaultSession.LOG.warn("Error while switching keyspace to " + DefaultSession.this.keyspace, th);
                    }
                    reprepareStatements(channelPool);
                    return null;
                }, DefaultSession.this.adminExecutor);
            }
        }

        private void reprepareStatements(ChannelPool channelPool) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            if (DefaultSession.this.config.getDefaultProfile().getBoolean(CoreDriverOption.REPREPARE_ENABLED)) {
                new ReprepareOnUp(DefaultSession.this.logPrefix + "|" + channelPool.getNode().getConnectAddress(), channelPool, DefaultSession.this.repreparePayloads, DefaultSession.this.config, () -> {
                    RunOrSchedule.on(DefaultSession.this.adminExecutor, () -> {
                        onPoolReady(channelPool);
                    });
                }).start();
            } else {
                DefaultSession.LOG.debug("[{}] Reprepare on up is disabled, skipping", DefaultSession.this.logPrefix);
                onPoolReady(channelPool);
            }
        }

        private void onPoolReady(ChannelPool channelPool) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            Node node = channelPool.getNode();
            this.pending.remove(node);
            DefaultSession.this.pools.put(node, channelPool);
            DistanceEvent remove = this.pendingDistanceEvents.remove(node);
            NodeStateEvent remove2 = this.pendingStateEvents.remove(node);
            if (remove2 != null && remove2.newState == NodeState.FORCED_DOWN) {
                DefaultSession.LOG.debug("[{}] Received {} while the pool was initializing, processing it now", DefaultSession.this.logPrefix, remove2);
                processStateEvent(remove2);
            } else if (remove != null) {
                DefaultSession.LOG.debug("[{}] Received {} while the pool was initializing, processing it now", DefaultSession.this.logPrefix, remove);
                processDistanceEvent(remove);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setKeyspace(CqlIdentifier cqlIdentifier) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            if (this.closeWasCalled) {
                return;
            }
            DefaultSession.LOG.debug("[{}] Switching to keyspace {}", DefaultSession.this.logPrefix, cqlIdentifier);
            Iterator it = DefaultSession.this.pools.values().iterator();
            while (it.hasNext()) {
                ((ChannelPool) it.next()).setKeyspace(cqlIdentifier);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            if (this.closeWasCalled) {
                return;
            }
            this.closeWasCalled = true;
            DefaultSession.LOG.debug("[{}] Starting shutdown", DefaultSession.this.logPrefix);
            this.context.eventBus().unregister(this.distanceListenerKey, DistanceEvent.class);
            this.context.eventBus().unregister(this.stateListenerKey, NodeStateEvent.class);
            ArrayList arrayList = new ArrayList(DefaultSession.this.pools.size());
            Iterator it = DefaultSession.this.pools.values().iterator();
            while (it.hasNext()) {
                arrayList.add(((ChannelPool) it.next()).closeAsync());
            }
            CompletableFutures.whenAllDone(arrayList, () -> {
                onAllPoolsClosed(arrayList);
            }, DefaultSession.this.adminExecutor);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void forceClose() {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            if (this.forceCloseWasCalled) {
                return;
            }
            this.forceCloseWasCalled = true;
            DefaultSession.LOG.debug("[{}] Starting forced shutdown (was {}closed before)", DefaultSession.this.logPrefix, this.closeWasCalled ? "" : "not ");
            if (this.closeWasCalled) {
                Iterator it = DefaultSession.this.pools.values().iterator();
                while (it.hasNext()) {
                    ((ChannelPool) it.next()).forceCloseAsync();
                }
            } else {
                ArrayList arrayList = new ArrayList(DefaultSession.this.pools.size());
                Iterator it2 = DefaultSession.this.pools.values().iterator();
                while (it2.hasNext()) {
                    arrayList.add(((ChannelPool) it2.next()).forceCloseAsync());
                }
                CompletableFutures.whenAllDone(arrayList, () -> {
                    onAllPoolsClosed(arrayList);
                }, DefaultSession.this.adminExecutor);
            }
        }

        private void onAllPoolsClosed(List<CompletionStage<Void>> list) {
            if (!$assertionsDisabled && !DefaultSession.this.adminExecutor.inEventLoop()) {
                throw new AssertionError();
            }
            Throwable th = null;
            Iterator<CompletionStage<Void>> it = list.iterator();
            while (it.hasNext()) {
                CompletableFuture<Void> completableFuture = it.next().toCompletableFuture();
                if (!$assertionsDisabled && !completableFuture.isDone()) {
                    throw new AssertionError();
                }
                if (completableFuture.isCompletedExceptionally()) {
                    Throwable failed = CompletableFutures.getFailed(completableFuture);
                    if (th == null) {
                        th = failed;
                    } else {
                        th.addSuppressed(failed);
                    }
                }
            }
            if (th != null) {
                this.closeFuture.completeExceptionally(th);
            } else {
                DefaultSession.LOG.debug("[{}] Shutdown complete", DefaultSession.this.logPrefix);
                this.closeFuture.complete(null);
            }
        }

        static {
            $assertionsDisabled = !DefaultSession.class.desiredAssertionStatus();
        }
    }

    public static CompletionStage<Session> init(InternalDriverContext internalDriverContext, CqlIdentifier cqlIdentifier, String str) {
        return new DefaultSession(internalDriverContext, cqlIdentifier, str).init();
    }

    private DefaultSession(InternalDriverContext internalDriverContext, CqlIdentifier cqlIdentifier, String str) {
        this.adminExecutor = internalDriverContext.nettyOptions().adminEventExecutorGroup().next();
        this.context = internalDriverContext;
        this.config = internalDriverContext.config();
        this.singleThreaded = new SingleThreaded(internalDriverContext);
        this.processorRegistry = internalDriverContext.requestProcessorRegistry();
        this.keyspace = cqlIdentifier;
        this.logPrefix = str;
    }

    private CompletionStage<Session> init() {
        EventExecutor eventExecutor = this.adminExecutor;
        SingleThreaded singleThreaded = this.singleThreaded;
        singleThreaded.getClass();
        RunOrSchedule.on(eventExecutor, () -> {
            singleThreaded.init();
        });
        return this.singleThreaded.initFuture;
    }

    @Override // com.datastax.oss.driver.api.core.session.Session
    public CqlIdentifier getKeyspace() {
        return this.keyspace;
    }

    public void setKeyspace(CqlIdentifier cqlIdentifier) {
        CqlIdentifier cqlIdentifier2 = this.keyspace;
        if (Objects.equals(cqlIdentifier2, cqlIdentifier)) {
            return;
        }
        if (this.config.getDefaultProfile().getBoolean(CoreDriverOption.REQUEST_WARN_IF_SET_KEYSPACE)) {
            Logger logger = LOG;
            Object[] objArr = new Object[4];
            objArr[0] = this.logPrefix;
            objArr[1] = cqlIdentifier2 == null ? "<none>" : cqlIdentifier2.asInternal();
            objArr[2] = cqlIdentifier.asInternal();
            objArr[3] = CoreDriverOption.REQUEST_WARN_IF_SET_KEYSPACE.getPath();
            logger.warn("[{}] Detected a keyspace change at runtime ({} => {}). This is an anti-pattern that should be avoided in production (see '{}' in the configuration).", objArr);
        }
        this.keyspace = cqlIdentifier;
        RunOrSchedule.on(this.adminExecutor, () -> {
            this.singleThreaded.setKeyspace(cqlIdentifier);
        });
    }

    public Map<Node, ChannelPool> getPools() {
        return this.pools;
    }

    @Override // com.datastax.oss.driver.api.core.session.Session
    public <SyncResultT, AsyncResultT> SyncResultT execute(Request<SyncResultT, AsyncResultT> request) {
        return newHandler(request).syncResult();
    }

    @Override // com.datastax.oss.driver.api.core.session.Session
    public <SyncResultT, AsyncResultT> AsyncResultT executeAsync(Request<SyncResultT, AsyncResultT> request) {
        return newHandler(request).asyncResult();
    }

    private <SyncResultT, AsyncResultT> RequestHandler<SyncResultT, AsyncResultT> newHandler(Request<SyncResultT, AsyncResultT> request) {
        if (request.getKeyspace() != null) {
            throw new UnsupportedOperationException("Per-request keyspaces are not supported yet");
        }
        return this.processorRegistry.processorFor(request).newHandler(request, this, this.context, this.logPrefix);
    }

    public ConcurrentMap<ByteBuffer, RepreparePayload> getRepreparePayloads() {
        return this.repreparePayloads;
    }

    @Override // com.datastax.oss.driver.api.core.AsyncAutoCloseable
    public CompletionStage<Void> closeFuture() {
        return this.singleThreaded.closeFuture;
    }

    @Override // com.datastax.oss.driver.api.core.AsyncAutoCloseable
    public CompletionStage<Void> closeAsync() {
        EventExecutor eventExecutor = this.adminExecutor;
        SingleThreaded singleThreaded = this.singleThreaded;
        singleThreaded.getClass();
        RunOrSchedule.on(eventExecutor, () -> {
            singleThreaded.close();
        });
        return this.singleThreaded.closeFuture;
    }

    @Override // com.datastax.oss.driver.api.core.AsyncAutoCloseable
    public CompletionStage<Void> forceCloseAsync() {
        EventExecutor eventExecutor = this.adminExecutor;
        SingleThreaded singleThreaded = this.singleThreaded;
        singleThreaded.getClass();
        RunOrSchedule.on(eventExecutor, () -> {
            singleThreaded.forceClose();
        });
        return this.singleThreaded.closeFuture;
    }
}
