package org.apache.htrace.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import org.apache.htrace.core.Span;
import org.apache.htrace.shaded.commons.logging.Log;
import org.apache.htrace.shaded.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/htrace/impl/PackedBufferManager.class */
class PackedBufferManager implements BufferManager {
    private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
    private static final int MAX_PREQUEL_LENGTH = 2048;
    private static final int METHOD_ID_WRITE_SPANS = 1;
    private final Conf conf;
    private final PackedBuffer spans;
    private int numSpans;
    private final ByteBuffer frameBuffer = ByteBuffer.allocate(20);
    private final PackedBuffer prequel = new PackedBuffer(ByteBuffer.allocate(MAX_PREQUEL_LENGTH));
    private final Selector selector = SelectorProvider.provider().openSelector();

    /* JADX INFO: Access modifiers changed from: package-private */
    public PackedBufferManager(Conf conf) throws IOException {
        this.conf = conf;
        this.spans = new PackedBuffer(ByteBuffer.allocate(conf.bufferSize));
        clear();
    }

    @Override // org.apache.htrace.impl.BufferManager
    public void writeSpan(Span span) throws IOException {
        this.spans.writeSpan(span);
        this.numSpans++;
        if (LOG.isTraceEnabled()) {
            LOG.trace("wrote " + span.toJson() + " to PackedBuffer for " + this.conf.endpointStr + ". numSpans = " + this.numSpans + ", buffer position = " + this.spans.getBuffer().position());
        }
    }

    @Override // org.apache.htrace.impl.BufferManager
    public int contentLength() {
        return this.spans.getBuffer().position();
    }

    @Override // org.apache.htrace.impl.BufferManager
    public int getNumberOfSpans() {
        return this.numSpans;
    }

    @Override // org.apache.htrace.impl.BufferManager
    public void prepare() throws IOException {
        this.prequel.beginWriteSpansRequest(null, this.numSpans);
        long position = this.prequel.getBuffer().position() + this.spans.getBuffer().position();
        if (position > 67108864) {
            throw new IOException("Can't send RPC of " + position + " bytes because it is longer than 67108864");
        }
        PackedBuffer.writeReqFrame(this.frameBuffer, 1, 1L, (int) position);
        this.frameBuffer.flip();
        this.prequel.getBuffer().flip();
        this.spans.getBuffer().flip();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Preparing to send RPC of length " + (position + 20) + " to " + this.conf.endpointStr + ", containing " + this.numSpans + " spans.");
        }
    }

    @Override // org.apache.htrace.impl.BufferManager
    public void flush() throws IOException {
        SelectionKey selectionKey = null;
        IOException iOException = null;
        this.frameBuffer.position(0);
        this.prequel.getBuffer().position(0);
        this.spans.getBuffer().position(0);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Preparing to flush " + this.numSpans + " spans to " + this.conf.endpointStr);
        }
        try {
            try {
                selectionKey = doConnect();
                doSend(selectionKey, new ByteBuffer[]{this.frameBuffer, this.prequel.getBuffer(), this.spans.getBuffer()});
                readAndValidateResponseFrame(selectionKey, this.prequel.getBuffer(), 1L, 1);
                if (selectionKey != null) {
                    selectionKey.cancel();
                    try {
                        ((SocketChannel) selectionKey.attachment()).close();
                    } catch (IOException e) {
                        if (0 != 0) {
                            iOException.addSuppressed(e);
                        }
                    }
                }
            } catch (IOException e2) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got exception during flush", e2);
                }
                iOException = e2;
                if (selectionKey != null) {
                    selectionKey.cancel();
                    try {
                        ((SocketChannel) selectionKey.attachment()).close();
                    } catch (IOException e3) {
                        if (iOException != null) {
                            iOException.addSuppressed(e3);
                        }
                    }
                }
            }
            if (iOException != null) {
                throw iOException;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Successfully flushed " + this.numSpans + " spans to " + this.conf.endpointStr);
            }
        } catch (Throwable th) {
            if (selectionKey != null) {
                selectionKey.cancel();
                try {
                    ((SocketChannel) selectionKey.attachment()).close();
                } catch (IOException e4) {
                    if (0 != 0) {
                        iOException.addSuppressed(e4);
                    }
                }
            }
            throw th;
        }
    }

    private long updateRemainingMs(long j, long j2) {
        long deltaMs = TimeUtil.deltaMs(j, TimeUtil.nowMs());
        if (deltaMs > j2) {
            return 0L;
        }
        return j2 - deltaMs;
    }

    private SelectionKey doConnect() throws IOException {
        SocketChannel open = SocketChannel.open();
        SelectionKey selectionKey = null;
        try {
            if (open.isBlocking()) {
                open.configureBlocking(false);
            }
            InetSocketAddress inetSocketAddress = new InetSocketAddress(this.conf.endpoint.getHostString(), this.conf.endpoint.getPort());
            inetSocketAddress.getHostName();
            open.connect(inetSocketAddress);
            SelectionKey register = open.register(this.selector, 8, open);
            long nowMs = TimeUtil.nowMs();
            long j = this.conf.connectTimeoutMs;
            do {
                this.selector.select(j);
                for (SelectionKey selectionKey2 : this.selector.keys()) {
                    if (selectionKey2.isConnectable()) {
                        ((SocketChannel) selectionKey2.attachment()).finishConnect();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Successfully connected to " + this.conf.endpointStr + ".");
                        }
                        if (1 == 0) {
                            if (register != null) {
                                register.cancel();
                            }
                            open.close();
                        }
                        return register;
                    }
                }
                j = updateRemainingMs(nowMs, this.conf.connectTimeoutMs);
            } while (j != 0);
            throw new IOException("Attempt to connect to " + this.conf.endpointStr + " timed out after " + TimeUtil.deltaMs(nowMs, TimeUtil.nowMs()) + " ms.");
        } catch (Throwable th) {
            if (0 == 0) {
                if (0 != 0) {
                    selectionKey.cancel();
                }
                open.close();
            }
            throw th;
        }
    }

    private void doSend(SelectionKey selectionKey, ByteBuffer[] byteBufferArr) throws IOException {
        long j = 0;
        selectionKey.interestOps(4);
        SocketChannel socketChannel = (SocketChannel) selectionKey.attachment();
        long nowMs = TimeUtil.nowMs();
        long j2 = this.conf.ioTimeoutMs;
        do {
            this.selector.select(j2);
            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                if (it.next().isWritable()) {
                    long write = socketChannel.write(byteBufferArr, 0, byteBufferArr.length - 0);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Sent " + write + " bytes to " + this.conf.endpointStr);
                    }
                    j += write;
                }
            }
            for (int i = 0; i != byteBufferArr.length; i++) {
                if (byteBufferArr[i].remaining() > 0) {
                    j2 = updateRemainingMs(nowMs, this.conf.ioTimeoutMs);
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Finished sending " + j + " bytes to " + this.conf.endpointStr);
                return;
            }
            return;
        } while (j2 != 0);
        throw new IOException("Attempt to write to " + this.conf.endpointStr + " timed out after " + TimeUtil.deltaMs(nowMs, TimeUtil.nowMs()) + " ms.");
    }

    private void doRecv(SelectionKey selectionKey, ByteBuffer byteBuffer) throws IOException {
        selectionKey.interestOps(1);
        SocketChannel socketChannel = (SocketChannel) selectionKey.attachment();
        int remaining = byteBuffer.remaining();
        long nowMs = TimeUtil.nowMs();
        long j = this.conf.ioTimeoutMs;
        while (j > 0) {
            this.selector.select(j);
            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                if (it.next().isReadable()) {
                    socketChannel.read(byteBuffer);
                }
            }
            if (byteBuffer.remaining() == 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Received all " + remaining + " bytes from " + this.conf.endpointStr);
                    return;
                }
                return;
            } else {
                j = updateRemainingMs(nowMs, this.conf.ioTimeoutMs);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Received " + (remaining - byteBuffer.remaining()) + " out of " + remaining + " bytes from " + this.conf.endpointStr);
                }
                if (j == 0) {
                    throw new IOException("Attempt to write to " + this.conf.endpointStr + " timed out after " + TimeUtil.deltaMs(nowMs, TimeUtil.nowMs()) + " ms.");
                }
            }
        }
    }

    private void readAndValidateResponseFrame(SelectionKey selectionKey, ByteBuffer byteBuffer, long j, int i) throws IOException {
        byteBuffer.clear();
        byteBuffer.limit(20);
        doRecv(selectionKey, byteBuffer);
        byteBuffer.flip();
        byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
        long j2 = byteBuffer.getLong();
        if (j2 != j) {
            throw new IOException("Expected sequence number " + j + ", but got sequence number " + j2);
        }
        int i2 = byteBuffer.getInt();
        if (i != i2) {
            throw new IOException("Expected method id " + i + ", but got " + i2);
        }
        int i3 = byteBuffer.getInt();
        byteBuffer.getInt();
        if (i3 < 0 || i3 > 4194304) {
            throw new IOException("Got server error with invalid length " + i3);
        }
        if (i3 > 0) {
            byteBuffer.clear();
            byteBuffer.limit(i3);
            doRecv(selectionKey, byteBuffer);
            byteBuffer.flip();
            throw new IOException("Got server error " + StandardCharsets.UTF_8.decode(byteBuffer).toString());
        }
    }

    @Override // org.apache.htrace.impl.BufferManager
    public void clear() {
        this.frameBuffer.clear();
        this.prequel.getBuffer().clear();
        this.spans.getBuffer().clear();
        this.numSpans = 0;
    }

    @Override // org.apache.htrace.impl.BufferManager
    public void close() {
        clear();
        this.prequel.close();
        this.spans.close();
        try {
            this.selector.close();
        } catch (IOException e) {
            LOG.warn("Error closing selector", e);
        }
    }
}
