/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.io.netty.channel.Channel;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.InternodeInboundMetrics;
import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.Crc;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.InboundCounters;
import org.apache.cassandra.net.InboundMessageCallbacks;
import org.apache.cassandra.net.InboundMessageHandler;
import org.apache.cassandra.net.InvalidSerializedSizeException;
import org.apache.cassandra.net.LatencyConsumer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.OutboundConnections;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.MonotonicClock;

public final class InboundMessageHandlers {
    // Local and remote endpoints; both are handed to every handler built by createHandler().
    private final InetAddressAndPort self;
    private final InetAddressAndPort peer;
    // Forwarded unchanged to each handler as its queue capacity argument.
    private final int queueCapacity;
    // Reserve-capacity limits: the endpoint one is created in the constructor for this instance,
    // the global one is taken from the GlobalResourceLimits passed to the constructor.
    private final ResourceLimits.Limit endpointReserveCapacity;
    private final ResourceLimits.Limit globalReserveCapacity;
    // Wait queues matching the two limits above: the endpoint queue is built from
    // endpointReserveCapacity, the global queue comes from GlobalResourceLimits.
    private final AbstractMessageHandler.WaitQueue endpointWaitQueue;
    private final AbstractMessageHandler.WaitQueue globalWaitQueue;
    // One counter set per connection type handled by countersFor().
    private final InboundCounters urgentCounters = new InboundCounters();
    private final InboundCounters smallCounters = new InboundCounters();
    private final InboundCounters largeCounters = new InboundCounters();
    private final InboundCounters legacyCounters = new InboundCounters();
    // One callback set per connection type, each bound to the counter set of the same name
    // (see makeMessageCallbacks()).
    private final InboundMessageCallbacks urgentCallbacks;
    private final InboundMessageCallbacks smallCallbacks;
    private final InboundMessageCallbacks largeCallbacks;
    private final InboundMessageCallbacks legacyCallbacks;
    // Created last in the constructor with a reference to this instance; released by releaseMetrics().
    private final InternodeInboundMetrics metrics;
    private final MessageConsumer messageConsumer;
    // Factory used by createHandler() to build handlers.
    private final HandlerProvider handlerProvider;
    // Handlers currently registered: added in createHandler(), removed in onHandlerClosed().
    private final Collection<InboundMessageHandler> handlers = new CopyOnWriteArrayList<InboundMessageHandler>();
    // Totals taken over from handlers that have closed (see absorbCounters()). The public getters
    // add these to the values summed over the handlers still registered. Each volatile total is
    // only modified through its field updater, which adds to it atomically.
    private volatile long closedReceivedCount;
    private volatile long closedReceivedBytes;
    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedReceivedCountUpdater = AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedReceivedCount");
    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedReceivedBytesUpdater = AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedReceivedBytes");
    private volatile long closedThrottledCount;
    private volatile long closedThrottledNanos;
    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedThrottledCountUpdater = AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedThrottledCount");
    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedThrottledNanosUpdater = AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedThrottledNanos");
    private volatile long closedCorruptFramesRecovered;
    private volatile long closedCorruptFramesUnrecovered;
    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedCorruptFramesRecoveredUpdater = AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedCorruptFramesRecovered");
    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedCorruptFramesUnrecoveredUpdater = AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedCorruptFramesUnrecovered");

    /**
     * Creates the handler group with {@code InboundMessageHandler::new} as the handler factory.
     * Every other argument is passed through unchanged to the eight-argument constructor.
     */
    public InboundMessageHandlers(InetAddressAndPort self,
                                  InetAddressAndPort peer,
                                  int queueCapacity,
                                  long endpointReserveCapacity,
                                  GlobalResourceLimits globalResourceLimits,
                                  GlobalMetricCallbacks globalMetricCallbacks,
                                  MessageConsumer messageConsumer) {
        this(self,
             peer,
             queueCapacity,
             endpointReserveCapacity,
             globalResourceLimits,
             globalMetricCallbacks,
             messageConsumer,
             InboundMessageHandler::new);
    }

    public InboundMessageHandlers(InetAddressAndPort self, InetAddressAndPort peer, int queueCapacity, long endpointReserveCapacity, GlobalResourceLimits globalResourceLimits, GlobalMetricCallbacks globalMetricCallbacks, MessageConsumer messageConsumer, HandlerProvider handlerProvider) {
        this.self = self;
        this.peer = peer;
        this.queueCapacity = queueCapacity;
        this.endpointReserveCapacity = new ResourceLimits.Concurrent(endpointReserveCapacity);
        this.globalReserveCapacity = globalResourceLimits.reserveCapacity;
        this.endpointWaitQueue = AbstractMessageHandler.WaitQueue.endpoint(this.endpointReserveCapacity);
        this.globalWaitQueue = globalResourceLimits.waitQueue;
        this.messageConsumer = messageConsumer;
        this.handlerProvider = handlerProvider;
        this.urgentCallbacks = InboundMessageHandlers.makeMessageCallbacks(peer, this.urgentCounters, globalMetricCallbacks, messageConsumer);
        this.smallCallbacks = InboundMessageHandlers.makeMessageCallbacks(peer, this.smallCounters, globalMetricCallbacks, messageConsumer);
        this.largeCallbacks = InboundMessageHandlers.makeMessageCallbacks(peer, this.largeCounters, globalMetricCallbacks, messageConsumer);
        this.legacyCallbacks = InboundMessageHandlers.makeMessageCallbacks(peer, this.legacyCounters, globalMetricCallbacks, messageConsumer);
        this.metrics = new InternodeInboundMetrics(peer, this);
    }

    /**
     * Builds a handler through the configured {@code HandlerProvider} and registers it with this group.
     * The handler is given {@code onHandlerClosed} as its close callback and the callback set
     * returned by {@code callbacksFor(type)}.
     *
     * @return the newly created, registered handler
     */
    InboundMessageHandler createHandler(FrameDecoder frameDecoder, ConnectionType type, Channel channel, int version) {
        InboundMessageHandler created = handlerProvider.provide(
            frameDecoder,
            type,
            channel,
            self,
            peer,
            version,
            OutboundConnections.LARGE_MESSAGE_THRESHOLD,
            queueCapacity,
            endpointReserveCapacity,
            globalReserveCapacity,
            endpointWaitQueue,
            globalWaitQueue,
            this::onHandlerClosed,
            callbacksFor(type),
            messageConsumer);
        handlers.add(created);
        return created;
    }

    /** Releases the {@code InternodeInboundMetrics} instance created for this group. */
    void releaseMetrics() {
        metrics.release();
    }

    /**
     * Close callback handed to every handler created by this group: unregisters the handler,
     * then folds its counters into the closed-handler totals.
     */
    private void onHandlerClosed(AbstractMessageHandler handler) {
        assert handler instanceof InboundMessageHandler;
        handlers.remove(handler);
        InboundMessageHandler closed = (InboundMessageHandler) handler;
        absorbCounters(closed);
    }

    /** Returns the number of handlers currently registered with this group. */
    @VisibleForTesting
    public int count() {
        return handlers.size();
    }

    /**
     * Selects the callback set that belongs to the given connection type.
     *
     * @param type connection type of the handler being created
     * @return the callbacks bound to that type's counters
     * @throws IllegalArgumentException if {@code type} is not one of the four message connection
     *         types handled here; the message names the offending type
     * @throws NullPointerException if {@code type} is {@code null} (raised by the switch)
     */
    private InboundMessageCallbacks callbacksFor(ConnectionType type) {
        switch (type) {
            case URGENT_MESSAGES:
                return this.urgentCallbacks;
            case SMALL_MESSAGES:
                return this.smallCallbacks;
            case LARGE_MESSAGES:
                return this.largeCallbacks;
            case LEGACY_MESSAGES:
                return this.legacyCallbacks;
            default:
                // Name the type so an unexpected caller can be diagnosed from the stack trace alone.
                throw new IllegalArgumentException("No inbound message callbacks for connection type " + type);
        }
    }

    /**
     * Builds the callback set for one connection type. The returned callbacks update the given
     * counters, report latency and dropped messages to the global metric sinks, and notify the
     * message consumer of failures.
     *
     * @param peer            remote endpoint used to obtain the internode latency recorder
     * @param counters        counter set updated by the callbacks
     * @param globalMetrics   sink for latency and dropped-message metrics
     * @param messageConsumer consumer told about corrupt, truncated or undeserializable messages
     * @return a new callback instance bound to the arguments above
     */
    private static InboundMessageCallbacks makeMessageCallbacks(InetAddressAndPort peer,
                                                                InboundCounters counters,
                                                                GlobalMetricCallbacks globalMetrics,
                                                                MessageConsumer messageConsumer) {
        LatencyConsumer latencyRecorder = globalMetrics.internodeLatencyRecorder(peer);

        return new InboundMessageCallbacks() {

            @Override
            public void onHeaderArrived(int messageSize, Message.Header header, long timeElapsed, TimeUnit unit) {
                // Only record latency that exceeds the approximate clock's error, expressed in `unit`.
                long clockError = unit.convert(MonotonicClock.Global.approxTime.error(), TimeUnit.NANOSECONDS);
                if (timeElapsed <= clockError) {
                    return;
                }
                latencyRecorder.accept(timeElapsed, unit);
            }

            @Override
            public void onArrived(int messageSize, Message.Header header, long timeElapsed, TimeUnit unit) {
                // Intentionally empty: nothing is recorded for this event.
            }

            @Override
            public void onArrivedExpired(int messageSize, Message.Header header, boolean wasCorrupt, long timeElapsed, TimeUnit unit) {
                counters.addExpired(messageSize);
                globalMetrics.recordInternodeDroppedMessage(header.verb, timeElapsed, unit);
            }

            @Override
            public void onArrivedCorrupt(int messageSize, Message.Header header, long timeElapsed, TimeUnit unit) {
                counters.addError(messageSize);
                // The reported InvalidCrc is built with zeroed arguments rather than real checksum values.
                messageConsumer.fail(header, new Crc.InvalidCrc(0, 0));
            }

            @Override
            public void onClosedBeforeArrival(int messageSize, Message.Header header, int bytesReceived, boolean wasCorrupt, boolean wasExpired) {
                counters.addError(messageSize);
                messageConsumer.fail(header, new InvalidSerializedSizeException(header.verb, messageSize, bytesReceived));
            }

            @Override
            public void onExpired(int messageSize, Message.Header header, long timeElapsed, TimeUnit unit) {
                counters.addExpired(messageSize);
                globalMetrics.recordInternodeDroppedMessage(header.verb, timeElapsed, unit);
            }

            @Override
            public void onFailedDeserialize(int messageSize, Message.Header header, Throwable t) {
                counters.addError(messageSize);
                messageConsumer.fail(header, t);
            }

            @Override
            public void onDispatched(int messageSize, Message.Header header) {
                counters.addPending(messageSize);
            }

            @Override
            public void onExecuting(int messageSize, Message.Header header, long timeElapsed, TimeUnit unit) {
                globalMetrics.recordInternalLatency(header.verb, timeElapsed, unit);
            }

            @Override
            public void onExecuted(int messageSize, Message.Header header, long timeElapsed, TimeUnit unit) {
                counters.removePending(messageSize);
            }

            @Override
            public void onProcessed(int messageSize, Message.Header header) {
                counters.addProcessed(messageSize);
            }
        };
    }

    /**
     * Selects the counter set that belongs to the given connection type.
     *
     * @param type connection type whose counters are requested
     * @return the counters kept for that type
     * @throws IllegalArgumentException if {@code type} is not one of the four message connection
     *         types handled here; the message names the offending type
     * @throws NullPointerException if {@code type} is {@code null} (raised by the switch)
     */
    InboundCounters countersFor(ConnectionType type) {
        switch (type) {
            case URGENT_MESSAGES:
                return this.urgentCounters;
            case SMALL_MESSAGES:
                return this.smallCounters;
            case LARGE_MESSAGES:
                return this.largeCounters;
            case LEGACY_MESSAGES:
                return this.legacyCounters;
            default:
                // Name the type so an unexpected caller can be diagnosed from the stack trace alone.
                throw new IllegalArgumentException("No inbound counters for connection type " + type);
        }
    }

    /** Sum of {@code receivedCount} over registered handlers plus the total absorbed from closed handlers. */
    public long receivedCount() {
        long live = sumHandlers(handler -> handler.receivedCount);
        return live + closedReceivedCount;
    }

    /** Sum of {@code receivedBytes} over registered handlers plus the total absorbed from closed handlers. */
    public long receivedBytes() {
        long live = sumHandlers(handler -> handler.receivedBytes);
        return live + closedReceivedBytes;
    }

    /** Sum of {@code throttledCount} over registered handlers plus the total absorbed from closed handlers. */
    public long throttledCount() {
        long live = sumHandlers(handler -> handler.throttledCount);
        return live + closedThrottledCount;
    }

    /** Sum of {@code throttledNanos} over registered handlers plus the total absorbed from closed handlers. */
    public long throttledNanos() {
        long live = sumHandlers(handler -> handler.throttledNanos);
        return live + closedThrottledNanos;
    }

    /** Sum of {@code queueSize} over the currently registered handlers; closed handlers contribute nothing. */
    public long usingCapacity() {
        long inUse = sumHandlers(handler -> handler.queueSize);
        return inUse;
    }

    /** Returns the value reported by {@code using()} on this group's endpoint reserve limit. */
    public long usingEndpointReserveCapacity() {
        long inUse = endpointReserveCapacity.using();
        return inUse;
    }

    /** Sum of {@code corruptFramesRecovered} over registered handlers plus the total absorbed from closed handlers. */
    public long corruptFramesRecovered() {
        long live = sumHandlers(handler -> handler.corruptFramesRecovered);
        return live + closedCorruptFramesRecovered;
    }

    /** Sum of {@code corruptFramesUnrecovered} over registered handlers plus the total absorbed from closed handlers. */
    public long corruptFramesUnrecovered() {
        long live = sumHandlers(handler -> handler.corruptFramesUnrecovered);
        return live + closedCorruptFramesUnrecovered;
    }

    /** Sum of {@code errorCount()} across the urgent, small, large and legacy counters. */
    public long errorCount() {
        return sumCounters(counters -> counters.errorCount());
    }

    /** Sum of {@code errorBytes()} across the urgent, small, large and legacy counters. */
    public long errorBytes() {
        return sumCounters(counters -> counters.errorBytes());
    }

    /** Sum of {@code expiredCount()} across the urgent, small, large and legacy counters. */
    public long expiredCount() {
        return sumCounters(counters -> counters.expiredCount());
    }

    /** Sum of {@code expiredBytes()} across the urgent, small, large and legacy counters. */
    public long expiredBytes() {
        return sumCounters(counters -> counters.expiredBytes());
    }

    /** Sum of {@code processedCount()} across the urgent, small, large and legacy counters. */
    public long processedCount() {
        return sumCounters(counters -> counters.processedCount());
    }

    /** Sum of {@code processedBytes()} across the urgent, small, large and legacy counters. */
    public long processedBytes() {
        return sumCounters(counters -> counters.processedBytes());
    }

    /** Sum of {@code scheduledCount()} across the urgent, small, large and legacy counters. */
    public long scheduledCount() {
        return sumCounters(counters -> counters.scheduledCount());
    }

    /** Sum of {@code scheduledBytes()} across the urgent, small, large and legacy counters. */
    public long scheduledBytes() {
        return sumCounters(counters -> counters.scheduledBytes());
    }

    /**
     * Adds the given handler's counters to the closed-handler totals of this group.
     * Called from onHandlerClosed() after the handler has been removed from the handler
     * collection, so its values keep contributing to the public getters.
     * Each total is updated atomically through its field updater.
     *
     * @param handler the handler whose counters are taken over
     */
    private void absorbCounters(InboundMessageHandler handler) {
        closedReceivedCountUpdater.addAndGet(this, handler.receivedCount);
        closedReceivedBytesUpdater.addAndGet(this, handler.receivedBytes);
        closedThrottledCountUpdater.addAndGet(this, handler.throttledCount);
        closedThrottledNanosUpdater.addAndGet(this, handler.throttledNanos);
        closedCorruptFramesRecoveredUpdater.addAndGet(this, handler.corruptFramesRecovered);
        closedCorruptFramesUnrecoveredUpdater.addAndGet(this, handler.corruptFramesUnrecovered);
    }

    /**
     * Applies {@code counter} to every currently registered handler and returns the sum of the results.
     *
     * @param counter extracts the value of interest from a handler
     * @return the total over all registered handlers, or 0 when there are none
     */
    private long sumHandlers(ToLongFunction<InboundMessageHandler> counter) {
        return handlers.stream()
                       .mapToLong(counter)
                       .sum();
    }

    /**
     * Applies {@code mapping} to the urgent, small, large and legacy counters, in that order,
     * and returns the sum of the four results.
     */
    private long sumCounters(ToLongFunction<InboundCounters> mapping) {
        long total = mapping.applyAsLong(urgentCounters);
        total += mapping.applyAsLong(smallCounters);
        total += mapping.applyAsLong(largeCounters);
        total += mapping.applyAsLong(legacyCounters);
        return total;
    }

    /**
     * Factory for {@link InboundMessageHandler} instances. The default implementation used by the
     * seven-argument constructor is {@code InboundMessageHandler::new}; a different provider can
     * be supplied through the eight-argument constructor.
     */
    @FunctionalInterface
    static interface HandlerProvider {
        /**
         * Creates one handler. Parameter names mirror the arguments supplied by {@code createHandler}.
         *
         * @param decoder                 frame decoder for the connection
         * @param type                    connection type of the handler
         * @param channel                 channel the handler is attached to
         * @param self                    local endpoint
         * @param peer                    remote endpoint
         * @param version                 version value passed to {@code createHandler}
         * @param largeThreshold          {@code createHandler} passes {@code OutboundConnections.LARGE_MESSAGE_THRESHOLD}
         * @param queueCapacity           queue capacity configured for the group
         * @param endpointReserveCapacity endpoint reserve limit of the group
         * @param globalReserveCapacity   global reserve limit
         * @param endpointWaitQueue       wait queue paired with the endpoint reserve limit
         * @param globalWaitQueue         wait queue paired with the global reserve limit
         * @param onClosed                callback the group registers to be told when the handler closes
         * @param callbacks               message callbacks selected for {@code type}
         * @param consumer                consumer of the inbound messages
         * @return the new handler
         */
        public InboundMessageHandler provide(FrameDecoder decoder, ConnectionType type, Channel channel, InetAddressAndPort self, InetAddressAndPort peer, int version, int largeThreshold, int queueCapacity, ResourceLimits.Limit endpointReserveCapacity, ResourceLimits.Limit globalReserveCapacity, AbstractMessageHandler.WaitQueue endpointWaitQueue, AbstractMessageHandler.WaitQueue globalWaitQueue, AbstractMessageHandler.OnHandlerClosed onClosed, InboundMessageCallbacks callbacks, Consumer<Message<?>> consumer);
    }

    /**
     * Metric sinks shared across peers that the per-connection-type message callbacks report to.
     */
    public static interface GlobalMetricCallbacks {
        /**
         * Returns the recorder that receives internode latency measurements for the given peer.
         *
         * @param peer remote endpoint the latency belongs to
         * @return the latency recorder for that peer
         */
        public LatencyConsumer internodeLatencyRecorder(InetAddressAndPort peer);

        /**
         * Records the elapsed time reported when a message starts executing.
         *
         * @param verb        verb taken from the message header
         * @param timeElapsed elapsed time, expressed in {@code unit}
         * @param unit        unit of {@code timeElapsed}
         */
        public void recordInternalLatency(Verb verb, long timeElapsed, TimeUnit unit);

        /**
         * Records a message that was dropped because it expired.
         *
         * @param verb        verb taken from the message header
         * @param timeElapsed elapsed time, expressed in {@code unit}
         * @param unit        unit of {@code timeElapsed}
         */
        public void recordInternodeDroppedMessage(Verb verb, long timeElapsed, TimeUnit unit);
    }

    /**
     * Consumer of inbound messages that can also be told about messages that could not be delivered.
     */
    public static interface MessageConsumer
    extends Consumer<Message<?>> {
        /**
         * Reports a message that failed before it could be consumed. The message callbacks invoke
         * this for corrupt messages, messages cut short by a closed connection, and
         * deserialization failures.
         *
         * @param header  header of the failed message
         * @param failure cause of the failure
         */
        public void fail(Message.Header header, Throwable failure);
    }

    /**
     * Pairs the global reserve-capacity limit with the wait queue created for it.
     */
    static class GlobalResourceLimits {
        /** The global reserve-capacity limit supplied at construction. */
        final ResourceLimits.Limit reserveCapacity;
        /** Wait queue obtained from {@code AbstractMessageHandler.WaitQueue.global} for that limit. */
        final AbstractMessageHandler.WaitQueue waitQueue;

        /**
         * @param reserveCapacity the global limit; also used to build the wait queue
         */
        GlobalResourceLimits(ResourceLimits.Limit reserveCapacity) {
            this.reserveCapacity = reserveCapacity;
            waitQueue = AbstractMessageHandler.WaitQueue.global(this.reserveCapacity);
        }
    }
}

