/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.mongodb.shaded.com.mongodb.internal.connection;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoClientException;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoIncompatibleDriverException;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoInterruptedException;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoTimeoutException;
import org.apache.flink.mongodb.shaded.com.mongodb.ServerAddress;
import org.apache.flink.mongodb.shaded.com.mongodb.assertions.Assertions;
import org.apache.flink.mongodb.shaded.com.mongodb.connection.ClusterDescription;
import org.apache.flink.mongodb.shaded.com.mongodb.connection.ClusterId;
import org.apache.flink.mongodb.shaded.com.mongodb.connection.ClusterSettings;
import org.apache.flink.mongodb.shaded.com.mongodb.connection.ClusterType;
import org.apache.flink.mongodb.shaded.com.mongodb.connection.ServerDescription;
import org.apache.flink.mongodb.shaded.com.mongodb.diagnostics.logging.Logger;
import org.apache.flink.mongodb.shaded.com.mongodb.diagnostics.logging.Loggers;
import org.apache.flink.mongodb.shaded.com.mongodb.event.ClusterClosedEvent;
import org.apache.flink.mongodb.shaded.com.mongodb.event.ClusterDescriptionChangedEvent;
import org.apache.flink.mongodb.shaded.com.mongodb.event.ClusterListener;
import org.apache.flink.mongodb.shaded.com.mongodb.event.ClusterOpeningEvent;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.async.SingleResultCallback;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.connection.Cluster;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.connection.ClusterClock;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.connection.ClusterableServer;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.connection.ClusterableServerFactory;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.connection.EventHelper;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.connection.Server;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.connection.ServerTuple;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.event.EventListenerHelper;
import org.apache.flink.mongodb.shaded.com.mongodb.internal.selector.LatencyMinimizingServerSelector;
import org.apache.flink.mongodb.shaded.com.mongodb.lang.Nullable;
import org.apache.flink.mongodb.shaded.com.mongodb.selector.CompositeServerSelector;
import org.apache.flink.mongodb.shaded.com.mongodb.selector.ServerSelector;

/**
 * Common base for {@link Cluster} implementations.
 *
 * <p>It holds the most recently published {@link ClusterDescription}, performs synchronous and
 * asynchronous server selection against it, and wakes waiting selectors whenever the description
 * changes. The wake-up mechanism is the {@code phase} latch: every description update swaps in a
 * fresh latch and counts the previous one down.
 *
 * <p>Asynchronous requests that cannot be satisfied immediately are parked in {@code waitQueue}
 * and retried by a single daemon thread running {@link WaitQueueHandler}.
 */
abstract class BaseCluster
implements Cluster {
    private static final Logger LOGGER = Loggers.getLogger("cluster");
    // Latch for the current "phase"; replaced (and the old one released) on every description update.
    private final AtomicReference<CountDownLatch> phase = new AtomicReference<>(new CountDownLatch(1));
    private final ClusterableServerFactory serverFactory;
    private final ClusterId clusterId;
    private final ClusterSettings settings;
    private final ClusterListener clusterListener;
    // Pending asynchronous selection requests, serviced by the wait-queue handler thread.
    private final Deque<ServerSelectionRequest> waitQueue = new ConcurrentLinkedDeque<>();
    private final ClusterClock clusterClock = new ClusterClock();
    // Created lazily by notifyWaitQueueHandler; only read/written inside withLock.
    private Thread waitQueueHandler;
    private volatile boolean isClosed;
    private volatile ClusterDescription description;

    /**
     * Creates the cluster, fires a {@link ClusterOpeningEvent} and publishes an initial
     * description of type {@link ClusterType#UNKNOWN} with no servers.
     *
     * @param clusterId     identifier of this cluster, must not be null
     * @param settings      cluster settings, must not be null
     * @param serverFactory factory used to create servers, must not be null
     */
    BaseCluster(ClusterId clusterId, ClusterSettings settings, ClusterableServerFactory serverFactory) {
        this.clusterId = Assertions.notNull("clusterId", clusterId);
        this.settings = Assertions.notNull("settings", settings);
        this.serverFactory = Assertions.notNull("serverFactory", serverFactory);
        this.clusterListener = EventListenerHelper.singleClusterListener(settings);
        this.clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
        this.description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(), settings, serverFactory.getSettings());
    }

    @Override
    public ClusterClock getClock() {
        return this.clusterClock;
    }

    /**
     * Blocks until a server matching {@code serverSelector} is available, the server selection
     * timeout elapses, or the calling thread is interrupted.
     *
     * @param serverSelector the caller's selector; it is combined with the configured selector
     *                       (if any) and a latency-minimizing selector
     * @return the selected server together with its description
     * @throws MongoIncompatibleDriverException if the current description is incompatible with the driver
     * @throws MongoTimeoutException            if no server was selected within the timeout
     * @throws MongoInterruptedException        if the thread is interrupted while waiting; the
     *                                          thread's interrupt status is restored before throwing
     */
    @Override
    public ServerTuple selectServer(ServerSelector serverSelector) {
        Assertions.isTrue("open", !this.isClosed());
        try {
            CountDownLatch currentPhase = this.phase.get();
            ClusterDescription curDescription = this.description;
            ServerSelector compositeServerSelector = this.getCompositeServerSelector(serverSelector);
            ServerTuple serverTuple = this.selectServer(compositeServerSelector, curDescription);
            boolean selectionFailureLogged = false;
            long startTimeNanos = System.nanoTime();
            long curTimeNanos = startTimeNanos;
            long maxWaitTimeNanos = this.getMaxWaitTimeNanos();
            while (true) {
                this.throwIfIncompatible(curDescription);
                if (serverTuple != null) {
                    return serverTuple;
                }
                if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) {
                    throw this.createTimeoutException(serverSelector, curDescription);
                }
                if (!selectionFailureLogged) {
                    this.logServerSelectionFailure(serverSelector, curDescription);
                    selectionFailureLogged = true;
                }
                this.connect();
                // Wait for the next description update, but never longer than the remaining
                // timeout or the minimum heartbeat frequency, whichever is smaller.
                currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), this.getMinWaitTimeNanos()), TimeUnit.NANOSECONDS);
                curTimeNanos = System.nanoTime();
                currentPhase = this.phase.get();
                curDescription = this.description;
                serverTuple = this.selectServer(compositeServerSelector, curDescription);
            }
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; restore it so callers further up
            // the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new MongoInterruptedException(String.format("Interrupted while waiting for a server that matches %s", serverSelector), e);
        }
    }

    /**
     * Asynchronous variant of {@link #selectServer(ServerSelector)}. The request is tried once
     * immediately; if it is neither satisfied nor failed it is handed to the wait-queue handler.
     *
     * @param serverSelector the caller's selector
     * @param callback       receives either the selected server or the failure
     */
    @Override
    public void selectServerAsync(ServerSelector serverSelector, SingleResultCallback<ServerTuple> callback) {
        Assertions.isTrue("open", !this.isClosed());
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(String.format("Asynchronously selecting server with selector %s", serverSelector));
        }
        ServerSelectionRequest request = new ServerSelectionRequest(serverSelector, this.getCompositeServerSelector(serverSelector), this.getMaxWaitTimeNanos(), callback);
        CountDownLatch currentPhase = this.phase.get();
        ClusterDescription currentDescription = this.description;
        if (!this.handleServerSelectionRequest(request, currentPhase, currentDescription)) {
            this.notifyWaitQueueHandler(request);
        }
    }

    /**
     * Blocks until the cluster type is no longer {@link ClusterType#UNKNOWN}, the server selection
     * timeout elapses, or the calling thread is interrupted.
     *
     * @return the first observed description whose type is not {@code UNKNOWN}
     * @throws MongoTimeoutException     if the type is still unknown after the timeout
     * @throws MongoInterruptedException if the thread is interrupted while waiting; the thread's
     *                                   interrupt status is restored before throwing
     */
    @Override
    public ClusterDescription getDescription() {
        Assertions.isTrue("open", !this.isClosed());
        try {
            CountDownLatch currentPhase = this.phase.get();
            ClusterDescription curDescription = this.description;
            boolean selectionFailureLogged = false;
            long startTimeNanos = System.nanoTime();
            long curTimeNanos = startTimeNanos;
            long maxWaitTimeNanos = this.getMaxWaitTimeNanos();
            while (curDescription.getType() == ClusterType.UNKNOWN) {
                if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) {
                    throw new MongoTimeoutException(String.format("Timed out after %d ms while waiting to connect. Client view of cluster state is %s", this.settings.getServerSelectionTimeout(TimeUnit.MILLISECONDS), curDescription.getShortDescription()));
                }
                if (!selectionFailureLogged) {
                    if (LOGGER.isInfoEnabled()) {
                        if (this.settings.getServerSelectionTimeout(TimeUnit.MILLISECONDS) < 0L) {
                            LOGGER.info("Cluster description not yet available. Waiting indefinitely.");
                        } else {
                            LOGGER.info(String.format("Cluster description not yet available. Waiting for %d ms before timing out", this.settings.getServerSelectionTimeout(TimeUnit.MILLISECONDS)));
                        }
                    }
                    selectionFailureLogged = true;
                }
                this.connect();
                currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), this.getMinWaitTimeNanos()), TimeUnit.NANOSECONDS);
                curTimeNanos = System.nanoTime();
                currentPhase = this.phase.get();
                curDescription = this.description;
            }
            return curDescription;
        }
        catch (InterruptedException e) {
            // Restore the interrupt status that was cleared when the exception was thrown.
            Thread.currentThread().interrupt();
            throw new MongoInterruptedException("Interrupted while waiting to connect", e);
        }
    }

    @Override
    public ClusterId getClusterId() {
        return this.clusterId;
    }

    @Override
    public ClusterSettings getSettings() {
        return this.settings;
    }

    public ClusterableServerFactory getServerFactory() {
        return this.serverFactory;
    }

    /**
     * Hook invoked before each wait for a description change, by both the synchronous selectors
     * and the wait-queue handler. What it does is defined by subclasses.
     */
    protected abstract void connect();

    /**
     * Marks the cluster closed, releases the current phase latch so waiters wake up, fires a
     * {@link ClusterClosedEvent} and interrupts the wait-queue handler thread if one was started.
     * Does nothing if the cluster is already marked closed.
     */
    @Override
    public void close() {
        if (!this.isClosed()) {
            this.isClosed = true;
            this.phase.get().countDown();
            this.clusterListener.clusterClosed(new ClusterClosedEvent(this.clusterId));
            this.stopWaitQueueHandler();
        }
    }

    @Override
    public boolean isClosed() {
        return this.isClosed;
    }

    /**
     * Publishes {@code newDescription} as the current description and starts a new phase, which
     * wakes every thread waiting on the previous phase latch. Runs under {@link #withLock}.
     *
     * @param newDescription the description to publish
     */
    protected void updateDescription(ClusterDescription newDescription) {
        this.withLock(() -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Updating cluster description to  %s", newDescription.getShortDescription()));
            }
            this.description = newDescription;
            this.updatePhase();
        });
    }

    /**
     * Notifies the cluster listener of a description change, unless {@link EventHelper} reports
     * that both descriptions would generate equivalent events.
     *
     * @param newDescription      the description after the change
     * @param previousDescription the description before the change
     */
    protected void fireChangeEvent(ClusterDescription newDescription, ClusterDescription previousDescription) {
        if (!EventHelper.wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) {
            this.clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(this.getClusterId(), newDescription, previousDescription));
        }
    }

    /**
     * Returns the currently published description without waiting and without checking whether
     * the cluster is open.
     */
    @Override
    public ClusterDescription getCurrentDescription() {
        return this.description;
    }

    /**
     * Runs {@code action} while holding this instance's monitor.
     *
     * @param action the work to run under the lock
     */
    @Override
    public synchronized void withLock(Runnable action) {
        action.run();
    }

    /** Installs a fresh phase latch and counts down the previous one, releasing its waiters. */
    private void updatePhase() {
        this.withLock(() -> this.phase.getAndSet(new CountDownLatch(1)).countDown());
    }

    /**
     * Returns the server selection timeout in nanoseconds; a negative configured timeout is
     * mapped to {@link Long#MAX_VALUE}.
     */
    private long getMaxWaitTimeNanos() {
        long timeoutNanos = this.settings.getServerSelectionTimeout(TimeUnit.NANOSECONDS);
        return timeoutNanos < 0L ? Long.MAX_VALUE : timeoutNanos;
    }

    /** Returns the minimum heartbeat frequency from the server settings, in nanoseconds. */
    private long getMinWaitTimeNanos() {
        return this.serverFactory.getSettings().getMinHeartbeatFrequency(TimeUnit.NANOSECONDS);
    }

    /**
     * Attempts to complete an asynchronous selection request against the given phase and
     * description.
     *
     * <p>Selection is only re-attempted when {@code currentPhase} differs from the phase the
     * request last saw; the timeout is checked on every call. Any exception raised while handling
     * the request is delivered to the request's callback.
     *
     * @param request      the pending request
     * @param currentPhase the phase latch observed by the caller
     * @param description  the description observed by the caller
     * @return {@code true} if the request's callback was invoked (success or failure) and the
     *         request is finished; {@code false} if it must keep waiting
     */
    private boolean handleServerSelectionRequest(ServerSelectionRequest request, CountDownLatch currentPhase, ClusterDescription description) {
        try {
            if (currentPhase != request.phase) {
                CountDownLatch prevPhase = request.phase;
                request.phase = currentPhase;
                if (!description.isCompatibleWithDriver()) {
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace("Asynchronously failed server selection due to driver incompatibility with server");
                    }
                    request.onResult(null, this.createIncompatibleException(description));
                    return true;
                }
                ServerTuple serverTuple = this.selectServer(request.compositeSelector, description);
                if (serverTuple != null) {
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace(String.format("Asynchronously selected server %s", serverTuple.getServerDescription().getAddress()));
                    }
                    request.onResult(serverTuple, null);
                    return true;
                }
                // A null previous phase means this is the request's first attempt: log once.
                if (prevPhase == null) {
                    this.logServerSelectionFailure(request.originalSelector, description);
                }
            }
            if (request.timedOut()) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Asynchronously failed server selection after timeout");
                }
                request.onResult(null, this.createTimeoutException(request.originalSelector, description));
                return true;
            }
            return false;
        }
        catch (Exception e) {
            request.onResult(null, e);
            return true;
        }
    }

    /** Logs at INFO level that no server was chosen and how long selection will keep waiting. */
    private void logServerSelectionFailure(ServerSelector serverSelector, ClusterDescription curDescription) {
        if (LOGGER.isInfoEnabled()) {
            if (this.settings.getServerSelectionTimeout(TimeUnit.MILLISECONDS) < 0L) {
                LOGGER.info(String.format("No server chosen by %s from cluster description %s. Waiting indefinitely.", serverSelector, curDescription));
            } else {
                LOGGER.info(String.format("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out", serverSelector, curDescription, this.settings.getServerSelectionTimeout(TimeUnit.MILLISECONDS)));
            }
        }
    }

    /** Selects a server from {@code clusterDescription}, resolving addresses via {@code getServer}. */
    @Nullable
    private ServerTuple selectServer(ServerSelector serverSelector, ClusterDescription clusterDescription) {
        return BaseCluster.selectServer(serverSelector, clusterDescription, this::getServer);
    }

    /**
     * Picks at most two random candidates from the servers returned by {@code serverSelector} and
     * returns the one whose server reports the lowest operation count.
     *
     * @param serverSelector     selector applied to {@code clusterDescription}
     * @param clusterDescription the description to select from
     * @param serverCatalog      maps a server address to its {@link Server}; candidates for which
     *                           it returns {@code null} are skipped
     * @return the chosen server with its description, or {@code null} if no candidate resolved
     */
    @Nullable
    static ServerTuple selectServer(ServerSelector serverSelector, ClusterDescription clusterDescription, Function<ServerAddress, Server> serverCatalog) {
        List<ServerTuple> candidates = BaseCluster.atMostNRandom(new ArrayList<>(serverSelector.select(clusterDescription)), 2, serverDescription -> {
            Server server = serverCatalog.apply(serverDescription.getAddress());
            return server == null ? null : new ServerTuple(server, serverDescription);
        });
        return candidates.stream()
                .min(Comparator.comparingInt(serverTuple -> serverTuple.getServer().operationCount()))
                .orElse(null);
    }

    /**
     * Returns up to {@code n} randomly chosen, successfully transformed elements of {@code list}.
     *
     * <p>Walks the list from the end, swapping each position with a random earlier-or-equal
     * index, and stops once {@code n} non-null results were collected. {@code list} is reordered
     * in place.
     *
     * @param list        mutable list of candidates; modified by this method
     * @param n           maximum number of results
     * @param transformer maps a description to a tuple, or {@code null} to skip it
     */
    private static List<ServerTuple> atMostNRandom(List<ServerDescription> list, int n, Function<ServerDescription, ServerTuple> transformer) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        List<ServerTuple> result = new ArrayList<>(n);
        for (int i = list.size() - 1; i >= 0 && result.size() < n; --i) {
            Collections.swap(list, i, random.nextInt(i + 1));
            ServerTuple serverTuple = transformer.apply(list.get(i));
            if (serverTuple != null) {
                result.add(serverTuple);
            }
        }
        return result;
    }

    /**
     * Builds the selector actually used for selection: the caller's selector, then the selector
     * from the settings (if configured), then a latency-minimizing selector using the configured
     * local threshold.
     */
    private ServerSelector getCompositeServerSelector(ServerSelector serverSelector) {
        LatencyMinimizingServerSelector latencyMinimizingServerSelector = new LatencyMinimizingServerSelector(this.settings.getLocalThreshold(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        if (this.settings.getServerSelector() == null) {
            return new CompositeServerSelector(Arrays.asList(serverSelector, latencyMinimizingServerSelector));
        }
        return new CompositeServerSelector(Arrays.asList(serverSelector, this.settings.getServerSelector(), latencyMinimizingServerSelector));
    }

    /** Creates a server for {@code serverAddress} through the configured factory. */
    protected ClusterableServer createServer(ServerAddress serverAddress) {
        return this.serverFactory.create(this, serverAddress);
    }

    private void throwIfIncompatible(ClusterDescription curDescription) {
        if (!curDescription.isCompatibleWithDriver()) {
            throw this.createIncompatibleException(curDescription);
        }
    }

    /**
     * Builds the exception describing why {@code curDescription} is incompatible with the driver.
     *
     * @throws IllegalStateException if the description reports neither a too-old nor a too-new server
     */
    private MongoIncompatibleDriverException createIncompatibleException(ClusterDescription curDescription) {
        // NOTE(review): the literals 2, "2.6" and 17 below look like compile-time constants that
        // were inlined by the decompiler — confirm against the driver sources before changing them.
        String message;
        ServerDescription incompatibleServer = curDescription.findServerIncompatiblyOlderThanDriver();
        if (incompatibleServer != null) {
            message = String.format("Server at %s reports wire version %d, but this version of the driver requires at least %d (MongoDB %s).", incompatibleServer.getAddress(), incompatibleServer.getMaxWireVersion(), 2, "2.6");
        } else {
            incompatibleServer = curDescription.findServerIncompatiblyNewerThanDriver();
            if (incompatibleServer != null) {
                message = String.format("Server at %s requires wire version %d, but this version of the driver only supports up to %d.", incompatibleServer.getAddress(), incompatibleServer.getMinWireVersion(), 17);
            } else {
                throw new IllegalStateException("Server can't be both older than the driver and newer.");
            }
        }
        return new MongoIncompatibleDriverException(message, curDescription);
    }

    private MongoTimeoutException createTimeoutException(ServerSelector serverSelector, ClusterDescription curDescription) {
        return new MongoTimeoutException(String.format("Timed out after %d ms while waiting for a server that matches %s. Client view of cluster state is %s", this.settings.getServerSelectionTimeout(TimeUnit.MILLISECONDS), serverSelector, curDescription.getShortDescription()));
    }

    /**
     * Queues {@code request} for the wait-queue handler, starting the handler thread on first use
     * or starting a new phase so an already running handler re-examines the queue.
     *
     * <p>If the cluster has been closed in the meantime the request is not queued; its callback
     * is failed with the same "Shutdown in progress" error the handler uses when draining the
     * queue, so the caller is never left waiting for a callback that will not come.
     */
    private void notifyWaitQueueHandler(ServerSelectionRequest request) {
        // Single-element holder because the lambda cannot assign to a local variable.
        boolean[] queued = {false};
        this.withLock(() -> {
            if (this.isClosed) {
                return;
            }
            this.waitQueue.add(request);
            queued[0] = true;
            if (this.waitQueueHandler == null) {
                this.waitQueueHandler = new Thread(new WaitQueueHandler(), "cluster-" + this.clusterId.getValue());
                this.waitQueueHandler.setDaemon(true);
                this.waitQueueHandler.start();
            } else {
                this.updatePhase();
            }
        });
        if (!queued[0]) {
            // Invoke the callback outside the lock so user code never runs while it is held.
            request.onResult(null, new MongoClientException("Shutdown in progress"));
        }
    }

    /** Interrupts the wait-queue handler thread, if one has been started. */
    private void stopWaitQueueHandler() {
        this.withLock(() -> {
            if (this.waitQueueHandler != null) {
                this.waitQueueHandler.interrupt();
            }
        });
    }

    /**
     * A pending asynchronous server selection: the selectors to apply, the callback to complete,
     * the time budget, and the last phase against which selection was attempted.
     */
    private static final class ServerSelectionRequest {
        private final ServerSelector originalSelector;
        private final ServerSelector compositeSelector;
        private final long maxWaitTimeNanos;
        private final SingleResultCallback<ServerTuple> callback;
        private final long startTimeNanos = System.nanoTime();
        // Phase latch seen on the last selection attempt; null until the first attempt.
        private CountDownLatch phase;

        ServerSelectionRequest(ServerSelector serverSelector, ServerSelector compositeSelector, long maxWaitTimeNanos, SingleResultCallback<ServerTuple> callback) {
            this.originalSelector = serverSelector;
            this.compositeSelector = compositeSelector;
            this.maxWaitTimeNanos = maxWaitTimeNanos;
            this.callback = callback;
        }

        /**
         * Completes the callback with either a result or a failure. Anything the callback itself
         * throws is deliberately discarded so that it cannot break the caller's processing loop.
         */
        void onResult(ServerTuple serverTuple, Throwable t) {
            try {
                this.callback.onResult(serverTuple, t);
            }
            catch (Throwable ignored) {
                // Best effort: a misbehaving callback must not take down the selection machinery.
            }
        }

        /** Returns whether more than {@code maxWaitTimeNanos} has elapsed since creation. */
        boolean timedOut() {
            return System.nanoTime() - this.startTimeNanos > this.maxWaitTimeNanos;
        }

        /** Returns the nanoseconds left before this request times out (negative once expired). */
        long getRemainingTime() {
            return this.startTimeNanos + this.maxWaitTimeNanos - System.nanoTime();
        }
    }

    /**
     * Body of the wait-queue handler thread. Until the cluster is closed it repeatedly retries
     * every queued request against the current description, then sleeps on the current phase
     * latch. After close it fails all remaining requests.
     */
    private final class WaitQueueHandler
    implements Runnable {
        private WaitQueueHandler() {
        }

        @Override
        public void run() {
            while (!BaseCluster.this.isClosed) {
                CountDownLatch currentPhase = BaseCluster.this.phase.get();
                ClusterDescription curDescription = BaseCluster.this.description;
                // Stays at MAX_VALUE when no request is left waiting after this pass.
                long waitTimeNanos = Long.MAX_VALUE;
                Iterator<ServerSelectionRequest> iter = BaseCluster.this.waitQueue.iterator();
                while (iter.hasNext()) {
                    ServerSelectionRequest nextRequest = iter.next();
                    if (BaseCluster.this.handleServerSelectionRequest(nextRequest, currentPhase, curDescription)) {
                        iter.remove();
                        continue;
                    }
                    waitTimeNanos = Math.min(nextRequest.getRemainingTime(), Math.min(BaseCluster.this.getMinWaitTimeNanos(), waitTimeNanos));
                }
                // Only call connect() when at least one request is still waiting.
                if (waitTimeNanos < Long.MAX_VALUE) {
                    BaseCluster.this.connect();
                }
                try {
                    currentPhase.await(waitTimeNanos, TimeUnit.NANOSECONDS);
                }
                catch (InterruptedException ignored) {
                    // Intentionally not re-interrupting: the loop condition re-checks isClosed,
                    // and a set interrupt flag would make every following await throw at once.
                }
            }
            // The cluster is closed: fail everything that is still queued.
            Iterator<ServerSelectionRequest> remaining = BaseCluster.this.waitQueue.iterator();
            while (remaining.hasNext()) {
                remaining.next().onResult(null, new MongoClientException("Shutdown in progress"));
                remaining.remove();
            }
        }
    }
}

