/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.io.netty.channel.Channel;
import net.nmoncho.shaded.io.netty.channel.ChannelFuture;
import net.nmoncho.shaded.io.netty.channel.ChannelHandler;
import net.nmoncho.shaded.io.netty.channel.ChannelHandlerContext;
import net.nmoncho.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import net.nmoncho.shaded.io.netty.channel.EventLoop;
import net.nmoncho.shaded.io.netty.channel.unix.Errors;
import net.nmoncho.shaded.io.netty.util.concurrent.EventExecutor;
import net.nmoncho.shaded.io.netty.util.concurrent.Future;
import net.nmoncho.shaded.io.netty.util.concurrent.GenericFutureListener;
import net.nmoncho.shaded.io.netty.util.concurrent.Promise;
import net.nmoncho.shaded.io.netty.util.concurrent.PromiseNotifier;
import net.nmoncho.shaded.io.netty.util.concurrent.SucceededFuture;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.net.AsyncChannelOutputPlus;
import org.apache.cassandra.net.AsyncChannelPromise;
import org.apache.cassandra.net.AsyncMessageOutputPlus;
import org.apache.cassandra.net.ConnectionCategory;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.net.InvalidSerializedSizeException;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.OutboundConnectionInitiator;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.net.OutboundConnections;
import org.apache.cassandra.net.OutboundDebugCallbacks;
import org.apache.cassandra.net.OutboundMessageCallbacks;
import org.apache.cassandra.net.OutboundMessageQueue;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.net.SocketFactory;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutboundConnection {
    static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class);
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 30L, TimeUnit.SECONDS);
    private static final AtomicLongFieldUpdater<OutboundConnection> submittedUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "submittedCount");
    private static final AtomicLongFieldUpdater<OutboundConnection> pendingCountAndBytesUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "pendingCountAndBytes");
    private static final AtomicLongFieldUpdater<OutboundConnection> overloadedCountUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "overloadedCount");
    private static final AtomicLongFieldUpdater<OutboundConnection> overloadedBytesUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "overloadedBytes");
    private static final AtomicReferenceFieldUpdater<OutboundConnection, Future> closingUpdater = AtomicReferenceFieldUpdater.newUpdater(OutboundConnection.class, Future.class, "closing");
    private static final AtomicReferenceFieldUpdater<OutboundConnection, Future> scheduledCloseUpdater = AtomicReferenceFieldUpdater.newUpdater(OutboundConnection.class, Future.class, "scheduledClose");
    private final EventLoop eventLoop;
    private final Delivery delivery;
    private final OutboundMessageCallbacks callbacks;
    private final OutboundDebugCallbacks debug;
    @VisibleForTesting
    final OutboundMessageQueue queue;
    private final long pendingCapacityInBytes;
    private volatile long pendingCountAndBytes = 0L;
    private final ResourceLimits.EndpointAndGlobal reserveCapacityInBytes;
    private final Object readablePendingBytes = new Object(){

        public String toString() {
            return FBUtilities.prettyPrintMemory(OutboundConnection.this.pendingBytes());
        }
    };
    private final Object readableReserveEndpointUsing = new Object(){

        public String toString() {
            return FBUtilities.prettyPrintMemory(((OutboundConnection)OutboundConnection.this).reserveCapacityInBytes.endpoint.using());
        }
    };
    private final Object readableReserveGlobalUsing = new Object(){

        public String toString() {
            return FBUtilities.prettyPrintMemory(((OutboundConnection)OutboundConnection.this).reserveCapacityInBytes.global.using());
        }
    };
    private volatile long submittedCount = 0L;
    private volatile long overloadedCount = 0L;
    private volatile long overloadedBytes = 0L;
    private long expiredCount = 0L;
    private long expiredBytes = 0L;
    private long errorCount = 0L;
    private long errorBytes = 0L;
    private long sentCount;
    private long sentBytes;
    private long successfulConnections;
    private long connectionAttempts;
    private static final int pendingByteBits = 42;
    private final ConnectionType type;
    private OutboundConnectionSettings template;
    private volatile State state;
    private volatile Future<Void> closing;
    private volatile Future<Void> scheduledClose;

    private static boolean isMaxPendingCount(long pendingCountAndBytes) {
        return (pendingCountAndBytes & 0xFFFFFC0000000000L) == -4398046511104L;
    }

    private static int pendingCount(long pendingCountAndBytes) {
        return (int)(pendingCountAndBytes >>> 42);
    }

    private static long pendingBytes(long pendingCountAndBytes) {
        return pendingCountAndBytes & 0x3FFFFFFFFFFL;
    }

    private static long pendingCountAndBytes(long pendingCount, long pendingBytes) {
        return pendingCount << 42 | pendingBytes;
    }

    OutboundConnection(ConnectionType type, OutboundConnectionSettings settings, ResourceLimits.EndpointAndGlobal reserveCapacityInBytes) {
        this.template = settings.withDefaults(ConnectionCategory.MESSAGING);
        this.type = type;
        this.eventLoop = this.template.socketFactory.defaultGroup().next();
        this.pendingCapacityInBytes = this.template.applicationSendQueueCapacityInBytes.intValue();
        this.reserveCapacityInBytes = reserveCapacityInBytes;
        this.callbacks = this.template.callbacks;
        this.debug = this.template.debug;
        this.queue = new OutboundMessageQueue(MonotonicClock.Global.approxTime, this::onExpired);
        this.delivery = type == ConnectionType.LARGE_MESSAGES ? new LargeMessageDelivery(this.template.socketFactory.synchronousWorkExecutor) : new EventLoopDelivery();
        this.setDisconnected();
    }

    public void enqueue(Message message) throws ClosedChannelException {
        if (this.isClosing()) {
            throw new ClosedChannelException();
        }
        int canonicalSize = this.canonicalSize(message);
        if (canonicalSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes()) {
            throw new Message.OversizedMessageException(canonicalSize);
        }
        submittedUpdater.incrementAndGet(this);
        switch (this.acquireCapacity(canonicalSize)) {
            case INSUFFICIENT_ENDPOINT: {
                if (this.queue.maybePruneExpired() && ResourceLimits.Outcome.SUCCESS == this.acquireCapacity(canonicalSize)) break;
            }
            case INSUFFICIENT_GLOBAL: {
                this.onOverloaded(message);
                return;
            }
        }
        this.queue.add(message);
        this.delivery.execute();
        if (this.isClosing() && this.queue.remove(message)) {
            this.releaseCapacity(1L, canonicalSize);
            throw new ClosedChannelException();
        }
    }

    private ResourceLimits.Outcome acquireCapacity(long bytes) {
        return this.acquireCapacity(1L, bytes);
    }

    /*
     * Unable to fully structure code
     */
    private ResourceLimits.Outcome acquireCapacity(long count, long bytes) {
        increment = OutboundConnection.pendingCountAndBytes(count, bytes);
        unusedClaimedReserve = 0L;
        outcome = null;
        block4: while (true) lbl-1000:
        // 3 sources

        {
            block8: {
                if (OutboundConnection.isMaxPendingCount(current = this.pendingCountAndBytes)) {
                    outcome = ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT;
                    break;
                }
                next = current + increment;
                if (OutboundConnection.pendingBytes(next) > this.pendingCapacityInBytes) break block8;
                if (!OutboundConnection.pendingCountAndBytesUpdater.compareAndSet(this, current, next)) ** GOTO lbl-1000
                outcome = ResourceLimits.Outcome.SUCCESS;
                break;
            }
            state = this.state;
            if (state.isConnecting() && state.connecting().isFailingToConnect) {
                outcome = ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT;
                break;
            }
            requiredReserve = Math.min(bytes, OutboundConnection.pendingBytes(next) - this.pendingCapacityInBytes);
            if (unusedClaimedReserve >= requiredReserve) continue;
            extraGlobalReserve = requiredReserve - unusedClaimedReserve;
            outcome = this.reserveCapacityInBytes.tryAllocate(extraGlobalReserve);
            switch (4.$SwitchMap$org$apache$cassandra$net$ResourceLimits$Outcome[outcome.ordinal()]) {
                case 1: 
                case 2: {
                    break block4;
                }
                case 3: {
                    unusedClaimedReserve += extraGlobalReserve;
                }
                default: {
                    if (!OutboundConnection.pendingCountAndBytesUpdater.compareAndSet(this, current, next)) continue block4;
                    unusedClaimedReserve -= requiredReserve;
                    break block4;
                }
            }
            break;
        }
        if (unusedClaimedReserve > 0L) {
            this.reserveCapacityInBytes.release(unusedClaimedReserve);
        }
        return outcome;
    }

    private void releaseCapacity(long count, long bytes) {
        long decrement = OutboundConnection.pendingCountAndBytes(count, bytes);
        long prev = pendingCountAndBytesUpdater.getAndAdd(this, -decrement);
        if (OutboundConnection.pendingBytes(prev) > this.pendingCapacityInBytes) {
            long excess = Math.min(OutboundConnection.pendingBytes(prev) - this.pendingCapacityInBytes, bytes);
            this.reserveCapacityInBytes.release(excess);
        }
    }

    private void onOverloaded(Message<?> message) {
        overloadedCountUpdater.incrementAndGet(this);
        int canonicalSize = this.canonicalSize(message);
        overloadedBytesUpdater.addAndGet(this, canonicalSize);
        noSpamLogger.warn("{} overloaded; dropping {} message (queue: {} local, {} endpoint, {} global)", this, FBUtilities.prettyPrintMemory(canonicalSize), this.readablePendingBytes, this.readableReserveEndpointUsing, this.readableReserveGlobalUsing);
        this.callbacks.onOverloaded(message, this.template.to);
    }

    private boolean onExpired(Message<?> message) {
        noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", new Object[]{this.id(), message.verb()});
        this.releaseCapacity(1L, this.canonicalSize(message));
        ++this.expiredCount;
        this.expiredBytes += (long)this.canonicalSize(message);
        this.callbacks.onExpired(message, this.template.to);
        return true;
    }

    private void onFailedSerialize(Message<?> message, int messagingVersion, int bytesWrittenToNetwork, Throwable t) {
        logger.warn("{} dropping message of type {} due to error", new Object[]{this.id(), message.verb(), t});
        JVMStabilityInspector.inspectThrowable(t);
        this.releaseCapacity(1L, this.canonicalSize(message));
        ++this.errorCount;
        this.errorBytes += (long)message.serializedSize(messagingVersion);
        this.callbacks.onFailedSerialize(message, this.template.to, messagingVersion, bytesWrittenToNetwork, t);
    }

    private void onClosed(Message<?> message) {
        this.releaseCapacity(1L, this.canonicalSize(message));
        this.callbacks.onDiscardOnClose(message, this.template.to);
    }

    private int canonicalSize(Message<?> message) {
        return message.serializedSize(12);
    }

    private void invalidateChannel(Established established, Throwable cause) {
        JVMStabilityInspector.inspectThrowable(cause);
        if (this.state != established) {
            return;
        }
        if (SocketFactory.isCausedByConnectionReset(cause)) {
            logger.info("{} channel closed by provider", (Object)this.id(), (Object)cause);
        } else {
            logger.error("{} channel in potentially inconsistent state after error; closing", (Object)this.id(), (Object)cause);
        }
        this.disconnectNow(established);
    }

    Future<?> initiate() {
        class Initiate {
            long retryRateMillis = DatabaseDescriptor.getMinRpcTimeout(TimeUnit.MILLISECONDS) / 2L;
            int messagingVersion;
            OutboundConnectionSettings settings;

            Initiate() {
                this.messagingVersion = OutboundConnection.this.template.endpointToVersion().get(((OutboundConnection)OutboundConnection.this).template.to);
            }

            void onFailure(Throwable cause) {
                if (cause instanceof ConnectException) {
                    noSpamLogger.info("{} failed to connect", OutboundConnection.this.id(), cause);
                } else {
                    noSpamLogger.error("{} failed to connect", OutboundConnection.this.id(), cause);
                }
                JVMStabilityInspector.inspectThrowable(cause);
                if (OutboundConnection.this.hasPending()) {
                    AsyncPromise<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>> result = AsyncPromise.withExecutor((Executor)OutboundConnection.this.eventLoop);
                    OutboundConnection.this.state = new Connecting(OutboundConnection.this.state.disconnected(), (Future<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>>)result, (Future<?>)OutboundConnection.this.eventLoop.schedule(() -> this.attempt((Promise<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>>)result), Math.max(100L, this.retryRateMillis), TimeUnit.MILLISECONDS));
                    this.retryRateMillis = Math.min(1000L, this.retryRateMillis * 2L);
                } else {
                    OutboundConnection.this.state = Disconnected.dormant(((OutboundConnection)OutboundConnection.this).state.disconnected().maintenance);
                }
            }

            void onCompletedHandshake(OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess> result) {
                switch (result.outcome) {
                    case SUCCESS: {
                        assert (!OutboundConnection.this.state.isClosed());
                        OutboundConnectionInitiator.Result.MessagingSuccess success = result.success();
                        OutboundConnection.this.debug.onConnect(success.messagingVersion, this.settings);
                        ((OutboundConnection)OutboundConnection.this).state.disconnected().maintenance.cancel(false);
                        FrameEncoder.PayloadAllocator payloadAllocator = success.allocator;
                        Channel channel = success.channel;
                        final Established established = new Established(this.messagingVersion, channel, payloadAllocator, this.settings);
                        OutboundConnection.this.state = established;
                        channel.pipeline().addLast("handleExceptionalStates", (ChannelHandler)new ChannelInboundHandlerAdapter(){

                            public void channelInactive(ChannelHandlerContext ctx) {
                                OutboundConnection.this.disconnectNow(established);
                                ctx.fireChannelInactive();
                            }

                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                try {
                                    OutboundConnection.this.invalidateChannel(established, cause);
                                }
                                catch (Throwable t) {
                                    logger.error("Unexpected exception in {}.exceptionCaught", (Object)((Object)((Object)this)).getClass().getSimpleName(), (Object)t);
                                }
                            }
                        });
                        ++OutboundConnection.this.successfulConnections;
                        logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}", new Object[]{OutboundConnection.this.id(true), success.messagingVersion, this.settings.framing, SocketFactory.encryptionConnectionSummary(channel)});
                        break;
                    }
                    case RETRY: {
                        if (logger.isTraceEnabled()) {
                            logger.trace("{} incorrect legacy peer version predicted; reconnecting", (Object)OutboundConnection.this.id());
                        }
                        this.messagingVersion = result.retry().withMessagingVersion;
                        this.settings.endpointToVersion.set(this.settings.to, this.messagingVersion);
                        this.initiate();
                        break;
                    }
                    case INCOMPATIBLE: {
                        IOException t = new IOException(String.format("Incompatible peer: %s, messaging version: %s", this.settings.to, result.incompatible().maxMessagingVersion));
                        t.fillInStackTrace();
                        this.onFailure(t);
                        break;
                    }
                    default: {
                        throw new AssertionError();
                    }
                }
            }

            private void attempt(Promise<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>> result) {
                ++OutboundConnection.this.connectionAttempts;
                int knownMessagingVersion = OutboundConnection.this.messagingVersion();
                if (knownMessagingVersion != this.messagingVersion) {
                    logger.trace("Endpoint version changed from {} to {} since connection initialized, updating.", (Object)this.messagingVersion, (Object)knownMessagingVersion);
                    this.messagingVersion = knownMessagingVersion;
                }
                this.settings = OutboundConnection.this.template;
                if (this.messagingVersion > this.settings.acceptVersions.max) {
                    this.messagingVersion = this.settings.acceptVersions.max;
                }
                this.settings = this.settings.withLegacyPortIfNecessary(this.messagingVersion);
                OutboundConnectionInitiator.initiateMessaging(OutboundConnection.this.eventLoop, OutboundConnection.this.type, this.settings, this.messagingVersion, result).addListener(future -> {
                    if (future.isCancelled()) {
                        return;
                    }
                    if (future.isSuccess()) {
                        this.onCompletedHandshake((OutboundConnectionInitiator.Result)future.getNow());
                    } else {
                        this.onFailure(future.cause());
                    }
                });
            }

            Future<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>> initiate() {
                AsyncPromise<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>> result = AsyncPromise.withExecutor((Executor)OutboundConnection.this.eventLoop);
                OutboundConnection.this.state = new Connecting(OutboundConnection.this.state.disconnected(), result);
                this.attempt(result);
                return result;
            }
        }
        return new Initiate().initiate();
    }

    private Future<?> requestConnect() {
        State state = this.state;
        if (state.isConnecting()) {
            return state.connecting().attempt;
        }
        AsyncPromise promise = AsyncPromise.uncancellable((Executor)this.eventLoop);
        this.runOnEventLoop(() -> {
            if (this.isClosed()) {
                promise.tryFailure(new ClosedChannelException());
            } else if (this.state.isEstablished() && this.state.established().isConnected()) {
                promise.trySuccess(null);
            } else {
                if (this.state.isEstablished()) {
                    this.setDisconnected();
                }
                if (!this.state.isConnecting()) {
                    assert (this.eventLoop.inEventLoop());
                    assert (!this.isConnected());
                    this.initiate().addListener((GenericFutureListener)new PromiseNotifier(new Promise[]{promise}));
                } else {
                    this.state.connecting().attempt.addListener((GenericFutureListener)new PromiseNotifier(new Promise[]{promise}));
                }
            }
        });
        return promise;
    }

    Future<Void> reconnectWith(OutboundConnectionSettings reconnectWith) {
        OutboundConnectionSettings newTemplate = reconnectWith.withDefaults(ConnectionCategory.MESSAGING);
        if (newTemplate.socketFactory != this.template.socketFactory) {
            throw new IllegalArgumentException();
        }
        if (newTemplate.callbacks != this.template.callbacks) {
            throw new IllegalArgumentException();
        }
        if (!Objects.equals(newTemplate.applicationSendQueueCapacityInBytes, this.template.applicationSendQueueCapacityInBytes)) {
            throw new IllegalArgumentException();
        }
        if (!Objects.equals(newTemplate.applicationSendQueueReserveEndpointCapacityInBytes, this.template.applicationSendQueueReserveEndpointCapacityInBytes)) {
            throw new IllegalArgumentException();
        }
        if (newTemplate.applicationSendQueueReserveGlobalCapacityInBytes != this.template.applicationSendQueueReserveGlobalCapacityInBytes) {
            throw new IllegalArgumentException();
        }
        logger.info("{} updating connection settings", (Object)this.id());
        AsyncPromise<Void> done = AsyncPromise.uncancellable((Executor)this.eventLoop);
        this.delivery.stopAndRunOnEventLoop(() -> {
            this.template = newTemplate;
            if (this.state.isEstablished()) {
                this.disconnectNow(this.state.established());
            } else if (this.state.isConnecting()) {
                this.state.connecting().cancel();
                this.initiate();
            }
            done.setSuccess(null);
        });
        return done;
    }

    public boolean interrupt() {
        State state = this.state;
        if (!state.isEstablished()) {
            return false;
        }
        this.disconnectGracefully(state.established());
        return true;
    }

    private void disconnectGracefully(Established closeIfIs) {
        this.delivery.stopAndRunOnEventLoop(() -> this.disconnectNow(closeIfIs));
    }

    private Future<?> disconnectNow(Established closeIfIs) {
        return this.runOnEventLoop(() -> {
            if (this.state == closeIfIs) {
                this.setDisconnected();
                if (this.hasPending()) {
                    this.delivery.execute();
                }
                closeIfIs.channel.close().addListener(future -> {
                    if (!future.isSuccess()) {
                        logger.info("Problem closing channel {}", (Object)closeIfIs, (Object)future.cause());
                    }
                });
            }
        });
    }

    private void setDisconnected() {
        assert (this.state == null || this.state.isEstablished());
        this.state = Disconnected.dormant(this.eventLoop.scheduleAtFixedRate(this.queue::maybePruneExpired, 100L, 100L, TimeUnit.MILLISECONDS));
    }

    Future<Void> scheduleClose(long time, TimeUnit unit, boolean flushQueue) {
        AsyncPromise<Void> scheduledClose = AsyncPromise.uncancellable((Executor)this.eventLoop);
        if (!scheduledCloseUpdater.compareAndSet(this, null, scheduledClose)) {
            return this.scheduledClose;
        }
        this.eventLoop.schedule(() -> this.close(flushQueue).addListener((GenericFutureListener)new PromiseNotifier(new Promise[]{scheduledClose})), time, unit);
        return scheduledClose;
    }

    public Future<Void> close(boolean flushQueue) {
        AsyncPromise<Void> closing = AsyncPromise.uncancellable((Executor)this.eventLoop);
        if (!closingUpdater.compareAndSet(this, null, closing)) {
            return this.closing;
        }
        final Runnable eventLoopCleanup = () -> {
            Runnable onceNotConnecting = () -> {
                State state = this.state;
                this.state = State.CLOSED;
                try {
                    this.delivery.terminate();
                    if (state.isDisconnected()) {
                        state.disconnected().maintenance.cancel(true);
                        closing.setSuccess(null);
                    } else {
                        assert (state.isEstablished());
                        state.established().channel.close().addListener((GenericFutureListener)new PromiseNotifier(new Promise[]{closing}));
                    }
                }
                catch (Throwable t) {
                    closing.trySuccess(null);
                    try {
                        if (state.isEstablished()) {
                            state.established().channel.close();
                        }
                    }
                    catch (Throwable t2) {
                        t.addSuppressed(t2);
                        logger.error("Failed to close connection cleanly:", t);
                    }
                    throw t;
                }
            };
            if (this.state.isConnecting()) {
                Connecting connecting = this.state.connecting();
                connecting.cancel();
                connecting.attempt.addListener(future -> onceNotConnecting.run());
            } else {
                onceNotConnecting.run();
            }
        };
        final Runnable clearQueue = () -> {
            CountDownLatch done = CountDownLatch.newCountDownLatch(1);
            this.queue.runEventually(withLock -> {
                withLock.consume(this::onClosed);
                done.decrement();
            });
            done.awaitUninterruptibly();
        };
        if (flushQueue) {
            class FinishDelivery
            implements Runnable {
                FinishDelivery() {
                }

                @Override
                public void run() {
                    if (!OutboundConnection.this.hasPending()) {
                        OutboundConnection.this.delivery.stopAndRunOnEventLoop(eventLoopCleanup);
                    } else {
                        OutboundConnection.this.delivery.stopAndRun(() -> {
                            if (OutboundConnection.this.state.isConnecting() && ((OutboundConnection)OutboundConnection.this).state.connecting().isFailingToConnect) {
                                clearQueue.run();
                            }
                            this.run();
                        });
                    }
                }
            }
            this.delivery.stopAndRun(new FinishDelivery());
        } else {
            this.delivery.stopAndRunOnEventLoop(() -> {
                clearQueue.run();
                eventLoopCleanup.run();
            });
        }
        return closing;
    }

    private Future<?> runOnEventLoop(Runnable runnable) {
        if (!this.eventLoop.inEventLoop()) {
            return this.eventLoop.submit(runnable);
        }
        runnable.run();
        return new SucceededFuture((EventExecutor)this.eventLoop, null);
    }

    public boolean isConnected() {
        State state = this.state;
        return state.isEstablished() && state.established().isConnected();
    }

    boolean isClosing() {
        return this.closing != null;
    }

    boolean isClosed() {
        return this.state.isClosed();
    }

    private String id(boolean includeReal) {
        State state = this.state;
        if (!includeReal || !state.isEstablished()) {
            return this.id();
        }
        Established established = state.established();
        Channel channel = established.channel;
        OutboundConnectionSettings settings = established.settings;
        return SocketFactory.channelId(settings.from, (InetSocketAddress)channel.localAddress(), settings.to, (InetSocketAddress)channel.remoteAddress(), this.type, channel.id().asShortText());
    }

    private String id() {
        State state = this.state;
        Channel channel = null;
        OutboundConnectionSettings settings = this.template;
        if (state.isEstablished()) {
            channel = state.established().channel;
            settings = state.established().settings;
        }
        String channelId = channel != null ? channel.id().asShortText() : "[no-channel]";
        return SocketFactory.channelId(settings.from(), settings.to, this.type, channelId);
    }

    public String toString() {
        return this.id();
    }

    public boolean hasPending() {
        return 0L != this.pendingCountAndBytes;
    }

    public int pendingCount() {
        return OutboundConnection.pendingCount(this.pendingCountAndBytes);
    }

    public long pendingBytes() {
        return OutboundConnection.pendingBytes(this.pendingCountAndBytes);
    }

    public long sentCount() {
        return this.sentCount;
    }

    public long sentBytes() {
        return this.sentBytes;
    }

    public long submittedCount() {
        return this.submittedCount;
    }

    public long dropped() {
        return this.overloadedCount + this.expiredCount;
    }

    public long overloadedBytes() {
        return this.overloadedBytes;
    }

    public long overloadedCount() {
        return this.overloadedCount;
    }

    public long expiredCount() {
        return this.expiredCount;
    }

    public long expiredBytes() {
        return this.expiredBytes;
    }

    public long errorCount() {
        return this.errorCount;
    }

    public long errorBytes() {
        return this.errorBytes;
    }

    public long successfulConnections() {
        return this.successfulConnections;
    }

    public long connectionAttempts() {
        return this.connectionAttempts;
    }

    private static Runnable andThen(Runnable a, Runnable b) {
        if (a == null || b == null) {
            return a == null ? b : a;
        }
        return () -> {
            a.run();
            b.run();
        };
    }

    @VisibleForTesting
    public ConnectionType type() {
        return this.type;
    }

    @VisibleForTesting
    OutboundConnectionSettings settings() {
        State state = this.state;
        return state.isEstablished() ? state.established().settings : this.template;
    }

    @VisibleForTesting
    int messagingVersion() {
        State state = this.state;
        return state.isEstablished() ? state.established().messagingVersion : this.template.endpointToVersion().get(this.template.to);
    }

    @VisibleForTesting
    void unsafeRunOnDelivery(Runnable run) {
        this.delivery.stopAndRun(run);
    }

    @VisibleForTesting
    Channel unsafeGetChannel() {
        State state = this.state;
        return state.isEstablished() ? state.established().channel : null;
    }

    @VisibleForTesting
    boolean unsafeAcquireCapacity(long amount) {
        return ResourceLimits.Outcome.SUCCESS == this.acquireCapacity(amount);
    }

    @VisibleForTesting
    boolean unsafeAcquireCapacity(long count, long amount) {
        return ResourceLimits.Outcome.SUCCESS == this.acquireCapacity(count, amount);
    }

    @VisibleForTesting
    void unsafeReleaseCapacity(long amount) {
        this.releaseCapacity(1L, amount);
    }

    @VisibleForTesting
    void unsafeReleaseCapacity(long count, long amount) {
        this.releaseCapacity(count, amount);
    }

    @VisibleForTesting
    ResourceLimits.Limit unsafeGetEndpointReserveLimits() {
        return this.reserveCapacityInBytes.endpoint;
    }

    class LargeMessageDelivery
    extends Delivery {
        static final int DEFAULT_BUFFER_SIZE = 32768;

        LargeMessageDelivery(ExecutorService executor) {
            super(executor);
        }

        @Override
        public void run() {
            String priorThreadName = null;
            try {
                priorThreadName = Thread.currentThread().getName();
                String threadName = "Messaging-OUT-" + OutboundConnection.this.template.from() + "->" + ((OutboundConnection)OutboundConnection.this).template.to + '-' + (Object)((Object)OutboundConnection.this.type);
                Thread.currentThread().setName(threadName);
                super.run();
            }
            finally {
                if (priorThreadName != null) {
                    Thread.currentThread().setName(priorThreadName);
                }
            }
        }

        @Override
        boolean doRun(Established established) {
            Message<?> send = OutboundConnection.this.queue.tryPoll(MonotonicClock.Global.approxTime.now(), this::execute);
            if (send == null) {
                return false;
            }
            AsyncMessageOutputPlus out = null;
            try {
                int messageSize = send.serializedSize(established.messagingVersion);
                out = new AsyncMessageOutputPlus(established.channel, 32768, messageSize, established.payloadAllocator);
                if (messageSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes()) {
                    throw new Message.OversizedMessageException(messageSize);
                }
                Tracing.instance.traceOutgoingMessage(send, messageSize, established.settings.connectTo);
                Message.serializer.serialize(send, out, established.messagingVersion);
                if (out.position() != (long)messageSize) {
                    throw new InvalidSerializedSizeException(send.verb(), messageSize, out.position());
                }
                out.close();
                OutboundConnection.this.sentCount = OutboundConnection.this.sentCount + 1L;
                OutboundConnection.this.sentBytes = OutboundConnection.this.sentBytes + (long)messageSize;
                OutboundConnection.this.releaseCapacity(1L, OutboundConnection.this.canonicalSize(send));
                return OutboundConnection.this.hasPending();
            }
            catch (Throwable t) {
                boolean tryAgain = true;
                if (out != null) {
                    out.discard();
                    if (out.flushed() > 0L || Throwables.isCausedBy(t, cause -> SocketFactory.isConnectionReset(cause) || cause instanceof Errors.NativeIoException || cause instanceof AsyncChannelOutputPlus.FlushException)) {
                        OutboundConnection.this.disconnectNow(established).awaitUninterruptibly();
                        tryAgain = false;
                        try {
                            out.waitUntilFlushed(0L, 0L);
                        }
                        catch (Throwable throwable) {
                            // empty catch block
                        }
                    }
                }
                OutboundConnection.this.onFailedSerialize(send, established.messagingVersion, out == null ? 0 : (int)out.flushedToNetwork(), t);
                return tryAgain;
            }
        }

        @Override
        void stopAndRunOnEventLoop(Runnable run) {
            this.stopAndRun(() -> {
                try {
                    OutboundConnection.this.runOnEventLoop(run).await();
                }
                catch (InterruptedException e) {
                    throw new UncheckedInterruptedException(e);
                }
            });
        }
    }

    class EventLoopDelivery
    extends Delivery {
        private int flushingBytes;
        private boolean isWritable;

        EventLoopDelivery() {
            super((ExecutorService)OutboundConnection.this.eventLoop);
            this.isWritable = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        boolean doRun(Established established) {
            if (!this.isWritable) {
                return false;
            }
            int maxSendBytes = (int)Math.min(OutboundConnection.this.pendingBytes() - (long)this.flushingBytes, (long)OutboundConnections.LARGE_MESSAGE_THRESHOLD);
            if (maxSendBytes == 0) {
                return false;
            }
            OutboundConnectionSettings settings = established.settings;
            int messagingVersion = established.messagingVersion;
            FrameEncoder.Payload sending = null;
            int canonicalSize = 0;
            int sendingBytes = 0;
            int sendingCount = 0;
            try (OutboundMessageQueue.WithLock withLock = OutboundConnection.this.queue.lockOrCallback(MonotonicClock.Global.approxTime.now(), this::execute);){
                boolean hasOverflowed;
                Message<?> next;
                if (withLock == null) {
                    boolean bl = false;
                    return bl;
                }
                sending = established.payloadAllocator.allocate(true, maxSendBytes);
                DataOutputBufferFixed out = new DataOutputBufferFixed(sending.buffer);
                while (null != (next = withLock.peek())) {
                    try {
                        int messageSize = next.serializedSize(messagingVersion);
                        if (messageSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes()) {
                            throw new Message.OversizedMessageException(messageSize);
                        }
                        if (messageSize > sending.remaining()) {
                            if (sendingBytes > 0) break;
                            sending.release();
                            sending = null;
                            sending = established.payloadAllocator.allocate(true, messageSize);
                            out = new DataOutputBufferFixed(sending.buffer);
                        }
                        Tracing.instance.traceOutgoingMessage(next, messageSize, settings.connectTo);
                        Message.serializer.serialize(next, out, messagingVersion);
                        if (sending.length() != sendingBytes + messageSize) {
                            throw new InvalidSerializedSizeException(next.verb(), messageSize, sending.length() - sendingBytes);
                        }
                        canonicalSize += OutboundConnection.this.canonicalSize(next);
                        ++sendingCount;
                        sendingBytes += messageSize;
                    }
                    catch (Throwable t) {
                        OutboundConnection.this.onFailedSerialize(next, messagingVersion, 0, t);
                        assert (sending != null);
                        sending.trim(sendingBytes);
                    }
                    withLock.removeHead(next);
                }
                if (0 == sendingBytes) {
                    boolean t = false;
                    return t;
                }
                sending.finish();
                OutboundConnection.this.debug.onSendSmallFrame(sendingCount, sendingBytes);
                ChannelFuture flushResult = AsyncChannelPromise.writeAndFlush(established.channel, (Object)sending);
                sending = null;
                if (flushResult.isSuccess()) {
                    OutboundConnection.this.sentCount = OutboundConnection.this.sentCount + (long)sendingCount;
                    OutboundConnection.this.sentBytes = OutboundConnection.this.sentBytes + (long)sendingBytes;
                    OutboundConnection.this.debug.onSentSmallFrame(sendingCount, sendingBytes);
                    return false;
                }
                this.flushingBytes += canonicalSize;
                this.setInProgress(true);
                boolean bl = hasOverflowed = this.flushingBytes >= settings.flushHighWaterMark;
                if (hasOverflowed) {
                    this.isWritable = false;
                    this.promiseToExecuteLater();
                }
                int releaseBytesFinal = canonicalSize;
                int sendingBytesFinal = sendingBytes;
                int sendingCountFinal = sendingCount;
                flushResult.addListener(future -> {
                    OutboundConnection.this.releaseCapacity(sendingCountFinal, releaseBytesFinal);
                    this.flushingBytes -= releaseBytesFinal;
                    if (this.flushingBytes == 0) {
                        this.setInProgress(false);
                    }
                    if (!this.isWritable && this.flushingBytes <= settings.flushLowWaterMark) {
                        this.isWritable = true;
                        this.executeAgain();
                    }
                    if (future.isSuccess()) {
                        OutboundConnection.this.sentCount = OutboundConnection.this.sentCount + (long)sendingCountFinal;
                        OutboundConnection.this.sentBytes = OutboundConnection.this.sentBytes + (long)sendingBytesFinal;
                        OutboundConnection.this.debug.onSentSmallFrame(sendingCountFinal, sendingBytesFinal);
                    } else {
                        OutboundConnection.this.errorCount = OutboundConnection.this.errorCount + (long)sendingCountFinal;
                        OutboundConnection.this.errorBytes = OutboundConnection.this.errorBytes + (long)sendingBytesFinal;
                        OutboundConnection.this.invalidateChannel(established, future.cause());
                        OutboundConnection.this.debug.onFailedSmallFrame(sendingCountFinal, sendingBytesFinal);
                    }
                });
                canonicalSize = 0;
                return false;
            }
            catch (Throwable t) {
                OutboundConnection.this.errorCount = OutboundConnection.this.errorCount + (long)sendingCount;
                OutboundConnection.this.errorBytes = OutboundConnection.this.errorBytes + (long)sendingBytes;
                OutboundConnection.this.invalidateChannel(established, t);
                return false;
            }
            finally {
                if (canonicalSize > 0) {
                    OutboundConnection.this.releaseCapacity(sendingCount, canonicalSize);
                }
                if (sending != null) {
                    sending.release();
                }
                if (OutboundConnection.this.pendingBytes() > (long)this.flushingBytes && this.isWritable) {
                    this.execute();
                }
            }
        }

        @Override
        void stopAndRunOnEventLoop(Runnable run) {
            this.stopAndRun(run);
        }
    }

    private abstract class Delivery
    extends AtomicInteger
    implements Runnable {
        final ExecutorService executor;
        private static final int STOPPED = 0;
        private static final int EXECUTING = 1;
        private static final int EXECUTE_AGAIN = 2;
        private static final int EXECUTING_AGAIN = 3;
        private static final int WAITING_TO_EXECUTE = 4;
        private volatile boolean terminated;
        private boolean inProgress = false;
        final AtomicReference<Runnable> stopAndRun = new AtomicReference();

        Delivery(ExecutorService executor) {
            this.executor = executor;
        }

        public void execute() {
            if (this.get() < 2 && 0 == this.getAndUpdate(i -> i == 0 ? 1 : i | 2)) {
                this.executor.execute(this);
            }
        }

        private boolean isExecuting(int state) {
            return 0 != (state & 1);
        }

        void executeAgain() {
            if (!this.isExecuting(this.getAndUpdate(i -> !this.isExecuting(i) ? 1 : 3))) {
                this.executor.execute(this);
            }
        }

        void promiseToExecuteLater() {
            this.set(5);
        }

        private void maybeExecuteAgain() {
            if (3 == this.getAndUpdate(i -> i == 3 ? 1 : i & 0xFFFFFFFE)) {
                this.executor.execute(this);
            }
        }

        public void terminate() {
            this.terminated = true;
        }

        void setInProgress(boolean inProgress) {
            boolean wasInProgress = this.inProgress;
            this.inProgress = inProgress;
            if (!inProgress && wasInProgress) {
                this.executeAgain();
            }
        }

        /*
         * Enabled aggressive block sorting
         */
        @Override
        public void run() {
            State state;
            do {
                if (this.terminated) {
                    return;
                }
                if (null != this.stopAndRun.get()) {
                    if (this.inProgress) {
                        this.promiseToExecuteLater();
                        break;
                    }
                    ((Runnable)this.stopAndRun.getAndSet(null)).run();
                }
                if ((state = OutboundConnection.this.state).isEstablished() && state.established().isConnected()) continue;
                if (!OutboundConnection.this.hasPending() && null == this.stopAndRun.get()) break;
                this.promiseToExecuteLater();
                OutboundConnection.this.requestConnect().addListener(f -> this.executeAgain());
                break;
            } while (this.doRun(state.established()));
            this.maybeExecuteAgain();
        }

        abstract boolean doRun(Established var1);

        void stopAndRun(Runnable run) {
            this.stopAndRun.accumulateAndGet(run, (x$0, x$1) -> OutboundConnection.andThen(x$0, x$1));
            this.execute();
        }

        abstract void stopAndRunOnEventLoop(Runnable var1);
    }

    private static class Connecting
    extends Disconnected {
        final Future<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>> attempt;
        @Nullable
        final Future<?> scheduled;
        final boolean isFailingToConnect;

        Connecting(Disconnected previous, Future<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>> attempt) {
            this(previous, attempt, null);
        }

        Connecting(Disconnected previous, Future<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.MessagingSuccess>> attempt, Future<?> scheduled) {
            super(State.Kind.CONNECTING, previous.maintenance);
            this.attempt = attempt;
            this.scheduled = scheduled;
            this.isFailingToConnect = scheduled != null || previous.isConnecting() && previous.connecting().isFailingToConnect;
        }

        void cancel() {
            if (this.scheduled != null) {
                this.scheduled.cancel(true);
            }
            boolean cancelled = this.attempt.cancel(true);
            assert (cancelled);
        }
    }

    private static class Disconnected
    extends State {
        final Future<?> maintenance;

        Disconnected(State.Kind kind, Future<?> maintenance) {
            super(kind);
            this.maintenance = maintenance;
        }

        public static Disconnected dormant(Future<?> maintenance) {
            return new Disconnected(State.Kind.DORMANT, maintenance);
        }
    }

    private static class Established
    extends State {
        final int messagingVersion;
        final Channel channel;
        final FrameEncoder.PayloadAllocator payloadAllocator;
        final OutboundConnectionSettings settings;

        Established(int messagingVersion, Channel channel, FrameEncoder.PayloadAllocator payloadAllocator, OutboundConnectionSettings settings) {
            super(State.Kind.ESTABLISHED);
            this.messagingVersion = messagingVersion;
            this.channel = channel;
            this.payloadAllocator = payloadAllocator;
            this.settings = settings;
        }

        boolean isConnected() {
            return this.channel.isOpen();
        }
    }

    private static class State {
        static final State CLOSED = new State(Kind.CLOSED);
        final Kind kind;

        State(Kind kind) {
            this.kind = kind;
        }

        boolean isEstablished() {
            return this.kind == Kind.ESTABLISHED;
        }

        boolean isConnecting() {
            return this.kind == Kind.CONNECTING;
        }

        boolean isDisconnected() {
            return this.kind == Kind.CONNECTING || this.kind == Kind.DORMANT;
        }

        boolean isClosed() {
            return this.kind == Kind.CLOSED;
        }

        Established established() {
            return (Established)this;
        }

        Connecting connecting() {
            return (Connecting)this;
        }

        Disconnected disconnected() {
            return (Disconnected)this;
        }

        static enum Kind {
            ESTABLISHED,
            CONNECTING,
            DORMANT,
            CLOSED;

        }
    }
}

