package org.apache.cassandra.net;

import java.io.IOException;
import java.net.MulticastSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.IStage;
import org.apache.cassandra.concurrent.MultiThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.utils.Cachetable;
import org.apache.cassandra.utils.GuidGenerator;
import org.apache.cassandra.utils.HashingSchemes;
import org.apache.cassandra.utils.ICachetable;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/cassandra/net/MessagingService.class */
public class MessagingService implements IMessagingService {
    public static final String responseVerbHandler_ = "RESPONSE";
    public static final String responseStage_ = "RESPONSE-STAGE";
    private static ICachetable<String, IAsyncCallback> callbackMap_;
    private static ICachetable<String, IAsyncResult> taskCompletionMap_;
    private static Set<EndPoint> endPoints_;
    private static Map<String, IVerbHandler> verbHandlers_;
    private static ExecutorService messageDeserializationExecutor_;
    private static ExecutorService messageSerializerExecutor_;
    private static ExecutorService messageDeserializerExecutor_;
    private static ExecutorService streamExecutor_;
    private static boolean debugOn_ = false;
    private static int version_ = 1;
    private static SerializerType serializerType_ = SerializerType.BINARY;
    private static byte[] protocol_ = new byte[16];
    private static Map<String, String> reservedVerbs_ = new Hashtable();
    private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
    private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap();
    private static Map<String, MulticastSocket> mCastMembership_ = new HashMap();
    private static final ReentrantLock lock_ = new ReentrantLock();
    private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable();
    private static boolean bShutdown_ = false;
    private static Logger logger_ = Logger.getLogger(MessagingService.class);
    private static IMessagingService messagingService_ = new MessagingService();

    /* loaded from: input_file:org/apache/cassandra/net/MessagingService$ReservedVerbs_.class */
    private enum ReservedVerbs_ {
        RESPONSE
    }

    public static boolean isDebugOn() {
        return debugOn_;
    }

    public static void debugOn(boolean z) {
        debugOn_ = z;
    }

    public static SerializerType getSerializerType() {
        return serializerType_;
    }

    public static synchronized void serializerType(String str) {
        if (str.equalsIgnoreCase("binary")) {
            serializerType_ = SerializerType.BINARY;
        } else if (str.equalsIgnoreCase("java")) {
            serializerType_ = SerializerType.JAVA;
        } else if (str.equalsIgnoreCase("xml")) {
            serializerType_ = SerializerType.XML;
        }
    }

    public static int getVersion() {
        return version_;
    }

    public static void setVersion(int i) {
        version_ = i;
    }

    public static IMessagingService getMessagingInstance() {
        if (bShutdown_) {
            lock_.lock();
            try {
                if (bShutdown_) {
                    messagingService_ = new MessagingService();
                    bShutdown_ = false;
                }
                lock_.unlock();
            } catch (Throwable th) {
                lock_.unlock();
                throw th;
            }
        }
        return messagingService_;
    }

    public Object clone() throws CloneNotSupportedException {
        throw new CloneNotSupportedException();
    }

    protected MessagingService() {
        for (ReservedVerbs_ reservedVerbs_2 : ReservedVerbs_.values()) {
            reservedVerbs_.put(reservedVerbs_2.toString(), reservedVerbs_2.toString());
        }
        verbHandlers_ = new HashMap();
        endPoints_ = new HashSet();
        int messagingThreadCount = MessagingConfig.getMessagingThreadCount();
        callbackMap_ = new Cachetable(2 * DatabaseDescriptor.getRpcTimeout());
        taskCompletionMap_ = new Cachetable(2 * DatabaseDescriptor.getRpcTimeout());
        messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor(messagingThreadCount, messagingThreadCount, 2147483647L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryImpl("MESSAGING-SERVICE-POOL"));
        messageSerializerExecutor_ = new DebuggableThreadPoolExecutor(messagingThreadCount, messagingThreadCount, 2147483647L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL"));
        messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor(messagingThreadCount, messagingThreadCount, 2147483647L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryImpl("MESSAGE-DESERIALIZER-POOL"));
        streamExecutor_ = new DebuggableThreadPoolExecutor("MESSAGE-STREAMING-POOL");
        protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());
        registerVerbHandlers(responseVerbHandler_, new ResponseVerbHandler());
        StageManager.registerStage(responseStage_, new MultiThreadedStage(responseStage_, messagingThreadCount));
    }

    public byte[] hash(String str, byte[] bArr) {
        byte[] bArr2 = null;
        try {
            bArr2 = MessageDigest.getInstance(str).digest(bArr);
        } catch (Exception e) {
            if (logger_.isDebugEnabled()) {
                logger_.debug(LogUtil.throwableToString(e));
            }
        }
        return bArr2;
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void listen(EndPoint endPoint) throws IOException {
        ServerSocketChannel open = ServerSocketChannel.open();
        open.socket().bind(endPoint.getInetAddress());
        open.configureBlocking(false);
        SelectionKey register = SelectorManager.getSelectorManager().register(open, new TcpConnectionHandler(endPoint), 16);
        endPoints_.add(endPoint);
        listenSockets_.put(endPoint, register);
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void listenUDP(EndPoint endPoint) {
        UdpConnection udpConnection = new UdpConnection();
        if (logger_.isDebugEnabled()) {
            logger_.debug("Starting to listen on " + endPoint);
        }
        try {
            udpConnection.init(endPoint.getPort());
            endPoints_.add(endPoint);
        } catch (IOException e) {
            logger_.warn(LogUtil.throwableToString(e));
        }
    }

    public static TcpConnectionManager getConnectionPool(EndPoint endPoint, EndPoint endPoint2) {
        String str = endPoint + ":" + endPoint2;
        TcpConnectionManager tcpConnectionManager = poolTable_.get(str);
        if (tcpConnectionManager == null) {
            lock_.lock();
            try {
                tcpConnectionManager = poolTable_.get(str);
                if (tcpConnectionManager == null) {
                    tcpConnectionManager = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(), MessagingConfig.getConnectionPoolGrowthFactor(), MessagingConfig.getConnectionPoolMaxSize(), endPoint, endPoint2);
                    poolTable_.put(str, tcpConnectionManager);
                }
                lock_.unlock();
            } catch (Throwable th) {
                lock_.unlock();
                throw th;
            }
        }
        return tcpConnectionManager;
    }

    public static ConnectionStatistics[] getPoolStatistics() {
        HashSet hashSet = new HashSet();
        for (TcpConnectionManager tcpConnectionManager : poolTable_.values()) {
            hashSet.add(new ConnectionStatistics(tcpConnectionManager.getLocalEndPoint(), tcpConnectionManager.getRemoteEndPoint(), tcpConnectionManager.getPoolSize(), tcpConnectionManager.getConnectionsInUse()));
        }
        return (ConnectionStatistics[]) hashSet.toArray(new ConnectionStatistics[0]);
    }

    public static TcpConnection getConnection(EndPoint endPoint, EndPoint endPoint2) throws IOException {
        return getConnectionPool(endPoint, endPoint2).getConnection();
    }

    private void checkForReservedVerb(String str) {
        if (reservedVerbs_.get(str) != null && verbHandlers_.get(str) != null) {
            throw new IllegalArgumentException(str + " is a reserved verb handler. Scram!");
        }
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void registerVerbHandlers(String str, IVerbHandler iVerbHandler) {
        checkForReservedVerb(str);
        verbHandlers_.put(str, iVerbHandler);
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void deregisterAllVerbHandlers(EndPoint endPoint) {
        Iterator<String> it = verbHandlers_.keySet().iterator();
        while (it.hasNext()) {
            if (it.next().contains(endPoint.toString())) {
                it.remove();
            }
        }
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void deregisterVerbHandlers(String str) {
        verbHandlers_.remove(str);
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public IVerbHandler getVerbHandler(String str) {
        return verbHandlers_.get(str);
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public String sendRR(Message message, EndPoint[] endPointArr, IAsyncCallback iAsyncCallback) {
        String messageId = message.getMessageId();
        callbackMap_.put(messageId, iAsyncCallback);
        for (EndPoint endPoint : endPointArr) {
            sendOneWay(message, endPoint);
        }
        return messageId;
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public String sendRR(Message message, EndPoint endPoint, IAsyncCallback iAsyncCallback) {
        String messageId = message.getMessageId();
        callbackMap_.put(messageId, iAsyncCallback);
        sendOneWay(message, endPoint);
        return messageId;
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public String sendRR(Message[] messageArr, EndPoint[] endPointArr, IAsyncCallback iAsyncCallback) {
        if (messageArr.length != endPointArr.length) {
            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
        }
        String guid = GuidGenerator.guid();
        callbackMap_.put(guid, iAsyncCallback);
        for (int i = 0; i < messageArr.length; i++) {
            messageArr[i].setMessageId(guid);
            sendOneWay(messageArr[i], endPointArr[i]);
        }
        return guid;
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public String sendRR(Message[][] messageArr, EndPoint[][] endPointArr, IAsyncCallback iAsyncCallback) {
        if (messageArr.length != endPointArr.length) {
            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
        }
        int length = messageArr.length;
        String[] strArr = new String[length];
        for (int i = 0; i < length; i++) {
            strArr[i] = GuidGenerator.guid();
        }
        iAsyncCallback.attachContext(strArr);
        for (int i2 = 0; i2 < length; i2++) {
            callbackMap_.put(strArr[i2], iAsyncCallback);
            for (int i3 = 0; i3 < messageArr[i2].length; i3++) {
                messageArr[i2][i3].setMessageId(strArr[i2]);
                sendOneWay(messageArr[i2][i3], endPointArr[i2][i3]);
            }
        }
        return strArr[0];
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void sendOneWay(Message message, EndPoint endPoint) {
        if (message.getFrom().equals(endPoint)) {
            receive(message);
        } else {
            messageSerializerExecutor_.execute(new MessageSerializationTask(message, endPoint));
        }
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public IAsyncResult sendRR(Message message, EndPoint endPoint) {
        AsyncResult asyncResult = new AsyncResult();
        taskCompletionMap_.put(message.getMessageId(), asyncResult);
        sendOneWay(message, endPoint);
        return asyncResult;
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void sendUdpOneWay(Message message, EndPoint endPoint) {
        if (message.getFrom().equals(endPoint)) {
            receive(message);
            return;
        }
        UdpConnection udpConnection = null;
        try {
            try {
                udpConnection = new UdpConnection();
                udpConnection.init();
                udpConnection.write(message, endPoint);
                if (udpConnection != null) {
                    udpConnection.close();
                }
            } catch (IOException e) {
                logger_.warn(LogUtil.throwableToString(e));
                if (udpConnection != null) {
                    udpConnection.close();
                }
            }
        } catch (Throwable th) {
            if (udpConnection != null) {
                udpConnection.close();
            }
            throw th;
        }
    }

    @Override // org.apache.cassandra.net.IMessagingService
    public void stream(String str, long j, long j2, EndPoint endPoint, EndPoint endPoint2) {
        isStreaming_.set(true);
        streamExecutor_.execute(new FileStreamTask(str, j, j2, endPoint, endPoint2));
    }

    public static boolean isStreaming() {
        return isStreaming_.get();
    }

    public static void setStreamingMode(boolean z) {
        isStreaming_.set(z);
    }

    public static void flushAndshutdown() {
        Iterator<Map.Entry<String, TcpConnectionManager>> it = poolTable_.entrySet().iterator();
        while (it.hasNext()) {
            Iterator<TcpConnection> it2 = it.next().getValue().getConnections().iterator();
            while (it2.hasNext()) {
                it2.next().doPendingWrites();
            }
        }
        shutdown();
    }

    public static void shutdown() {
        logger_.info("Shutting down ...");
        synchronized (MessagingService.class) {
            for (SelectionKey selectionKey : listenSockets_.values()) {
                selectionKey.cancel();
                try {
                    selectionKey.channel().close();
                } catch (IOException e) {
                }
            }
            listenSockets_.clear();
            messageDeserializationExecutor_.shutdownNow();
            messageSerializerExecutor_.shutdownNow();
            messageDeserializerExecutor_.shutdownNow();
            streamExecutor_.shutdownNow();
            taskCompletionMap_.shutdown();
            callbackMap_.shutdown();
            SelectorManager.getSelectorManager().interrupt();
            poolTable_.clear();
            verbHandlers_.clear();
            bShutdown_ = true;
        }
        logger_.info("Shutdown invocation complete.");
    }

    public static void receive(Message message) {
        enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
    }

    public static boolean isLocalEndPoint(EndPoint endPoint) {
        return endPoints_.contains(endPoint);
    }

    private static void enqueueRunnable(String str, Runnable runnable) {
        IStage stage = StageManager.getStage(str);
        if (stage != null) {
            stage.execute(runnable);
        } else {
            logger_.info("Running on default stage - beware");
            messageSerializerExecutor_.execute(runnable);
        }
    }

    public static IAsyncCallback getRegisteredCallback(String str) {
        return callbackMap_.get(str);
    }

    public static void removeRegisteredCallback(String str) {
        callbackMap_.remove(str);
    }

    public static IAsyncResult getAsyncResult(String str) {
        return taskCompletionMap_.remove(str);
    }

    public static void removeAsyncResult(String str) {
        taskCompletionMap_.remove(str);
    }

    public static byte[] getProtocol() {
        return protocol_;
    }

    public static ExecutorService getReadExecutor() {
        return messageDeserializationExecutor_;
    }

    public static ExecutorService getWriteExecutor() {
        return messageSerializerExecutor_;
    }

    public static ExecutorService getDeserializationExecutor() {
        return messageDeserializerExecutor_;
    }

    public static boolean isProtocolValid(byte[] bArr) {
        return isEqual(protocol_, bArr);
    }

    public static boolean isEqual(byte[] bArr, byte[] bArr2) {
        return MessageDigest.isEqual(bArr, bArr2);
    }

    public static byte[] toByteArray(int i) {
        return new byte[]{(byte) ((i >>> 24) & 255), (byte) ((i >>> 16) & 255), (byte) ((i >>> 8) & 255), (byte) (i & 255)};
    }

    public static byte[] toByteArray(short s) {
        return new byte[]{(byte) ((s >>> 8) & 255), (byte) (s & 255)};
    }

    public static short byteArrayToShort(byte[] bArr) {
        return byteArrayToShort(bArr, 0);
    }

    public static short byteArrayToShort(byte[] bArr, int i) {
        if (bArr.length - i < 2) {
            throw new IllegalArgumentException("A short must be 2 bytes in size.");
        }
        short s = 0;
        for (int i2 = 0; i2 < 2; i2++) {
            s = (short) (((short) (s << 8)) | (bArr[i + i2] & 255));
        }
        return s;
    }

    public static int getBits(int i, int i2, int i3) {
        return (i >>> ((i2 + 1) - i3)) & (((-1) << i3) ^ (-1));
    }

    public static int byteArrayToInt(byte[] bArr) {
        return byteArrayToInt(bArr, 0);
    }

    public static int byteArrayToInt(byte[] bArr, int i) {
        if (bArr.length - i < 4) {
            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 4; i3++) {
            i2 = (i2 << 8) | (bArr[i + i3] & 255);
        }
        return i2;
    }

    public static ByteBuffer packIt(byte[] bArr, boolean z, boolean z2, boolean z3) {
        byte[] byteArray = toByteArray(bArr.length);
        int ordinal = 0 | serializerType_.ordinal();
        if (z) {
            ordinal |= 4;
        }
        if (z2) {
            ordinal |= 8;
        }
        if (z3) {
            ordinal |= 16;
        }
        byte[] byteArray2 = toByteArray(ordinal | (version_ << 8));
        ByteBuffer allocate = ByteBuffer.allocate(16 + byteArray2.length + byteArray.length + bArr.length);
        allocate.put(protocol_);
        allocate.put(byteArray2);
        allocate.put(byteArray);
        allocate.put(bArr);
        allocate.flip();
        return allocate;
    }

    public static ByteBuffer constructStreamHeader(boolean z, boolean z2) {
        int ordinal = 0 | serializerType_.ordinal();
        if (z) {
            ordinal |= 4;
        }
        if (z2) {
            ordinal |= 8;
        }
        byte[] byteArray = toByteArray(ordinal | (version_ << 8));
        ByteBuffer allocate = ByteBuffer.allocate(16 + byteArray.length);
        allocate.put(protocol_);
        allocate.put(byteArray);
        allocate.flip();
        return allocate;
    }
}
