/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network.connection;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesUtil;
import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.CloseablesManager;
import net.openhft.chronicle.engine.api.session.SessionProvider;
import net.openhft.chronicle.engine.api.tree.View;
import net.openhft.chronicle.engine.server.WireType;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.RemoteCallTimeoutException;
import net.openhft.chronicle.network.connection.SocketChannelProvider;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.YamlLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpChannelHub
implements View,
Closeable,
SocketChannelProvider {
    public static final int SIZE_OF_SIZE = 4;
    private static final Logger LOG = LoggerFactory.getLogger(TcpChannelHub.class);
    public final long timeoutMs;
    @NotNull
    protected final String name;
    @NotNull
    protected final InetSocketAddress remoteAddress;
    protected final int tcpBufferSize;
    final Wire outWire;
    final Wire inWire;
    private final ReentrantLock outBytesLock = new ReentrantLock();
    @NotNull
    private final AtomicLong transactionID = new AtomicLong(0L);
    private final SessionProvider sessionProvider;
    private final TcpSocketConsumer tcpSocketConsumer;
    @Nullable
    protected CloseablesManager closeables;
    private long largestChunkSoFar = 0L;
    @Nullable
    private SocketChannel clientChannel;
    private long limitOfLast = 0L;
    private long startTime;
    private volatile boolean closed;

    public TcpChannelHub(SessionProvider sessionProvider, String hostname, int port) {
        this.tcpBufferSize = 65536;
        this.remoteAddress = new InetSocketAddress(hostname, port);
        this.outWire = WireType.wire.apply(Bytes.elasticByteBuffer());
        this.inWire = WireType.wire.apply(Bytes.elasticByteBuffer());
        this.name = " connected to " + this.remoteAddress.toString();
        this.timeoutMs = 10000L;
        this.attemptConnect(this.remoteAddress);
        this.tcpSocketConsumer = new TcpSocketConsumer(WireType.wire, this, this.remoteAddress.toString());
        this.sessionProvider = sessionProvider;
    }

    @Nullable
    static SocketChannel openSocketChannel(@NotNull CloseablesManager closeables) throws IOException {
        SocketChannel result = null;
        try {
            result = SocketChannel.open();
            result.socket().setTcpNoDelay(true);
        }
        finally {
            if (result != null) {
                try {
                    closeables.add((Closeable)result);
                }
                catch (IllegalStateException illegalStateException) {}
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void logToStandardOutMessageReceived(@NotNull Wire wire) {
        Bytes bytes = wire.bytes();
        if (!YamlLogging.clientReads || !Jvm.isDebug()) {
            return;
        }
        long position = bytes.writePosition();
        long limit = bytes.writeLimit();
        try {
            try {
                System.out.println("\nreceives:\n\n```yaml\n" + (wire instanceof TextWire ? Wires.fromSizePrefixedBlobs((Bytes)bytes) : BytesUtil.toHexString((Bytes)bytes, (long)bytes.writePosition(), (long)bytes.writeRemaining())) + "```\n\n");
                YamlLogging.title = "";
                YamlLogging.writeMessage = "";
            }
            catch (Exception e) {
                String x = Bytes.toString((Bytes)bytes);
                System.out.println(x);
                LOG.error("", (Throwable)e);
            }
        }
        finally {
            bytes.writeLimit(limit);
            bytes.writePosition(position);
        }
    }

    public void asyncReadSocket(long tid, @NotNull Consumer<Wire> consumer) {
        this.tcpSocketConsumer.asyncReadSocket(tid, consumer);
    }

    private synchronized void attemptConnect(InetSocketAddress remoteAddress) {
        this.closeExisting();
        if (this.closeables == null) {
            this.closeables = new CloseablesManager();
        }
        try {
            SocketChannel socketChannel = TcpChannelHub.openSocketChannel(this.closeables);
            if (socketChannel != null && socketChannel.connect(remoteAddress)) {
                this.clientChannel = socketChannel;
                this.clientChannel.configureBlocking(false);
                this.clientChannel.socket().setTcpNoDelay(true);
                this.clientChannel.socket().setReceiveBufferSize(this.tcpBufferSize);
                this.clientChannel.socket().setSendBufferSize(this.tcpBufferSize);
                this.doHandShaking();
            }
        }
        catch (IOException e) {
            LOG.error("Failed to connect to " + remoteAddress, (Throwable)e);
            if (this.closeables != null) {
                this.closeables.closeQuietly();
            }
            this.clientChannel = null;
        }
    }

    @NotNull
    public ReentrantLock outBytesLock() {
        return this.outBytesLock;
    }

    private boolean checkTimeout(long timeoutTime) {
        if (timeoutTime == 0L) {
            return false;
        }
        if (timeoutTime < System.currentTimeMillis() && !Jvm.isDebug()) {
            throw new RemoteCallTimeoutException("timeout=" + timeoutTime + "ms");
        }
        return true;
    }

    @Override
    public SocketChannel lazyConnect() {
        this.lazyConnect(this.timeoutMs, this.remoteAddress);
        return this.clientChannel;
    }

    @Override
    public synchronized SocketChannel reConnect() {
        this.close();
        this.lazyConnect(this.timeoutMs, this.remoteAddress);
        return this.clientChannel;
    }

    public synchronized SocketChannel lazyConnect(long timeoutMs, InetSocketAddress remoteAddress) {
        if (this.clientChannel != null) {
            return this.clientChannel;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("attempting to connect to " + remoteAddress + " ,name=" + this.name);
        }
        long timeoutAt = System.currentTimeMillis() + timeoutMs;
        while (this.clientChannel == null) {
            this.checkClosed();
            this.checkTimeout(timeoutAt);
            this.closeExisting();
            try {
                if (this.closeables == null) {
                    this.closeables = new CloseablesManager();
                }
                this.clientChannel = TcpChannelHub.openSocketChannel(this.closeables);
                if (this.clientChannel == null || !this.clientChannel.connect(remoteAddress)) {
                    Jvm.pause((long)100L);
                    continue;
                }
                this.clientChannel.socket().setTcpNoDelay(true);
                this.clientChannel.socket().setReceiveBufferSize(this.tcpBufferSize);
                this.clientChannel.socket().setSendBufferSize(this.tcpBufferSize);
                this.doHandShaking();
            }
            catch (IOException e) {
                if (this.closeables != null) {
                    this.closeables.closeQuietly();
                }
                this.clientChannel = null;
            }
            catch (Exception e) {
                if (this.closeables != null) {
                    this.closeables.closeQuietly();
                }
                throw e;
            }
        }
        return this.clientChannel;
    }

    private void doHandShaking() {
        this.outBytesLock().lock();
        try {
            SessionDetails sessionDetails = this.sessionDetails();
            this.outWire().writeDocument(false, wireOut -> {
                if (sessionDetails == null) {
                    wireOut.write(() -> "userid").text((CharSequence)System.getProperty("user.name"));
                } else {
                    wireOut.write(() -> "userid").text((CharSequence)sessionDetails.userId());
                }
            });
            this.writeSocket(this.outWire());
        }
        finally {
            this.outBytesLock().unlock();
        }
        if (this.tcpSocketConsumer != null) {
            this.tcpSocketConsumer.onReconnect();
        }
    }

    private SessionDetails sessionDetails() {
        if (this.sessionProvider == null) {
            return null;
        }
        return this.sessionProvider.get();
    }

    protected void closeExisting() {
        if (this.closeables != null) {
            this.closeables.closeQuietly();
        }
        this.closeables = null;
        if (this.tcpSocketConsumer != null) {
            this.tcpSocketConsumer.onConnectionClosed();
        }
    }

    @Override
    public synchronized void close() {
        this.closed = true;
        this.tcpSocketConsumer.close();
        if (this.closeables != null) {
            this.closeables.closeQuietly();
        }
        this.closeables = null;
        this.clientChannel = null;
    }

    public long nextUniqueTransaction(long time) {
        long old;
        long id = time;
        do {
            if ((old = this.transactionID.get()) < id) continue;
            id = old + 1L;
        } while (!this.transactionID.compareAndSet(old, id));
        return id;
    }

    public void writeSocket(@NotNull Wire wire) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        this.checkClosed();
        long timeoutTime = this.startTime + this.timeoutMs;
        try {
            while (true) {
                if (this.clientChannel == null) {
                    this.lazyConnect(this.timeoutMs, this.remoteAddress);
                }
                try {
                    this.writeSocket(wire, timeoutTime);
                }
                catch (ClosedChannelException e) {
                    this.checkTimeout(timeoutTime);
                    this.lazyConnect(this.timeoutMs, this.remoteAddress);
                    continue;
                }
                break;
            }
        }
        catch (IOException e) {
            this.close();
            throw new IORuntimeException((Exception)e);
        }
        catch (Exception e) {
            this.close();
            throw e;
        }
    }

    public Wire proxyReply(long timeoutTime, long tid) {
        this.checkClosed();
        try {
            return this.tcpSocketConsumer.syncBlockingReadSocket(timeoutTime, tid);
        }
        catch (RuntimeException e) {
            this.close();
            throw e;
        }
        catch (Exception e) {
            this.close();
            throw Jvm.rethrow((Throwable)e);
        }
        catch (AssertionError e) {
            LOG.error("name=" + this.name, (Throwable)((Object)e));
            throw e;
        }
    }

    private void writeSocket(@NotNull Wire outWire, long timeoutTime) throws IOException {
        assert (this.outBytesLock().isHeldByCurrentThread());
        Bytes bytes = outWire.bytes();
        long outBytesPosition = bytes.writePosition();
        if (this.outBytesLock().hasQueuedThreads() && outBytesPosition + this.largestChunkSoFar <= (long)this.tcpBufferSize) {
            return;
        }
        ByteBuffer outBuffer = (ByteBuffer)bytes.underlyingObject();
        outBuffer.limit((int)bytes.writePosition());
        outBuffer.position(0);
        if (Jvm.IS_DEBUG) {
            this.logToStandardOutMessageSent(outWire, outBuffer);
        }
        this.upateLargestChunkSoFarSize(outBuffer);
        while (outBuffer.remaining() > 0) {
            this.checkClosed();
            int len = this.clientChannel.write(outBuffer);
            if (len == -1) {
                throw new IORuntimeException("Disconnection to server");
            }
            if (outBuffer.remaining() == 0) break;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Buffer is full");
            }
            if (outBuffer.remaining() > 0 && this.outBytesLock().hasQueuedThreads() && (long)outBuffer.remaining() + this.largestChunkSoFar <= (long)this.tcpBufferSize) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("continuing -  without all the data being written to the buffer as it will be written by the next thread");
                }
                outBuffer.compact();
                bytes.writeLimit((long)outBuffer.limit());
                bytes.writePosition((long)outBuffer.position());
                return;
            }
            this.checkTimeout(timeoutTime);
        }
        outBuffer.clear();
        bytes.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void logToStandardOutMessageSent(@NotNull Wire wire, @NotNull ByteBuffer outBuffer) {
        if (!YamlLogging.clientWrites || !Jvm.isDebug()) {
            return;
        }
        Bytes bytes = wire.bytes();
        long position = bytes.writePosition();
        long limit = bytes.writeLimit();
        try {
            bytes.writeLimit((long)outBuffer.limit());
            bytes.writePosition((long)outBuffer.position());
            if (YamlLogging.clientWrites) {
                try {
                    System.out.println((!YamlLogging.title.isEmpty() ? "### " + YamlLogging.title + "\n" : "") + "" + YamlLogging.writeMessage + (YamlLogging.writeMessage.isEmpty() ? "" : "\n\n") + "sends:\n\n" + "```yaml\n" + (wire instanceof TextWire ? Wires.fromSizePrefixedBlobs((Bytes)bytes, (long)bytes.writePosition(), (long)bytes.writeLimit()) : BytesUtil.toHexString((Bytes)bytes, (long)bytes.writePosition(), (long)bytes.writeRemaining())) + "```");
                    YamlLogging.title = "";
                    YamlLogging.writeMessage = "";
                }
                catch (Exception e) {
                    LOG.error(Bytes.toString((Bytes)bytes), (Throwable)e);
                }
            }
        }
        finally {
            bytes.writeLimit(limit);
            bytes.writePosition(position);
        }
    }

    private void upateLargestChunkSoFarSize(@NotNull ByteBuffer outBuffer) {
        int sizeOfThisChunk = (int)((long)outBuffer.limit() - this.limitOfLast);
        if (this.largestChunkSoFar < (long)sizeOfThisChunk) {
            this.largestChunkSoFar = sizeOfThisChunk;
        }
        this.limitOfLast = outBuffer.limit();
    }

    public Wire outWire() {
        assert (this.outBytesLock().isHeldByCurrentThread());
        return this.outWire;
    }

    public long writeMetaDataStartTime(long startTime, @NotNull Wire wire, String csp, long cid) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        this.checkClosed();
        this.startTime(startTime);
        long tid = this.nextUniqueTransaction(startTime);
        this.writeMetaDataForKnownTID(tid, wire, csp, cid);
        return tid;
    }

    public void writeMetaDataForKnownTID(long tid, @NotNull Wire wire, String csp, long cid) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        this.checkClosed();
        wire.writeDocument(true, wireOut -> {
            if (cid == 0L) {
                wireOut.writeEventName((WireKey)CoreFields.csp).text((CharSequence)csp);
            } else {
                wireOut.writeEventName((WireKey)CoreFields.cid).int64(cid);
            }
            wireOut.writeEventName((WireKey)CoreFields.tid).int64(tid);
        });
    }

    public void writeAsyncHeader(@NotNull Wire wire, String csp, long cid) {
        assert (this.outBytesLock().isHeldByCurrentThread());
        this.checkClosed();
        wire.writeDocument(true, wireOut -> {
            if (cid == 0L) {
                wireOut.writeEventName((WireKey)CoreFields.csp).text((CharSequence)csp);
            } else {
                wireOut.writeEventName((WireKey)CoreFields.cid).int64(cid);
            }
        });
    }

    public void startTime(long startTime) {
        this.startTime = startTime;
    }

    void checkClosed() {
        if (this.closed) {
            throw new IllegalStateException("Closed");
        }
    }

    public void lock(Task r) {
        this.outBytesLock().lock();
        try {
            r.run();
        }
        catch (Exception e) {
            throw Jvm.rethrow((Throwable)e);
        }
        finally {
            this.outBytesLock().unlock();
        }
    }

    private static class TcpSocketConsumer
    implements Closeable {
        private final ExecutorService executorService;
        private final SocketChannelProvider provider;
        private final Map<Long, Object> map = new ConcurrentHashMap<Long, Object>();
        private volatile boolean closeSocketConsumer;
        private Function<Bytes, Wire> wireFunction;
        @Nullable
        private SocketChannel clientChannel;
        private long tid;
        private ThreadLocal<Wire> syncInWireThreadLocal = ThreadLocal.withInitial(() -> WireType.wire.apply(Bytes.elasticByteBuffer()));

        public void onReconnect() {
            this.map.values().forEach(v -> {
                if (v instanceof AsyncSubscription) {
                    ((AsyncSubscription)v).applySubscribe();
                }
            });
        }

        public void onConnectionClosed() {
            this.map.values().forEach(v -> {
                if (v instanceof AsyncSubscription) {
                    ((AsyncSubscription)v).onClose();
                }
            });
        }

        private TcpSocketConsumer(@NotNull Function<Bytes, Wire> wireFunction, @NotNull SocketChannelProvider provider, @NotNull String name) {
            this.wireFunction = wireFunction;
            this.provider = provider;
            this.clientChannel = provider.lazyConnect();
            this.executorService = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("TcpSocketConsumer-" + name, Boolean.valueOf(true)));
            this.start();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Wire syncBlockingReadSocket(long timeoutTimeMs, long tid) throws InterruptedException, TimeoutException {
            long start = System.currentTimeMillis();
            Wire wire = this.syncInWireThreadLocal.get();
            wire.clear();
            Bytes bytes = wire.bytes();
            ((ByteBuffer)bytes.underlyingObject()).clear();
            Bytes bytes2 = bytes;
            synchronized (bytes2) {
                this.map.put(tid, bytes);
                bytes.wait(timeoutTimeMs);
            }
            TcpChannelHub.logToStandardOutMessageReceived(wire);
            if (System.currentTimeMillis() - start >= timeoutTimeMs) {
                throw new TimeoutException("timeoutTimeMs=" + timeoutTimeMs);
            }
            return wire;
        }

        @Deprecated
        private void asyncReadSocket(long tid, @NotNull Consumer<Wire> consumer) {
            this.map.put(tid, consumer);
        }

        public void subscribe(@NotNull AsyncSubscription asyncSubscription) {
            this.map.put(asyncSubscription.tid(), asyncSubscription);
            asyncSubscription.applySubscribe();
        }

        public void unsubscribe(long tid) {
            this.map.remove(tid);
        }

        private void start() {
            this.executorService.submit(() -> {
                block2: {
                    try {
                        this.running();
                    }
                    catch (Throwable e) {
                        if (this.isClosed()) break block2;
                        LOG.error("", e);
                    }
                }
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void running() {
            Wire inWire = this.wireFunction.apply(Bytes.elasticByteBuffer());
            assert (inWire != null);
            while (!this.isClosed()) {
                try {
                    Bytes bytes = inWire.bytes();
                    this.blockingRead(inWire, 4L);
                    int header = bytes.readVolatileInt(0L);
                    long messageSize = this.size(header);
                    if (Wires.isData((long)header)) {
                        assert (messageSize < Integer.MAX_VALUE);
                        this.processData(this.tid, Wires.isReady((long)header), header, (int)messageSize, inWire);
                        continue;
                    }
                    this.blockingRead(inWire, messageSize);
                    TcpChannelHub.logToStandardOutMessageReceived(inWire);
                    inWire.readDocument(w -> {
                        this.tid = CoreFields.tid(w);
                    }, null);
                }
                catch (IOException e) {
                    if (this.isClosed()) continue;
                    this.clientChannel = this.provider.reConnect();
                }
                finally {
                    this.clear(inWire);
                }
            }
        }

        private boolean isClosed() {
            return this.closeSocketConsumer || Thread.currentThread().isInterrupted();
        }

        private void clear(Wire inWire) {
            inWire.clear();
            ((ByteBuffer)inWire.bytes().underlyingObject()).clear();
        }

        private long size(int header) {
            long messageSize = Wires.lengthOf((long)header);
            assert (messageSize > 0L) : "Invalid message size " + messageSize;
            assert (messageSize < 0x40000000L) : "Invalid message size " + messageSize;
            return messageSize;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processData(long tid, boolean isReady, int header, int messageSize, Wire inWire) throws IOException {
            Object o;
            block8: {
                long startTime = 0L;
                do {
                    Object object = o = isReady ? this.map.remove(tid) : this.map.get(tid);
                    if (o != null) break block8;
                    if (startTime != 0L) continue;
                    startTime = System.currentTimeMillis();
                } while (System.currentTimeMillis() - startTime <= 1000L);
                LOG.error("unable to respond to tid=" + tid);
                this.blockingRead(inWire, messageSize);
                return;
            }
            if (o instanceof AsyncSubscription) {
                this.blockingRead(inWire, messageSize);
                TcpChannelHub.logToStandardOutMessageReceived(inWire);
                ((AsyncSubscription)o).onConsumer(inWire);
            } else if (o instanceof Consumer) {
                Consumer consumer = (Consumer)o;
                this.blockingRead(inWire, messageSize);
                TcpChannelHub.logToStandardOutMessageReceived(inWire);
                consumer.accept(inWire);
            } else {
                Bytes bytes;
                Bytes bytes2 = bytes = (Bytes)o;
                synchronized (bytes2) {
                    bytes.clear();
                    ByteBuffer byteBuffer = (ByteBuffer)bytes.underlyingObject();
                    byteBuffer.clear();
                    bytes.writeInt(0L, header);
                    byteBuffer.position(4);
                    byteBuffer.limit(4 + messageSize);
                    this.readBuffer(byteBuffer);
                    bytes.readLimit((long)byteBuffer.position());
                    bytes.notifyAll();
                }
            }
        }

        private void blockingRead(@NotNull Wire wire, long numberOfBytes) throws IOException {
            Bytes bytes = wire.bytes();
            bytes.ensureCapacity(bytes.readPosition() + numberOfBytes);
            ByteBuffer buffer = (ByteBuffer)bytes.underlyingObject();
            long start = buffer.position();
            buffer.limit((int)(start + numberOfBytes));
            this.readBuffer(buffer);
            bytes.readLimit((long)buffer.position());
        }

        private void readBuffer(ByteBuffer buffer) throws IOException {
            while (buffer.remaining() > 0) {
                if (this.clientChannel == null || this.clientChannel.read(buffer) == -1) {
                    throw new IORuntimeException("Disconnection to server");
                }
                if (!this.closeSocketConsumer) continue;
                throw new ClosedChannelException();
            }
        }

        @Override
        public void close() {
            this.closeSocketConsumer = true;
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(10L, TimeUnit.MILLISECONDS)) {
                    this.executorService.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                LOG.error("", (Throwable)e);
            }
            try {
                if (this.clientChannel != null) {
                    this.clientChannel.close();
                }
            }
            catch (IOException e) {
                LOG.error("", (Throwable)e);
            }
        }
    }

    public static interface Task {
        public void run() throws Exception;
    }
}

