package net.openhft.chronicle.decentred.remote.net;

import java.io.EOFException;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.tcp.ISocketChannel;
import zzz_koloboke_compile.shaded.org.$eclipse$.jdt.internal.compiler.lookup.TagBits;

/* loaded from: input_file:net/openhft/chronicle/decentred/remote/net/AbstractTCPConnection.class */
/**
 * Base class for connections that exchange length-prefixed messages over a
 * {@link SocketChannel}.
 *
 * <p>Each frame written by {@link #write(BytesStore)} consists of a 4-byte length
 * (written little-endian, and counting the 4 header bytes themselves) followed by
 * the payload. {@link #readChannel(Bytes)} performs the reverse operation and hands
 * each complete payload to {@link #onMessage(Bytes)}.
 *
 * <p>Subclasses supply the reconnect policy ({@link #waitForReconnect()}), the
 * message handler ({@link #onMessage(Bytes)}) and any extra shutdown work
 * ({@link #close2()}).
 */
public abstract class AbstractTCPConnection implements TCPConnection {
    /** Size in bytes of the length prefix that precedes every payload. */
    private static final int HEADER_LENGTH = 4;

    /** Largest frame, length prefix included, accepted for reading or writing (1 MiB). */
    private static final int MAX_MESSAGE_SIZE = 1 << 20;

    /**
     * Read position (32 KiB) beyond which a partially consumed read buffer is
     * compacted before more data is read from the socket.
     */
    private static final long COMPACT_THRESHOLD = 32L << 10;

    protected volatile SocketChannel channel;
    protected ISocketChannel iSocketChannel;

    /**
     * Per-thread two-element array used for gathering writes: slot 0 is a reusable
     * 4-byte header buffer, slot 1 is set to the payload for the duration of a write.
     */
    private final ThreadLocal<ByteBuffer[]> headerBytesTL = ThreadLocal.withInitial(AbstractTCPConnection::createHeaderArray);

    /** Cleared by {@link #close()}; checked by the write methods. */
    volatile boolean running = true;

    /**
     * Creates a connection bound to the given channel.
     *
     * @param socketChannel the channel to use, or {@code null} for none
     */
    public AbstractTCPConnection(SocketChannel socketChannel) {
        channel(socketChannel);
    }

    /** Creates a connection with no channel; one can be supplied later via {@link #channel(SocketChannel)}. */
    public AbstractTCPConnection() {
    }

    /**
     * Builds the per-thread array used by {@link #write(BytesStore)}.
     *
     * @return a two-element array whose first element is a direct, little-endian
     *         4-byte buffer and whose second element is {@code null}
     */
    private static ByteBuffer[] createHeaderArray() {
        return new ByteBuffer[]{ByteBuffer.allocateDirect(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN), null};
    }

    /**
     * Replaces the channel used by this connection.
     *
     * @param socketChannel the new channel, or {@code null} to clear both the raw
     *                      channel and its {@link ISocketChannel} wrapper
     * @return this connection
     */
    public AbstractTCPConnection channel(SocketChannel socketChannel) {
        // Assign the wrapper first, then the volatile field.
        this.iSocketChannel = socketChannel == null ? null : ISocketChannel.wrap(socketChannel);
        this.channel = socketChannel;
        return this;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + " " + this.channel;
    }

    /**
     * Writes the readable bytes of {@code bytesStore} as one frame: a 4-byte length
     * header followed by the payload.
     *
     * <p>The limit and position of the store's underlying {@link ByteBuffer} are
     * modified by this call. The write loop runs only while payload bytes remain and
     * the connection is running; consequently an empty payload results in nothing
     * being written, and a connection closed mid-write makes the method return
     * without the whole frame having been sent.
     *
     * @param bytesStore the payload; its underlying object must be a non-null buffer
     * @throws IOException  if the connection is closed or the payload exceeds
     *                      {@code MAX_MESSAGE_SIZE - HEADER_LENGTH} bytes
     * @throws EOFException if the channel reports a negative write count
     */
    @Override // net.openhft.chronicle.decentred.remote.net.TCPConnection
    public void write(BytesStore<?, ByteBuffer> bytesStore) throws IOException {
        if (!this.running) {
            throw new IOException("closed");
        }
        waitForReconnect();
        if (bytesStore.readRemaining() > MAX_MESSAGE_SIZE - HEADER_LENGTH) {
            throw new IOException("Message too long " + bytesStore.readRemaining());
        }
        ByteBuffer payload = bytesStore.underlyingObject();
        assert payload != null;
        // Expose exactly the readable region of the store through the buffer.
        payload.limit(Math.toIntExact(bytesStore.readLimit()));
        payload.position(Math.toIntExact(bytesStore.readPosition()));

        ByteBuffer[] frame = this.headerBytesTL.get();
        frame[0].clear();
        // Absolute put: the header's position stays at 0 so all 4 bytes get written.
        frame[0].putInt(0, HEADER_LENGTH + payload.remaining());
        frame[1] = payload;
        try {
            while (payload.remaining() > 0 && this.running) {
                if (this.iSocketChannel.write(frame) < 0) {
                    this.channel.close();
                    throw new EOFException("Failed to write");
                }
            }
        } finally {
            // Do not keep the caller's buffer reachable from the thread-local array.
            frame[1] = null;
        }
    }

    /**
     * Writes the remaining bytes of {@code byteBuffer} to the channel as they are;
     * no length header is added by this method.
     *
     * <p>The loop stops early, without an exception, if the connection is closed
     * while bytes are still pending.
     *
     * @param byteBuffer the bytes to send
     * @throws IOException  if the connection is closed or more than
     *                      {@code MAX_MESSAGE_SIZE} bytes remain in the buffer
     * @throws EOFException if the channel reports a negative write count
     */
    @Override // net.openhft.chronicle.decentred.remote.net.TCPConnection
    public void write(ByteBuffer byteBuffer) throws IOException {
        if (!this.running) {
            throw new IOException("closed");
        }
        waitForReconnect();
        if (byteBuffer.remaining() > MAX_MESSAGE_SIZE) {
            throw new IOException("Message too long " + byteBuffer.remaining());
        }
        while (byteBuffer.remaining() > 0 && this.running) {
            if (this.iSocketChannel.write(byteBuffer) < 0) {
                this.channel.close();
                throw new EOFException("Failed to write");
            }
        }
    }

    /**
     * Invoked at the start of every write, after the running check.
     *
     * @throws IOException if the subclass cannot make the connection usable
     */
    protected abstract void waitForReconnect() throws IOException;

    /**
     * Performs one read step: either dispatches a single complete message already
     * held in {@code bytes}, or reads more data from the channel into it.
     *
     * <p>If at least 4 bytes are readable, the length prefix is validated; when the
     * whole frame is present it is passed to {@link #onMessage(Bytes)} and the method
     * returns without touching the socket. Otherwise the buffer is cleared (when
     * empty) or compacted (when the read position has passed
     * {@code COMPACT_THRESHOLD}), and a single channel read is appended to it.
     *
     * @param bytes the accumulation buffer; its underlying object must be a non-null buffer
     * @throws StreamCorruptedException if the length prefix is below 4 or above
     *                                  {@code MAX_MESSAGE_SIZE}
     * @throws EOFException             if the channel reports a negative read count
     * @throws IOException              if reading or message handling fails
     */
    public void readChannel(Bytes<ByteBuffer> bytes) throws IOException {
        if (bytes.readRemaining() >= HEADER_LENGTH) {
            // Peek at the length without consuming it.
            int length = bytes.readInt(bytes.readPosition());
            if (length < HEADER_LENGTH || length > MAX_MESSAGE_SIZE) {
                throw new StreamCorruptedException("length: " + length);
            }
            if (bytes.readRemaining() >= length) {
                processOneMessage(length, bytes);
                return;
            }
        }
        if (bytes.readRemaining() == 0) {
            bytes.clear();
        } else if (bytes.readPosition() > COMPACT_THRESHOLD) {
            bytes.compact();
        }
        ByteBuffer target = bytes.underlyingObject();
        assert target != null;
        // Append after the data already buffered, up to the buffer's real capacity.
        target.position(Math.toIntExact(bytes.writePosition()));
        target.limit(Math.toIntExact(bytes.realCapacity()));
        if (this.iSocketChannel.read(target) < 0) {
            throw new EOFException();
        }
        // Make the newly read bytes visible to readers of the Bytes wrapper.
        bytes.readLimit(target.position());
    }

    /**
     * Passes one complete frame to {@link #onMessage(Bytes)} with the read window
     * narrowed to the payload, then restores the window so that the read position
     * sits just past the frame, whether or not the handler threw.
     *
     * @param i     the frame length, header included
     * @param bytes the buffer positioned at the start of the frame
     * @throws IOException if the handler fails
     */
    private void processOneMessage(int i, Bytes<ByteBuffer> bytes) throws IOException {
        long frameEnd = bytes.readPosition() + i;
        long savedLimit = bytes.readLimit();
        try {
            bytes.readSkip(HEADER_LENGTH);
            bytes.readLimit(frameEnd);
            onMessage(bytes);
        } finally {
            bytes.readLimit(savedLimit);
            bytes.readPosition(frameEnd);
        }
    }

    /**
     * Handles one message payload.
     *
     * @param bytes the buffer, with its readable region restricted to the payload
     * @throws IOException if the message cannot be processed
     */
    protected abstract void onMessage(Bytes<ByteBuffer> bytes) throws IOException;

    /**
     * Marks the connection as no longer running, runs the subclass hook
     * {@link #close2()} and then quietly closes the channel. The channel is closed
     * even if the hook throws.
     */
    @Override // net.openhft.chronicle.core.io.Closeable, java.io.Closeable, java.lang.AutoCloseable
    public final void close() {
        this.running = false;
        try {
            close2();
        } finally {
            Closeable.closeQuietly(this.channel);
        }
    }

    /** Subclass shutdown hook, invoked by {@link #close()} before the channel is closed. */
    protected abstract void close2();
}
