package org.apache.cassandra.net;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.io.FastSerializer;
import org.apache.cassandra.net.io.ISerializer;
import org.apache.cassandra.net.io.ProtocolState;
import org.apache.cassandra.net.io.StartState;
import org.apache.cassandra.net.io.TcpReader;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/cassandra/net/TcpConnection.class */
public class TcpConnection extends SelectionKeyHandler implements Comparable {
    /** log4j logger shared by every connection. */
    private static Logger logger_ = Logger.getLogger(TcpConnection.class);
    /** Serializer used by {@link #write(Message)} to turn a message into bytes. */
    private static ISerializer serializer_ = new FastSerializer();
    /** The non-blocking NIO channel backing this connection. */
    private SocketChannel socketChannel_;
    /** Selection key obtained when the channel is registered with the SelectorManager. */
    private SelectionKey key_;
    /**
     * Connection manager this connection belongs to. May be null: the streaming constructor never
     * sets it and incoming connections only get one once the first message has been read.
     */
    private TcpConnectionManager pool_;
    /** Flag supplied to the accepted-socket constructor; false for connections opened locally. */
    private boolean isIncoming_;
    /** Reader state machine; created lazily by the first ReadWorkItem run. */
    private TcpReader tcpReader_;
    /** Work item handed to the read executor each time read(SelectionKey) is invoked. */
    private ReadWorkItem readWork_;
    /** Buffers that could not be written (fully) yet, in send order. */
    private Queue<ByteBuffer> pendingWrites_;
    /** Local end point of the connection. */
    private EndPoint localEp_;
    /** Remote end point; null on incoming connections until the first message has been read. */
    private EndPoint remoteEp_;
    /** Cleared by close(). NOTE(review): presumably set by the connection pool - confirm. */
    boolean inUse_;
    /** True only for connections created by the (local, remote) streaming constructor. */
    private boolean bStream_;
    /** Lock guarding condition_; only initialised for streaming connections. */
    private Lock lock_;
    /** Condition stream() waits on and resumeStreaming() signals. */
    private Condition condition_;

    /* loaded from: input_file:org/apache/cassandra/net/TcpConnection$ReadWorkItem.class */
    /**
     * Unit of work submitted to the read executor by {@code read(SelectionKey)}.
     * It drains every complete message currently available from the reader, hands each one to
     * the deserialization executor and finally re-arms read interest on the selection key.
     */
    class ReadWorkItem implements Runnable {
        ReadWorkItem() {
        }

        /**
         * Reads until the reader returns an empty array. Any failure closes the connection via
         * {@link #handleException(Throwable)}; read interest is restored afterwards as long as
         * the key is still valid.
         */
        @Override // java.lang.Runnable
        public void run() {
            // Lazily build the reader and put it into its PREAMBLE start state on first use.
            if (tcpReader_ == null) {
                tcpReader_ = new TcpReader(TcpConnection.this);
                StartState preamble = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
                if (preamble == null) {
                    preamble = new ProtocolState(tcpReader_);
                    tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, preamble);
                }
                tcpReader_.morphState(preamble);
            }

            try {
                for (byte[] body = tcpReader_.read(); body.length > 0; body = tcpReader_.read()) {
                    ProtocolHeader header = tcpReader_.getProtocolHeader();
                    if (!header.isStreamingMode_) {
                        if (remoteEp_ == null) {
                            // First message on this connection: work out who the peer is and pool the connection.
                            String peerHost = socketChannel_.socket().getInetAddress().getHostAddress();
                            int peerPort = header.isListening_ ? DatabaseDescriptor.getStoragePort() : 5555;
                            remoteEp_ = new EndPoint(peerHost, peerPort);
                            pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
                            pool_.addToPool(TcpConnection.this);
                        }
                        MessagingService.getDeserializationExecutor()
                                .submit(new MessageDeserializationTask(header.serializerType_, body));
                        tcpReader_.resetState();
                    } else {
                        // The socket was used for streaming: switch streaming mode off and close it.
                        MessagingService.setStreamingMode(false);
                        closeSocket();
                    }
                }
            } catch (Throwable failure) {
                handleException(failure);
            } finally {
                // The key is no longer valid once the connection has been closed above.
                if (key_.isValid()) {
                    SelectionKeyHandler.turnOnInterestOps(key_, SelectionKey.OP_READ);
                }
            }
        }

        /**
         * Logs the failure (channel first, then the stringified throwable) and tears the
         * connection down through {@code errorClose()}.
         */
        private void handleException(Throwable th) {
            logger_.warn("Problem reading from socket connected to : " + socketChannel_);
            logger_.warn(LogUtil.throwableToString(th));
            errorClose();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TcpConnection(TcpConnectionManager tcpConnectionManager, EndPoint endPoint, EndPoint endPoint2) throws IOException {
        this.isIncoming_ = false;
        this.readWork_ = new ReadWorkItem();
        this.pendingWrites_ = new ConcurrentLinkedQueue();
        this.inUse_ = false;
        this.bStream_ = false;
        this.socketChannel_ = SocketChannel.open();
        this.socketChannel_.configureBlocking(false);
        this.pool_ = tcpConnectionManager;
        this.localEp_ = endPoint;
        this.remoteEp_ = endPoint2;
        if (this.socketChannel_.connect(this.remoteEp_.getInetAddress())) {
            this.key_ = SelectorManager.getSelectorManager().register(this.socketChannel_, this, 1);
        } else {
            this.key_ = SelectorManager.getSelectorManager().register(this.socketChannel_, this, 8);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TcpConnection(EndPoint endPoint, EndPoint endPoint2) throws IOException {
        this.isIncoming_ = false;
        this.readWork_ = new ReadWorkItem();
        this.pendingWrites_ = new ConcurrentLinkedQueue();
        this.inUse_ = false;
        this.bStream_ = false;
        this.socketChannel_ = SocketChannel.open();
        this.socketChannel_.configureBlocking(false);
        this.localEp_ = endPoint;
        this.remoteEp_ = endPoint2;
        if (this.socketChannel_.connect(this.remoteEp_.getInetAddress())) {
            this.key_ = SelectorManager.getSelectorManager().register(this.socketChannel_, this, 1);
        } else {
            this.key_ = SelectorManager.getSelectorManager().register(this.socketChannel_, this, 8);
        }
        this.bStream_ = true;
        this.lock_ = new ReentrantLock();
        this.condition_ = this.lock_.newCondition();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps an accepted socket channel in a connection and registers it for reads.
     * The {@code z} argument is not consulted: the connection is always created as incoming.
     *
     * @param socketChannel the accepted channel
     * @param endPoint local end point the channel was accepted on
     * @param z ignored
     * @throws IOException if the channel cannot be switched to non-blocking mode or registered
     */
    public static void acceptConnection(SocketChannel socketChannel, EndPoint endPoint, boolean z) throws IOException {
        TcpConnection accepted = new TcpConnection(socketChannel, endPoint, true);
        accepted.registerReadInterest();
    }

    /** Registers the channel with the selector manager for read readiness and stores the key. */
    private void registerReadInterest() throws IOException {
        SelectorManager selectors = SelectorManager.getSelectorManager();
        key_ = selectors.register(socketChannel_, this, SelectionKey.OP_READ);
    }

    /**
     * Wraps an already-open channel (see {@code acceptConnection}). The channel is switched to
     * non-blocking mode but is not registered with the selector here; the remote end point and
     * the connection manager are left unset.
     *
     * @param socketChannel channel to wrap
     * @param endPoint local end point
     * @param z value reported by {@code isIncoming()}
     * @throws IOException if the channel cannot be made non-blocking
     */
    TcpConnection(SocketChannel socketChannel, EndPoint endPoint, boolean z) throws IOException {
        readWork_ = new ReadWorkItem();
        pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
        inUse_ = false;
        bStream_ = false;
        socketChannel_ = socketChannel;
        socketChannel_.configureBlocking(false);
        isIncoming_ = z;
        localEp_ = endPoint;
    }

    /** @return the local end point of this connection */
    EndPoint getLocalEp() {
        return localEp_;
    }

    /**
     * Replaces the local end point of this connection.
     *
     * @param endPoint the new local end point
     */
    public void setLocalEp(EndPoint endPoint) {
        localEp_ = endPoint;
    }

    /** @return the remote end point; null on an incoming connection that has not read a message yet */
    public EndPoint getEndPoint() {
        return remoteEp_;
    }

    /** @return the incoming flag given at construction; false for locally opened connections */
    public boolean isIncoming() {
        return isIncoming_;
    }

    /** @return the underlying NIO socket channel */
    public SocketChannel getSocketChannel() {
        return socketChannel_;
    }

    /**
     * Serializes and sends a message. The packed buffer is written straight to the channel when
     * the channel is connected and nothing is queued ahead of it; otherwise it is appended to
     * the pending-write queue. A partially written buffer is queued and write interest is turned
     * on so the remainder can be flushed later.
     *
     * @param message message to send; nothing happens if it serializes to zero bytes
     * @throws IOException if the channel write fails
     */
    public void write(Message message) throws IOException {
        byte[] payload = serializer_.serialize(message);
        if (payload.length == 0) {
            return;
        }

        boolean listening = !message.getFrom().equals(EndPoint.sentinelLocalEndPoint_);
        ByteBuffer frame = MessagingService.packIt(payload, false, false, listening);

        synchronized (this) {
            boolean canWriteNow = pendingWrites_.isEmpty() && socketChannel_.isConnected();
            if (canWriteNow) {
                socketChannel_.write(frame);
                if (frame.hasRemaining()) {
                    pendingWrites_.add(frame);
                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                }
            } else {
                // Keep ordering: anything already queued (or a not-yet-connected socket) goes first.
                pendingWrites_.add(frame);
            }
        }
    }

    /**
     * Streams the byte range {@code [j, j2)} of a file to the remote end point, blocking the
     * calling thread until every byte has been handed to the socket.
     *
     * <p>Waits up to three 2-second intervals for the socket to connect. When the range starts
     * at offset 0 (and is non-empty) a stream header is sent first, exactly once. The file is
     * then transferred in chunks of at most 64 MB; whenever the socket accepts less than was
     * requested, write interest is turned on and the thread parks until
     * {@code resumeStreaming()} signals.
     *
     * <p>NOTE(review): if the file is shorter than {@code j2}, {@code transferTo} keeps
     * returning 0 and this method never completes - same as before this change.
     *
     * @param file file to read from
     * @param j start position in the file
     * @param j2 end position in the file (exclusive)
     * @throws IllegalStateException if this connection was not created for streaming
     * @throws IOException if the socket does not connect in time, or on any read/write failure
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void stream(File file, long j, long j2) throws IOException, InterruptedException {
        if (!this.bStream_) {
            throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
        }
        final long chunkSize = 67108864L; // 64 MB per transferTo call
        this.lock_.lock();
        try {
            final long totalBytes = j2 - j;
            long bytesSent = 0;
            long position = j;
            RandomAccessFile source = new RandomAccessFile(file, "r");
            try {
                FileChannel channel = source.getChannel();

                int attempts = 0;
                while (!this.socketChannel_.isConnected()) {
                    if (attempts == 3) {
                        throw new IOException("Unable to connect to " + this.remoteEp_ + " after " + attempts + " attempts.");
                    }
                    this.condition_.await(2L, TimeUnit.SECONDS);
                    attempts++;
                }

                // The header is sent once, up front. It used to be sent from inside the loop
                // whenever the position was 0, which repeated it if the first transfer moved 0 bytes.
                if (j == 0 && totalBytes > 0) {
                    ByteBuffer header = MessagingService.constructStreamHeader(false, true);
                    this.socketChannel_.write(header);
                    if (header.hasRemaining()) {
                        this.pendingWrites_.add(header);
                        turnOnInterestOps(this.key_, SelectionKey.OP_WRITE);
                        // Re-check after every wake-up: a signal can arrive while the header is only
                        // partly flushed. Stop waiting if the channel was closed so that the
                        // transfer below reports the failure instead of parking forever.
                        while (header.hasRemaining() && this.socketChannel_.isOpen()) {
                            this.condition_.await();
                        }
                    }
                }

                while (bytesSent < totalBytes) {
                    // Never ask for more than what is left of the requested range.
                    long requested = Math.min(chunkSize, totalBytes - bytesSent);
                    long transferred = channel.transferTo(position, requested, this.socketChannel_);
                    if (logger_.isDebugEnabled()) {
                        logger_.debug("Bytes transferred " + transferred);
                    }
                    bytesSent += transferred;
                    position += transferred;
                    if (transferred < requested) {
                        // Socket buffer is full: wait for the selector to report writability.
                        turnOnInterestOps(this.key_, SelectionKey.OP_WRITE);
                        this.condition_.await();
                    }
                }
            } finally {
                // Closing the RandomAccessFile also closes its channel.
                source.close();
            }
        } finally {
            this.lock_.unlock();
        }
    }

    /**
     * Wakes one thread waiting on the streaming condition. Does nothing on connections that were
     * not created for streaming.
     */
    private void resumeStreaming() {
        if (!bStream_) {
            return;
        }
        lock_.lock();
        try {
            condition_.signal();
        } finally {
            lock_.unlock();
        }
    }

    /**
     * Marks the connection as no longer in use and, if it is held by a connection manager,
     * decrements that manager's used count.
     *
     * <p>The manager reference is null for streaming connections and for incoming connections
     * that have not read a message yet, so it is checked first - as {@code closeSocket()} and
     * {@code errorClose()} already do.
     */
    public void close() {
        this.inUse_ = false;
        if (this.pool_ != null && this.pool_.contains(this)) {
            this.pool_.decUsed();
        }
    }

    /** @return whether the underlying socket channel reports itself as connected */
    public boolean isConnected() {
        boolean connected = socketChannel_.isConnected();
        return connected;
    }

    /**
     * Two connections are equal when both their local and their remote end points are equal.
     * Unset (null) end points are compared null-safely, so calling this on an incoming
     * connection whose remote end point is not known yet no longer throws.
     *
     * @param obj object to compare with
     * @return true if {@code obj} is a TcpConnection with the same pair of end points
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof TcpConnection)) {
            return false;
        }
        TcpConnection other = (TcpConnection) obj;
        boolean sameLocal = this.localEp_ == null ? other.localEp_ == null : this.localEp_.equals(other.localEp_);
        boolean sameRemote = this.remoteEp_ == null ? other.remoteEp_ == null : this.remoteEp_.equals(other.remoteEp_);
        return sameLocal && sameRemote;
    }

    /** @return the hash of the string {@code "<local>:<remote>"} built from the two end points */
    @Override
    public int hashCode() {
        String endPointPair = localEp_ + ":" + remoteEp_;
        return endPointPair.hashCode();
    }

    /** @return the string form of the underlying socket channel */
    @Override
    public String toString() {
        String description = socketChannel_.toString();
        return description;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the connection: logs how many writes are still queued, removes the connection from
     * its manager (if it has one), cancels the selection key and closes the channel, then drops
     * the queued writes.
     */
    public void closeSocket() {
        int unsent = pendingWrites_.size();
        logger_.warn("Closing down connection " + socketChannel_ + " with " + unsent + " writes remaining.");
        TcpConnectionManager owner = pool_;
        if (owner != null) {
            owner.removeConnection(this);
        }
        cancel(key_);
        pendingWrites_.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tears the connection down after a failure: queued writes are dropped, the selection key
     * is cancelled and the channel closed, and the connection is removed from its manager
     * (if it has one).
     */
    public void errorClose() {
        logger_.warn("Closing down connection " + socketChannel_);
        pendingWrites_.clear();
        cancel(key_);
        // NOTE(review): second clear presumably drops anything queued while the channel was closing - confirm.
        pendingWrites_.clear();
        TcpConnectionManager owner = pool_;
        if (owner != null) {
            owner.removeConnection(this);
        }
    }

    /**
     * Cancels the given selection key and closes its channel. A null key is ignored.
     * Closing stays best-effort, but a failure is now logged instead of being silently dropped.
     *
     * @param selectionKey key to cancel, may be null
     */
    private void cancel(SelectionKey selectionKey) {
        if (selectionKey == null) {
            return;
        }
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException e) {
            logger_.warn("Failed to close channel " + selectionKey.channel(), e);
        }
    }

    /**
     * Connect callback. Turns connect interest off and finishes the connect. On success read
     * interest is enabled, write interest is enabled if writes were queued meanwhile, and a
     * waiting streamer is signalled. If the connect cannot be finished the connection is closed.
     *
     * @param selectionKey the key the connect event was reported for
     */
    @Override // org.apache.cassandra.net.SelectionKeyHandler
    public void connect(SelectionKey selectionKey) {
        turnOffInterestOps(selectionKey, SelectionKey.OP_CONNECT);

        boolean connected;
        try {
            connected = socketChannel_.finishConnect();
        } catch (IOException e) {
            logger_.error("Encountered IOException on connection: " + socketChannel_, e);
            errorClose();
            return;
        }

        if (!connected) {
            logger_.error("Closing connection because socket channel could not finishConnect.");
            errorClose();
            return;
        }

        turnOnInterestOps(selectionKey, SelectionKey.OP_READ);
        synchronized (this) {
            boolean hasQueuedWrites = !pendingWrites_.isEmpty();
            if (hasQueuedWrites) {
                turnOnInterestOps(key_, SelectionKey.OP_WRITE);
            }
        }
        resumeStreaming();
    }

    /**
     * Write callback. Turns write interest off, flushes as many queued buffers as the socket
     * accepts, then signals a waiting streamer.
     *
     * @param selectionKey the key the write event was reported for
     */
    @Override // org.apache.cassandra.net.SelectionKeyHandler
    public void write(SelectionKey selectionKey) {
        turnOffInterestOps(selectionKey, SelectionKey.OP_WRITE);
        doPendingWrites();
        resumeStreaming();
    }

    /**
     * Flushes queued buffers to the socket in order, stopping at the first buffer the socket
     * does not accept completely. If anything is left afterwards, write interest is turned back
     * on so the flush resumes on the next write event.
     *
     * <p>An {@link IOException} is logged and closes the connection via {@code errorClose()}.
     * After such a failure the method stops: it neither retries on the closed channel nor
     * re-arms write interest on the selection key that was just cancelled.
     */
    public void doPendingWrites() {
        synchronized (this) {
            boolean channelFailed = false;
            try {
                while (!this.pendingWrites_.isEmpty()) {
                    ByteBuffer head = this.pendingWrites_.peek();
                    this.socketChannel_.write(head);
                    if (head.hasRemaining()) {
                        // Socket buffer is full; leave the remainder at the head of the queue.
                        break;
                    }
                    this.pendingWrites_.remove();
                }
            } catch (IOException e) {
                channelFailed = true;
                logger_.error(LogUtil.throwableToString(e));
                errorClose();
            } finally {
                // Also runs when an unchecked exception escapes, matching the previous behaviour.
                if (!channelFailed && !this.pendingWrites_.isEmpty()) {
                    turnOnInterestOps(this.key_, SelectionKey.OP_WRITE);
                }
            }
        }
    }

    /**
     * Read callback. Turns read interest off and hands the actual reading to the read executor;
     * the work item turns read interest back on when it is done.
     *
     * @param selectionKey the key the read event was reported for
     */
    @Override // org.apache.cassandra.net.SelectionKeyHandler
    public void read(SelectionKey selectionKey) {
        turnOffInterestOps(selectionKey, SelectionKey.OP_READ);
        MessagingService.getReadExecutor().execute(readWork_);
    }

    /** @return the number of buffers currently waiting in the pending-write queue */
    public int pending() {
        int queued = pendingWrites_.size();
        return queued;
    }

    /**
     * Orders connections by the number of pending writes, fewest first.
     *
     * @param obj the connection to compare with
     * @return this connection's pending-write count minus the other's
     * @throws IllegalArgumentException if {@code obj} is not a TcpConnection
     */
    @Override // java.lang.Comparable
    public int compareTo(Object obj) {
        if (!(obj instanceof TcpConnection)) {
            throw new IllegalArgumentException();
        }
        TcpConnection other = (TcpConnection) obj;
        int mine = pendingWrites_.size();
        int theirs = other.pendingWrites_.size();
        return mine - theirs;
    }
}
