/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.transport;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.com.google.common.base.Predicate;
import net.nmoncho.shaded.io.netty.channel.Channel;
import net.nmoncho.shaded.io.netty.channel.EventLoop;
import net.nmoncho.shaded.io.netty.util.AttributeKey;
import org.apache.cassandra.concurrent.DebuggableTask;
import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
import org.apache.cassandra.concurrent.SharedExecutorPool;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
import org.apache.cassandra.transport.CQLMessageHandler;
import org.apache.cassandra.transport.ClientResourceLimits;
import org.apache.cassandra.transport.Envelope;
import org.apache.cassandra.transport.ExceptionHandlers;
import org.apache.cassandra.transport.Flusher;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.ServerConnection;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Dispatcher
implements CQLMessageHandler.MessageConsumer<Message.Request> {
    private static final Logger logger = LoggerFactory.getLogger(Dispatcher.class);

    // Executor that runs every request not routed to authExecutor. Its size is taken from
    // DatabaseDescriptor.getNativeTransportMaxThreads() and the setter is handed to the pool
    // alongside it.
    @VisibleForTesting
    static final LocalAwareExecutorPlus requestExecutor = SharedExecutorPool.SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), DatabaseDescriptor::setNativeTransportMaxThreads, "transport", "Native-Transport-Requests");

    // Executor for AUTH_RESPONSE / CREDENTIALS requests (see dispatch). It is always created with
    // at least one thread (Math.max(1, ...)), even when the configured auth thread count is <= 0;
    // in that case dispatch simply never selects it.
    @VisibleForTesting
    static final LocalAwareExecutorPlus authExecutor = SharedExecutorPool.SHARED.newExecutor(Math.max(1, DatabaseDescriptor.getNativeTransportMaxAuthThreads()), DatabaseDescriptor::setNativeTransportMaxAuthThreads, "transport", "Native-Transport-Auth-Requests");

    // One Flusher per Netty event loop, created lazily by flush(). The map is static, so it is
    // shared by every Dispatcher instance.
    private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<EventLoop, Flusher>();

    // Selects which Flusher factory flush() uses when it has to create a new Flusher:
    // Flusher.legacy(loop) when true, Flusher.immediate(loop) otherwise.
    private final boolean useLegacyFlusher;

    // Netty attribute key under which a Consumer<EventMessage> can be stored.
    // NOTE(review): presumably holds the consumer built by eventDispatcher() on each channel;
    // the code that sets/reads the attribute is not in this file - confirm.
    static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER = AttributeKey.valueOf((String)"EVTDISP");

    /**
     * Creates a dispatcher.
     *
     * @param useLegacyFlusher when {@code true}, flushers created by this dispatcher come from
     *                         {@code Flusher.legacy}; otherwise from {@code Flusher.immediate}
     */
    public Dispatcher(boolean useLegacyFlusher) {
        this.useLegacyFlusher = useLegacyFlusher;
    }

    /**
     * Submits a request for asynchronous processing and records that it was dispatched.
     *
     * <p>Requests of type {@code AUTH_RESPONSE} or {@code CREDENTIALS} go to the dedicated auth
     * executor, but only when the configured number of auth threads is positive; everything else
     * goes to the general request executor.
     *
     * @param channel      channel the request arrived on
     * @param request      the request to process
     * @param forFlusher   converter used later to wrap the response for flushing
     * @param backpressure overload condition (if any) that was in effect when the request was received
     */
    @Override
    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, ClientResourceLimits.Overload backpressure) {
        LocalAwareExecutorPlus target = requestExecutor;
        // The request type is only inspected when a dedicated auth pool has been configured.
        if (DatabaseDescriptor.getNativeTransportMaxAuthThreads() > 0) {
            Message.Type type = request.type;
            if (type == Message.Type.AUTH_RESPONSE || type == Message.Type.CREDENTIALS) {
                target = authExecutor;
            }
        }
        target.submit(new RequestProcessor(channel, request, forFlusher, backpressure));
        ClientMetrics.instance.markRequestDispatched();
    }

    /**
     * Reports whether the request executor's queue is still considered to have capacity.
     *
     * <p>When the configured max-item-age threshold is zero or negative the check is disabled and
     * this always returns {@code true}. Otherwise the queue has capacity while the queue time of
     * its oldest task is strictly below {@code nativeTransportTimeout * threshold} (in nanoseconds).
     *
     * @return {@code true} if more requests may be queued
     */
    @Override
    public boolean hasQueueCapacity() {
        final double ageThreshold = DatabaseDescriptor.getNativeTransportQueueMaxItemAgeThreshold();
        if (ageThreshold <= 0.0) {
            return true;
        }
        final double oldestQueuedNanos = (double) requestExecutor.oldestTaskQueueTime();
        final double allowedNanos = (double) DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS) * ageThreshold;
        return oldestQueuedNanos < allowedNanos;
    }

    /**
     * Executes a single request on behalf of a connection and builds its response.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Record the time the request spent queued. If that already exceeds the native transport
     *       timeout, an {@link OverloadedException} error response is returned without executing
     *       the request.</li>
     *   <li>Start capturing client warnings (protocol V4 and above) and initialise coordinator
     *       warnings for trackable requests.</li>
     *   <li>If the request was received under backpressure, log (rate limited) and attach a client
     *       warning describing which limit was breached.</li>
     *   <li>Validate the message against the connection state, execute it, then decorate the
     *       response with stream id, warnings and connection, and apply the state transition.</li>
     * </ol>
     *
     * <p>Exceptions thrown here are not handled; the calling overload converts them into error
     * responses and resets the warning state.
     *
     * @param connection   connection the request belongs to
     * @param request      request to execute
     * @param backpressure overload condition in effect when the request was received
     * @param requestTime  enqueue/start timestamps of the request
     * @return the response to send back; always carries the request's stream id
     */
    private static Message.Response processRequest(ServerConnection connection, Message.Request request, ClientResourceLimits.Overload backpressure, RequestTime requestTime) {
        long queueTime = requestTime.timeSpentInQueueNanos();
        ClientMetrics.instance.queueTime(queueTime, TimeUnit.NANOSECONDS);
        if (queueTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS)) {
            ClientMetrics.instance.markTimedOutBeforeProcessing();
            Message.Response timedOut = ErrorMessage.fromException(new OverloadedException("Query timed out before it could start"));
            // Tag the error with the request's stream id, exactly as the normal and the exception
            // paths do; without it the reply cannot be matched to the request that timed out.
            timedOut.setStreamId(request.getStreamId());
            return timedOut;
        }
        if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) {
            ClientWarn.instance.captureWarnings();
        }
        if (request.isTrackable()) {
            CoordinatorWarnings.init();
        }

        // Describe the breached limit, if any; NONE (and any other value) produces no warning.
        String overloadWarning = null;
        switch (backpressure) {
            case REQUESTS: {
                overloadWarning = String.format("Request breached global limit of %d requests/second and triggered backpressure.", ClientResourceLimits.getNativeTransportMaxRequestsPerSecond());
                break;
            }
            case BYTES_IN_FLIGHT: {
                overloadWarning = String.format("Request breached limit(s) on bytes in flight (Endpoint: %d, Global: %d) and triggered backpressure.", ClientResourceLimits.getEndpointLimit(), ClientResourceLimits.getGlobalLimit());
                break;
            }
            case QUEUE_TIME: {
                overloadWarning = String.format("Request has spent over %s time of the maximum timeout %dms in the queue", DatabaseDescriptor.getNativeTransportQueueMaxItemAgeThreshold(), DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.MILLISECONDS));
                break;
            }
            default: {
                break;
            }
        }
        if (overloadWarning != null) {
            // Server-side log is rate limited to once a minute; the client warning is attached to every affected response.
            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1L, TimeUnit.MINUTES, overloadWarning, new Object[0]);
            ClientWarn.instance.warn(overloadWarning);
        }

        QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion());
        Message.logger.trace("Received: {}, v={}", (Object)request, (Object)connection.getVersion());
        connection.requests.inc();
        Message.Response response = request.execute(qstate, requestTime);
        if (request.isTrackable()) {
            CoordinatorWarnings.done();
        }
        response.setStreamId(request.getStreamId());
        response.setWarnings(ClientWarn.instance.getWarnings());
        response.attach(connection);
        connection.applyStateTransition(request.type, response.type);
        return response;
    }

    /**
     * Processes a request and never lets an exception escape: any {@link Throwable} raised while
     * executing the request is turned into an {@link ErrorMessage} carrying the request's stream
     * id and the client warnings collected so far.
     *
     * <p>Coordinator warnings and client warnings are always reset before returning, whether the
     * request succeeded or failed.
     *
     * <p>NOTE: this source was produced by the CFR decompiler, which flagged this method with
     * "Removed try catching itself - possible behaviour change".
     *
     * @param channel      channel the request arrived on; used to obtain the unexpected-exception handler
     * @param request      request to execute
     * @param backpressure overload condition in effect when the request was received
     * @param requestTime  enqueue/start timestamps of the request
     * @return the successful response, or an error response describing the failure
     */
    static Message.Response processRequest(Channel channel, Message.Request request, ClientResourceLimits.Overload backpressure, RequestTime requestTime) {
        try {
            ServerConnection connection = (ServerConnection) request.connection();
            return Dispatcher.processRequest(connection, request, backpressure, requestTime);
        }
        catch (Throwable failure) {
            JVMStabilityInspector.inspectThrowable(failure);
            // Close out coordinator-warning tracking that was opened for trackable requests.
            if (request.isTrackable()) {
                CoordinatorWarnings.done();
            }
            Predicate<Throwable> unexpectedHandler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
            ErrorMessage errorResponse = ErrorMessage.fromException(failure, unexpectedHandler);
            errorResponse.setStreamId(request.getStreamId());
            errorResponse.setWarnings(ClientWarn.instance.getWarnings());
            return errorResponse;
        }
        finally {
            CoordinatorWarnings.reset();
            ClientWarn.instance.resetWarnings();
        }
    }

    /**
     * Processes a request end to end: executes it, converts the response into a flush item via
     * {@code forFlusher}, and hands that item to the flusher of the channel's event loop.
     *
     * @param channel      channel the request arrived on
     * @param request      request to execute
     * @param forFlusher   converter that wraps the response for flushing
     * @param backpressure overload condition in effect when the request was received
     * @param requestTime  enqueue/start timestamps of the request
     */
    void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, ClientResourceLimits.Overload backpressure, RequestTime requestTime) {
        Message.Response reply = Dispatcher.processRequest(channel, request, backpressure, requestTime);
        Flusher.FlushItem<?> flushItem = forFlusher.toFlushItem(channel, request, reply);
        Message.logger.trace("Responding: {}, v={}", (Object) reply, (Object) request.connection().getVersion());
        flush(flushItem);
    }

    /**
     * Enqueues an item on the {@link Flusher} associated with the item's event loop and starts
     * that flusher.
     *
     * <p>The flusher is looked up in {@code flusherLookup}; if none is registered yet, one is
     * created (legacy or immediate, depending on {@code useLegacyFlusher}) and registered with
     * {@code putIfAbsent}. If another thread registered one first, that instance is used and the
     * one created here is discarded.
     *
     * @param item the item to flush
     */
    private void flush(Flusher.FlushItem<?> item) {
        EventLoop loop = item.channel.eventLoop();
        Flusher flusher = flusherLookup.get(loop);
        if (flusher == null) {
            Flusher candidate = useLegacyFlusher ? Flusher.legacy(loop) : Flusher.immediate(loop);
            Flusher alreadyRegistered = flusherLookup.putIfAbsent(loop, candidate);
            flusher = alreadyRegistered != null ? alreadyRegistered : candidate;
        }
        flusher.enqueue(item);
        flusher.start();
    }

    /**
     * Calls {@code shutdown()} on the request executor and then on the auth executor.
     */
    public static void shutdown() {
        requestExecutor.shutdown();
        authExecutor.shutdown();
    }

    /**
     * Builds a consumer that pushes server events to a client.
     *
     * <p>Each event passed to the returned consumer is encoded for {@code version}, wrapped in a
     * framed flush item (with no originating request) and flushed through this dispatcher. The
     * item's callback releases the encoded response envelope.
     *
     * @param channel   channel events are written to
     * @param version   protocol version used to encode each event
     * @param allocator payload allocator passed to the framed flush item
     * @return a consumer that flushes every event it receives to {@code channel}
     */
    Consumer<EventMessage> eventDispatcher(Channel channel, ProtocolVersion version, FrameEncoder.PayloadAllocator allocator) {
        return event -> {
            Flusher.FlushItem.Framed framed = new Flusher.FlushItem.Framed(
                channel,
                event.encode(version),
                null,
                allocator,
                flushed -> ((Envelope) flushed.response).release());
            this.flush(framed);
        };
    }

    /**
     * Task submitted to an executor for each dispatched request. When run, it records its start
     * time and processes the request through the enclosing {@link Dispatcher}.
     *
     * <p>This is an inner (non-static) class: it needs the enclosing dispatcher to process and
     * flush the request.
     */
    public class RequestProcessor
    implements DebuggableTask.RunnableDebuggableTask {
        private final Channel channel;
        private final Message.Request request;
        private final FlushItemConverter forFlusher;
        private final ClientResourceLimits.Overload backpressure;
        // Written by run(); volatile so the value can be read through startTimeNanos() and
        // toString() from other threads.
        private volatile long startTimeNanos;

        /**
         * @param channel      channel the request arrived on
         * @param request      request to process
         * @param forFlusher   converter used to wrap the response for flushing
         * @param backpressure overload condition in effect when the request was received
         */
        public RequestProcessor(Channel channel, Message.Request request, FlushItemConverter forFlusher, ClientResourceLimits.Overload backpressure) {
            this.channel = channel;
            this.request = request;
            this.forFlusher = forFlusher;
            this.backpressure = backpressure;
        }

        /**
         * Stamps the start time, then processes the request using the request's creation time as
         * the enqueue time and the stamp just taken as the start time.
         */
        @Override
        public void run() {
            long startedAt = MonotonicClock.Global.preciseTime.now();
            startTimeNanos = startedAt;
            RequestTime timing = new RequestTime(request.createdAtNanos, startedAt);
            Dispatcher.this.processRequest(channel, request, forFlusher, backpressure, timing);
        }

        /** @return the creation timestamp of the wrapped request, in nanoseconds */
        @Override
        public long creationTimeNanos() {
            return request.createdAtNanos;
        }

        /** @return the timestamp recorded when {@link #run()} began; {@code 0} if it has not run yet */
        @Override
        public long startTimeNanos() {
            return startTimeNanos;
        }

        /** @return the string form of the wrapped request */
        @Override
        public String description() {
            return request.toString();
        }

        @Override
        public String toString() {
            return "RequestProcessor{request=" + request + ", approxStartTimeNanos=" + startTimeNanos + '}';
        }
    }

    /**
     * Immutable pair of timestamps describing a request: when it was enqueued and when its
     * processing started, both in nanoseconds. Deadlines and timeouts are derived from these
     * together with values read from {@link DatabaseDescriptor}.
     */
    public static class RequestTime {
        private final long enqueuedAtNanos;
        private final long startedAtNanos;

        /**
         * Creates a request time whose enqueue and start timestamps are the same.
         *
         * @param createdAtNanos timestamp used for both enqueue and start
         */
        public RequestTime(long createdAtNanos) {
            this(createdAtNanos, createdAtNanos);
        }

        /**
         * @param enqueuedAtNanos when the request was enqueued
         * @param startedAtNanos  when processing of the request started
         */
        public RequestTime(long enqueuedAtNanos, long startedAtNanos) {
            this.enqueuedAtNanos = enqueuedAtNanos;
            this.startedAtNanos = startedAtNanos;
        }

        /** @return a request time with both timestamps set to the current precise clock reading */
        public static RequestTime forImmediateExecution() {
            long now = MonotonicClock.Global.preciseTime.now();
            return new RequestTime(now);
        }

        /** @return when processing of the request started */
        public long startedAtNanos() {
            return startedAtNanos;
        }

        /** @return when the request was enqueued */
        public long enqueuedAtNanos() {
            return enqueuedAtNanos;
        }

        /**
         * Returns the timestamp that verb timeouts are measured from, as selected by
         * {@code DatabaseDescriptor.getCQLStartTime()}: the start time for {@code REQUEST}, the
         * enqueue time for {@code QUEUE}.
         *
         * @throws IllegalArgumentException if the configured start time is neither of those
         */
        public long baseTimeNanos() {
            switch (DatabaseDescriptor.getCQLStartTime()) {
                case REQUEST:
                    return startedAtNanos();
                case QUEUE:
                    return enqueuedAtNanos();
                default:
                    throw new IllegalArgumentException("Unknown start time: " + DatabaseDescriptor.getCQLStartTime());
            }
        }

        /**
         * Computes the deadline for an operation: the earlier of the client deadline and
         * {@code baseTimeNanos() + verbExpiresAfterNanos}.
         *
         * @param verbExpiresAfterNanos timeout of the verb, in nanoseconds
         * @return the deadline, in nanoseconds
         */
        public long computeDeadline(long verbExpiresAfterNanos) {
            long fromClient = clientDeadline();
            long fromVerb = baseTimeNanos() + verbExpiresAfterNanos;
            return Math.min(fromVerb, fromClient);
        }

        /**
         * @param now                   current time, in nanoseconds
         * @param verbExpiresAfterNanos timeout of the verb, in nanoseconds
         * @return time remaining until {@link #computeDeadline(long)}; may be negative
         */
        public long computeTimeout(long now, long verbExpiresAfterNanos) {
            long deadline = computeDeadline(verbExpiresAfterNanos);
            return deadline - now;
        }

        /**
         * @return {@code true} when the native deadline is not enforced for hints, or when the
         *         current time is still before the client deadline
         */
        public boolean shouldSendHints() {
            if (!DatabaseDescriptor.getEnforceNativeDeadlineForHints()) {
                return true;
            }
            // Left operand is evaluated first: the clock is read before the deadline is computed.
            return MonotonicClock.Global.preciseTime.now() < clientDeadline();
        }

        /** @return enqueue time plus the native transport timeout, in nanoseconds */
        public long clientDeadline() {
            long transportTimeoutNanos = DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS);
            return enqueuedAtNanos() + transportTimeoutNanos;
        }

        /** @return start time minus enqueue time, in nanoseconds */
        public long timeSpentInQueueNanos() {
            return startedAtNanos - enqueuedAtNanos;
        }
    }

    /**
     * Converts a processed request and its response into the {@link Flusher.FlushItem} that is
     * enqueued for flushing.
     */
    interface FlushItemConverter {
        /**
         * @param channel  channel the request arrived on
         * @param request  the request that was processed
         * @param response the response produced for it
         * @return the item to enqueue with a {@link Flusher}
         */
        Flusher.FlushItem<?> toFlushItem(Channel channel, Message.Request request, Message.Response response);
    }
}

