/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.LogManager;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.Logger;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.Version;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.breaker.CircuitBreaker;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.bytes.BytesReference;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.collect.MapBuilder;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.io.stream.StreamInput;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.metrics.MeanMetric;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.transport.TransportAddress;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ThreadContext;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.ThreadPool;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.ActionNotFoundTransportException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.InboundMessage;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.OutboundHandler;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.RemoteTransportException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.RequestHandlerRegistry;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TcpChannel;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TcpTransportChannel;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.Transport;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportChannel;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportHandshaker;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportKeepAlive;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportLogger;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportMessageListener;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportResponseHandler;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportSerializationException;

public class InboundHandler {
    private static final Logger logger = LogManager.getLogger(InboundHandler.class);
    private final MeanMetric readBytesMetric = new MeanMetric();
    private final ThreadPool threadPool;
    private final OutboundHandler outboundHandler;
    private final CircuitBreakerService circuitBreakerService;
    private final InboundMessage.Reader reader;
    private final TransportLogger transportLogger;
    private final TransportHandshaker handshaker;
    private final TransportKeepAlive keepAlive;
    private final Transport.ResponseHandlers responseHandlers = new Transport.ResponseHandlers();
    private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap();
    private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;

    InboundHandler(ThreadPool threadPool, OutboundHandler outboundHandler, InboundMessage.Reader reader, CircuitBreakerService circuitBreakerService, TransportLogger transportLogger, TransportHandshaker handshaker, TransportKeepAlive keepAlive) {
        this.threadPool = threadPool;
        this.outboundHandler = outboundHandler;
        this.circuitBreakerService = circuitBreakerService;
        this.reader = reader;
        this.transportLogger = transportLogger;
        this.handshaker = handshaker;
        this.keepAlive = keepAlive;
    }

    synchronized <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
        if (this.requestHandlers.containsKey(reg.getAction())) {
            throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
        }
        this.requestHandlers = MapBuilder.newMapBuilder(this.requestHandlers).put(reg.getAction(), reg).immutableMap();
    }

    final RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
        return this.requestHandlers.get(action);
    }

    final Transport.ResponseHandlers getResponseHandlers() {
        return this.responseHandlers;
    }

    MeanMetric getReadBytes() {
        return this.readBytesMetric;
    }

    void setMessageListener(TransportMessageListener listener) {
        if (this.messageListener != TransportMessageListener.NOOP_LISTENER) {
            throw new IllegalStateException("Cannot set message listener twice");
        }
        this.messageListener = listener;
    }

    void inboundMessage(TcpChannel channel, BytesReference message) throws Exception {
        channel.getChannelStats().markAccessed(this.threadPool.relativeTimeInMillis());
        this.transportLogger.logInboundMessage(channel, message);
        this.readBytesMetric.inc(message.length() + 2 + 4);
        if (message.length() != 0) {
            this.messageReceived(message, channel);
        } else {
            this.keepAlive.receiveKeepAlive(channel);
        }
    }

    private void messageReceived(BytesReference reference, TcpChannel channel) throws IOException {
        InetSocketAddress remoteAddress = channel.getRemoteAddress();
        ThreadContext threadContext = this.threadPool.getThreadContext();
        try (ThreadContext.StoredContext existing = threadContext.stashContext();
             InboundMessage message = this.reader.deserialize(reference);){
            message.getStoredContext().restore();
            threadContext.putTransient("_remote_address", remoteAddress);
            if (message.isRequest()) {
                this.handleRequest(channel, (InboundMessage.Request)message, reference.length());
            } else {
                TransportResponseHandler<? extends TransportResponse> theHandler;
                long requestId = message.getRequestId();
                TransportResponseHandler<TransportResponse> handler = message.isHandshake() ? this.handshaker.removeHandlerForHandshake(requestId) : ((theHandler = this.responseHandlers.onResponseReceived(requestId, this.messageListener)) == null && message.isError() ? this.handshaker.removeHandlerForHandshake(requestId) : theHandler);
                if (handler != null) {
                    if (message.isError()) {
                        this.handlerResponseError(message.getStreamInput(), handler);
                    } else {
                        this.handleResponse(remoteAddress, message.getStreamInput(), handler);
                    }
                    int nextByte = message.getStreamInput().read();
                    if (nextByte != -1) {
                        throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" + handler + "], error [" + message.isError() + "]; resetting");
                    }
                }
            }
        }
    }

    private void handleRequest(TcpChannel channel, InboundMessage.Request message, int messageLengthBytes) {
        Set<String> features = message.getFeatures();
        String action = message.getActionName();
        long requestId = message.getRequestId();
        StreamInput stream = message.getStreamInput();
        Version version = message.getVersion();
        this.messageListener.onRequestReceived(requestId, action);
        TransportChannel transportChannel = null;
        try {
            if (message.isHandshake()) {
                this.handshaker.handleHandshake(version, features, channel, requestId, stream);
            } else {
                RequestHandlerRegistry<? extends TransportRequest> reg = this.getRequestHandler(action);
                if (reg == null) {
                    throw new ActionNotFoundTransportException(action);
                }
                CircuitBreaker breaker = this.circuitBreakerService.getBreaker("in_flight_requests");
                if (reg.canTripCircuitBreaker()) {
                    breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
                } else {
                    breaker.addWithoutBreaking(messageLengthBytes);
                }
                transportChannel = new TcpTransportChannel(this.outboundHandler, channel, action, requestId, version, features, this.circuitBreakerService, messageLengthBytes, message.isCompress());
                TransportRequest request = reg.newRequest(stream);
                request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
                int nextByte = stream.read();
                if (nextByte != -1) {
                    throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action + "], available [" + stream.available() + "]; resetting");
                }
                this.threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
            }
        }
        catch (Exception e) {
            if (transportChannel == null) {
                transportChannel = new TcpTransportChannel(this.outboundHandler, channel, action, requestId, version, features, this.circuitBreakerService, 0L, message.isCompress());
            }
            try {
                transportChannel.sendResponse(e);
            }
            catch (IOException inner) {
                inner.addSuppressed(e);
                logger.warn(() -> new ParameterizedMessage("Failed to send error message back to client for action [{}]", (Object)action), (Throwable)inner);
            }
        }
    }

    private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, StreamInput stream, final TransportResponseHandler<T> handler) {
        TransportResponse response;
        try {
            response = (TransportResponse)handler.read(stream);
            response.remoteAddress(new TransportAddress(remoteAddress));
        }
        catch (Exception e) {
            this.handleException(handler, new TransportSerializationException("Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
            return;
        }
        this.threadPool.executor(handler.executor()).execute(new AbstractRunnable(){

            @Override
            public void onFailure(Exception e) {
                InboundHandler.this.handleException(handler, new ResponseHandlerFailureTransportException(e));
            }

            @Override
            protected void doRun() {
                handler.handleResponse(response);
            }
        });
    }

    private void handlerResponseError(StreamInput stream, TransportResponseHandler handler) {
        Object error;
        try {
            error = stream.readException();
        }
        catch (Exception e) {
            error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
        }
        this.handleException(handler, (Throwable)error);
    }

    private void handleException(TransportResponseHandler handler, Throwable error) {
        if (!(error instanceof RemoteTransportException)) {
            error = new RemoteTransportException(error.getMessage(), error);
        }
        RemoteTransportException rtx = (RemoteTransportException)error;
        this.threadPool.executor(handler.executor()).execute(() -> {
            try {
                handler.handleException(rtx);
            }
            catch (Exception e) {
                logger.error(() -> new ParameterizedMessage("failed to handle exception response [{}]", (Object)handler), (Throwable)e);
            }
        });
    }

    private static class RequestHandler
    extends AbstractRunnable {
        private final RequestHandlerRegistry reg;
        private final TransportRequest request;
        private final TransportChannel transportChannel;

        RequestHandler(RequestHandlerRegistry reg, TransportRequest request, TransportChannel transportChannel) {
            this.reg = reg;
            this.request = request;
            this.transportChannel = transportChannel;
        }

        @Override
        protected void doRun() throws Exception {
            this.reg.processMessageReceived(this.request, this.transportChannel);
        }

        @Override
        public boolean isForceExecution() {
            return this.reg.isForceExecution();
        }

        @Override
        public void onFailure(Exception e) {
            try {
                this.transportChannel.sendResponse(e);
            }
            catch (Exception inner) {
                inner.addSuppressed(e);
                logger.warn(() -> new ParameterizedMessage("Failed to send error message back to client for action [{}]", (Object)this.reg.getAction()), (Throwable)inner);
            }
        }
    }
}

