/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.InternodeOutboundMetrics;
import org.apache.cassandra.net.ForwardingInfo;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundMessageCallbacks;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registry of callbacks for outbound request messages, keyed by (message id, peer).
 *
 * <p>Entries are added through the {@code addWithExpiration} overloads and removed either
 * explicitly ({@link #remove}, {@link #removeAndRespond}), by the {@link OutboundMessageCallbacks}
 * notifications implemented here, or by a periodic task scheduled on the
 * {@code "Callback-Map-Reaper"} executor that expires entries whose expiry time has passed.
 */
public class RequestCallbacks implements OutboundMessageCallbacks {
    private static final Logger logger = LoggerFactory.getLogger(RequestCallbacks.class);

    private final MessagingService messagingService;
    private final ScheduledExecutorPlus executor = ExecutorFactory.Global.executorFactory().scheduled("Callback-Map-Reaper", ExecutorFactory.SimulatorSemantics.DISCARD);
    private final ConcurrentMap<CallbackKey, CallbackInfo> callbacks = new ConcurrentHashMap<>();

    /**
     * Creates the registry and schedules the periodic expiry task, using
     * {@link #defaultExpirationInterval()} as both the initial delay and the delay between runs.
     *
     * @param messagingService service notified about expired callbacks
     */
    RequestCallbacks(MessagingService messagingService) {
        this.messagingService = messagingService;
        long expirationInterval = RequestCallbacks.defaultExpirationInterval();
        this.executor.scheduleWithFixedDelay(this::expire, expirationInterval, expirationInterval, TimeUnit.NANOSECONDS);
    }

    /**
     * Looks up the callback registered for the given message id and peer without removing it.
     *
     * @return the registered entry, or {@code null} if there is none
     */
    @Nullable
    CallbackInfo get(long id, InetAddressAndPort peer) {
        return this.callbacks.get(RequestCallbacks.key(id, peer));
    }

    /**
     * Removes the callback registered for the given message id and peer.
     *
     * @return the removed entry, or {@code null} if there was none
     */
    @Nullable
    @VisibleForTesting
    public CallbackInfo remove(long id, InetAddressAndPort peer) {
        return this.callbacks.remove(RequestCallbacks.key(id, peer));
    }

    /**
     * Registers {@code cb} for the response to {@code message} from {@code to}.
     *
     * <p>Asserts that the verb is neither {@code MUTATION_REQ} nor {@code COUNTER_MUTATION_REQ}
     * (those verbs are accepted by the {@link Replica} overload) and that no callback was already
     * registered under the same id and peer.
     */
    public void addWithExpiration(RequestCallback<?> cb, Message<?> message, InetAddressAndPort to) {
        assert (message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ);
        this.register(cb, message, to);
    }

    /**
     * Registers a write response handler for the response to {@code message} from the endpoint
     * of {@code to}.
     *
     * <p>Asserts that the verb is one of {@code MUTATION_REQ}, {@code COUNTER_MUTATION_REQ} or
     * {@code PAXOS_COMMIT_REQ}, and that no callback was already registered under the same id
     * and endpoint.
     */
    public void addWithExpiration(AbstractWriteResponseHandler<?> cb, Message<?> message, Replica to) {
        assert (message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ);
        InetAddressAndPort endpoint = to.endpoint();
        this.register(cb, message, endpoint);
    }

    /** Stores a new entry for (message id, peer) and asserts that it did not replace an existing one. */
    private void register(RequestCallback<?> cb, Message<?> message, InetAddressAndPort peer) {
        CallbackInfo previous = this.callbacks.put(RequestCallbacks.key(message.id(), peer), new CallbackInfo(message, peer, cb));
        assert (previous == null) : String.format("Callback already exists for id %d/%s! (%s)", message.id(), peer, previous);
    }

    /**
     * Returns the serializer of the response verb recorded for the given message id and peer.
     *
     * @return the serializer, or {@code null} if no callback is registered for that id and peer
     */
    @Nullable
    <In, Out> IVersionedAsymmetricSerializer<In, Out> responseSerializer(long id, InetAddressAndPort peer) {
        CallbackInfo info = this.get(id, peer);
        return info == null ? null : info.responseVerb.serializer();
    }

    /**
     * Removes the callback for the given id and peer and, if one was registered, hands it
     * {@code message} via {@code onResponse}. Does nothing when no callback is registered.
     */
    // The parameter and the stored callback are raw in the public API, so the call is unchecked.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @VisibleForTesting
    public void removeAndRespond(long id, InetAddressAndPort peer, Message message) {
        CallbackInfo ci = this.remove(id, peer);
        if (null != ci) {
            ci.callback.onResponse(message);
        }
    }

    /** Removes the callback for the given id and peer and, if one was registered, expires it. */
    private void removeAndExpire(long id, InetAddressAndPort peer) {
        CallbackInfo ci = this.remove(id, peer);
        if (null != ci) {
            this.onExpired(ci);
        }
    }

    /**
     * Expires every entry whose expiry time is before the current precise monotonic time.
     * Run periodically by the executor and once per attempt by {@link #shutdownGracefully()}.
     */
    private void expire() {
        long now = MonotonicClock.Global.preciseTime.now();
        int expired = 0;
        for (Map.Entry<CallbackKey, CallbackInfo> entry : this.callbacks.entrySet()) {
            CallbackInfo info = entry.getValue();
            // remove(key, value) only succeeds if this exact entry is still mapped, so an entry
            // that was removed concurrently by another path is not expired a second time here.
            if (info.isReadyToDieAt(now) && this.callbacks.remove(entry.getKey(), info)) {
                ++expired;
                this.onExpired(info);
            }
        }
        logger.trace("Expired {} entries", expired);
    }

    /** Expires every entry currently in the map, regardless of its expiry time. */
    private void forceExpire() {
        for (Map.Entry<CallbackKey, CallbackInfo> entry : this.callbacks.entrySet()) {
            CallbackInfo info = entry.getValue();
            if (this.callbacks.remove(entry.getKey(), info)) {
                this.onExpired(info);
            }
        }
    }

    /**
     * Handles an entry that has already been removed from the map: offers its timeout to the
     * latency subscribers, marks the expired-callback metrics, and, if the callback requests it
     * via {@code invokeOnFailure()}, submits its {@code onFailure} with
     * {@link RequestFailureReason#TIMEOUT} to {@link Stage#INTERNAL_RESPONSE}.
     */
    private void onExpired(CallbackInfo info) {
        this.messagingService.latencySubscribers.maybeAdd(info.callback, info.peer, info.timeout(), TimeUnit.NANOSECONDS);
        InternodeOutboundMetrics.totalExpiredCallbacks.mark();
        this.messagingService.markExpiredCallback(info.peer);
        if (info.invokeOnFailure()) {
            Stage.INTERNAL_RESPONSE.submit(() -> info.callback.onFailure(info.peer, RequestFailureReason.TIMEOUT));
        }
    }

    /**
     * Shuts the executor down immediately.
     *
     * @param expireCallbacks if {@code true}, all remaining callbacks are expired afterwards
     */
    void shutdownNow(boolean expireCallbacks) {
        this.executor.shutdownNow();
        if (expireCallbacks) {
            this.forceExpire();
        }
    }

    /**
     * Runs one expiry pass; if callbacks remain, reschedules itself in 100ms, otherwise shuts
     * the executor down.
     */
    void shutdownGracefully() {
        this.expire();
        if (!this.callbacks.isEmpty()) {
            this.executor.schedule(this::shutdownGracefully, 100L, TimeUnit.MILLISECONDS);
        } else {
            this.executor.shutdownNow();
        }
    }

    /**
     * Waits for the executor to terminate, at most until {@code deadlineNanos}.
     *
     * @param deadlineNanos deadline compared against {@code Clock.Global.nanoTime()}
     * @throws TimeoutException if the executor has not terminated and the deadline has already
     *         passed, or the wait elapsed without termination
     * @throws InterruptedException if interrupted while waiting
     */
    void awaitTerminationUntil(long deadlineNanos) throws TimeoutException, InterruptedException {
        if (this.executor.isTerminated()) {
            return;
        }
        long wait = deadlineNanos - Clock.Global.nanoTime();
        if (wait <= 0L || !this.executor.awaitTermination(wait, TimeUnit.NANOSECONDS)) {
            throw new TimeoutException();
        }
    }

    /** Removes all entries without expiring them or notifying their callbacks. */
    @VisibleForTesting
    public void unsafeClear() {
        this.callbacks.clear();
    }

    private static CallbackKey key(long id, InetAddressAndPort peer) {
        return new CallbackKey(id, peer);
    }

    /** Expires the callbacks registered for {@code message} (and its forwards). */
    @Override
    public void onOverloaded(Message<?> message, InetAddressAndPort peer) {
        this.removeAndExpire(message, peer);
    }

    /** Expires the callbacks registered for {@code message} (and its forwards). */
    @Override
    public void onExpired(Message<?> message, InetAddressAndPort peer) {
        this.removeAndExpire(message, peer);
    }

    /** Expires the callbacks registered for {@code message} (and its forwards). */
    @Override
    public void onFailedSerialize(Message<?> message, InetAddressAndPort peer, int messagingVersion, int bytesWrittenToNetwork, Throwable failure) {
        this.removeAndExpire(message, peer);
    }

    /** Expires the callbacks registered for {@code message} (and its forwards). */
    @Override
    public void onDiscardOnClose(Message<?> message, InetAddressAndPort peer) {
        this.removeAndExpire(message, peer);
    }

    /**
     * Expires the callback registered under the message's own id for {@code peer}, then the
     * callback for every (id, peer) pair in the message's forwarding info, if it has any.
     */
    private void removeAndExpire(Message<?> message, InetAddressAndPort peer) {
        this.removeAndExpire(message.id(), peer);
        ForwardingInfo forwardTo = message.forwardTo();
        if (null != forwardTo) {
            forwardTo.forEach(this::removeAndExpire);
        }
    }

    /**
     * @return half of the minimum RPC timeout, in nanoseconds
     */
    public static long defaultExpirationInterval() {
        return DatabaseDescriptor.getMinRpcTimeout(TimeUnit.NANOSECONDS) / 2L;
    }

    /** A registered callback together with the peer and timing data captured from its request. */
    @VisibleForTesting
    public static class CallbackInfo {
        final long createdAtNanos;
        final long expiresAtNanos;
        final InetAddressAndPort peer;
        // Kept raw: this is a public field, and responses of any payload type are passed to it.
        @SuppressWarnings("rawtypes")
        public final RequestCallback callback;
        @Deprecated
        public final Verb responseVerb;

        private CallbackInfo(Message<?> message, InetAddressAndPort peer, RequestCallback<?> callback) {
            this.createdAtNanos = message.createdAtNanos();
            this.expiresAtNanos = message.expiresAtNanos();
            this.peer = peer;
            this.callback = callback;
            this.responseVerb = message.verb().responseVerb;
        }

        /**
         * @return the span between the request's creation and expiry times, in nanoseconds
         */
        public long timeout() {
            return this.expiresAtNanos - this.createdAtNanos;
        }

        /**
         * @return {@code true} if {@code atNano} is strictly after this entry's expiry time
         */
        boolean isReadyToDieAt(long atNano) {
            // NOTE(review): a direct comparison is not wraparound-safe for nanoTime-style values;
            // confirm whether the clock in use makes that a practical concern.
            return atNano > this.expiresAtNanos;
        }

        boolean invokeOnFailure() {
            return this.callback.invokeOnFailure();
        }

        @Override
        public String toString() {
            return "{peer:" + this.peer + ", callback:" + this.callback + ", invokeOnFailure:" + this.invokeOnFailure() + '}';
        }
    }

    /** Map key identifying a callback by message id and peer. */
    private static class CallbackKey {
        final long id;
        final InetAddressAndPort peer;

        CallbackKey(long id, InetAddressAndPort peer) {
            this.id = id;
            this.peer = peer;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CallbackKey)) {
                return false;
            }
            CallbackKey that = (CallbackKey)o;
            return this.id == that.id && this.peer.equals(that.peer);
        }

        @Override
        public int hashCode() {
            return Long.hashCode(this.id) + 31 * this.peer.hashCode();
        }

        @Override
        public String toString() {
            return "{id:" + this.id + ", peer:" + this.peer + '}';
        }
    }
}

