/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network.connection;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.text.MessageFormat;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesUtil;
import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.session.SessionProvider;
import net.openhft.chronicle.engine.api.tree.View;
import net.openhft.chronicle.engine.server.WireType;
import net.openhft.chronicle.engine.server.internal.SystemHandler;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.AsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.RemoteCallTimeoutException;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.YamlLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpChannelHub
implements View,
Closeable {
    public static final int HEATBEAT_PING_PERIOD = Integer.getInteger("heartbeat.ping.period", 5000);
    public static final int HEATBEAT_TIMEOUT_PERIOD = Integer.getInteger("heartbeat.timeout", 10000);
    public static final int SIZE_OF_SIZE = 4;
    private static final Logger LOG = LoggerFactory.getLogger(TcpChannelHub.class);
    public final long timeoutMs;
    @NotNull
    protected final String name;
    @NotNull
    protected final InetSocketAddress remoteAddress;
    protected final int tcpBufferSize;
    final Wire outWire;
    final Wire inWire;
    private final ReentrantLock outBytesLock = new ReentrantLock();
    @NotNull
    private final AtomicLong transactionID = new AtomicLong(0L);
    private final SessionProvider sessionProvider;
    @NotNull
    private final TcpSocketConsumer tcpSocketConsumer;
    private final EventLoop eventLoop;
    private long largestChunkSoFar = 0L;
    @Nullable
    private volatile SocketChannel clientChannel;
    private long limitOfLast = 0L;
    private long startTime;
    private volatile boolean closed;
    private String hostname;
    private int port;
    private final Wire handShakingWire;
    private AtomicBoolean connectOnce = new AtomicBoolean(true);

    /**
     * Creates a hub for the given remote endpoint, makes one immediate connection
     * attempt and starts the background socket reader.
     *
     * @param sessionProvider source of the session details used for the handshake user id
     * @param hostname        remote host name
     * @param port            remote port
     * @param eventLoop       event loop on which the socket consumer registers itself as a handler
     */
    public TcpChannelHub(@NotNull SessionProvider sessionProvider, @NotNull String hostname, int port, @NotNull EventLoop eventLoop) {
        // Must be set before attemptConnect(): the handshake reads it through
        // sessionDetails() and would otherwise always see null on the first connect.
        this.sessionProvider = sessionProvider;
        // Must be set before the TcpSocketConsumer is created, as its constructor
        // registers itself with the event loop.
        this.eventLoop = eventLoop;
        this.hostname = hostname;
        this.port = port;
        this.tcpSocketConsumer = new TcpSocketConsumer(WireType.wire, hostname);
        this.tcpBufferSize = 65536;
        this.remoteAddress = new InetSocketAddress(hostname, port);
        this.outWire = WireType.wire.apply(Bytes.elasticByteBuffer());
        this.inWire = WireType.wire.apply(Bytes.elasticByteBuffer());
        this.name = " connected to " + this.remoteAddress.toString();
        this.timeoutMs = 10000L;
        this.handShakingWire = WireType.wire.apply(Bytes.elasticByteBuffer());
        this.attemptConnect(this.remoteAddress);
        this.tcpSocketConsumer.start();
    }

    /**
     * Opens a new, unconnected socket channel with {@code TCP_NODELAY} enabled.
     *
     * @return the newly opened channel
     * @throws IOException if the channel cannot be opened or configured; in the
     *                     latter case the half-configured channel is closed first
     */
    @Nullable
    static SocketChannel openSocketChannel() throws IOException {
        final SocketChannel result = SocketChannel.open();
        try {
            result.socket().setTcpNoDelay(true);
        } catch (IOException | RuntimeException e) {
            // Do not leak the descriptor when configuration fails.
            try {
                result.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return result;
    }

    /**
     * Dumps the content of {@code wire} to standard out when
     * {@code YamlLogging.clientReads} is enabled; does nothing otherwise.
     * The write position and write limit of the wire's bytes are restored afterwards.
     *
     * @param wire the wire holding the message that was received
     */
    static void logToStandardOutMessageReceived(@NotNull Wire wire) {
        final Bytes bytes = wire.bytes();
        if (YamlLogging.clientReads) {
            final long savedPosition = bytes.writePosition();
            final long savedLimit = bytes.writeLimit();
            try {
                // text wires are rendered as YAML, anything else as a hex dump
                final String body = wire instanceof TextWire
                        ? "```yaml\n" + Wires.fromSizePrefixedBlobs((Bytes)bytes)
                        : "```\n" + BytesUtil.toHexString((Bytes)bytes, (long)bytes.readPosition(), (long)bytes.readRemaining());
                System.out.println("\nreceives:\n" + body + "```\n");
                YamlLogging.title = "";
                YamlLogging.writeMessage = "";
            } catch (Exception e) {
                // fall back to the raw content so that something is still shown
                System.out.println(Bytes.toString((Bytes)bytes));
                LOG.error("", (Throwable)e);
            } finally {
                bytes.writeLimit(savedLimit);
                bytes.writePosition(savedPosition);
            }
        }
    }

    /**
     * Hands {@code asyncSubscription} to the socket consumer, which stores it
     * under its tid and applies the subscription.
     *
     * @param asyncSubscription the subscription to register
     */
    public void subscribe(@NotNull AsyncSubscription asyncSubscription) {
        tcpSocketConsumer.subscribe(asyncSubscription);
    }

    /**
     * Removes whatever the socket consumer has registered under {@code tid}.
     *
     * @param tid the transaction id to stop listening for
     */
    public void unsubscribe(long tid) {
        tcpSocketConsumer.unsubscribe(tid);
    }

    /**
     * Closes any existing socket and makes a single attempt to connect to
     * {@code remoteAddress}. On success the channel is switched to non-blocking
     * mode, configured, published as {@code clientChannel} and the handshake is sent.
     * On any failure the newly opened channel is closed and {@code clientChannel}
     * is left {@code null}; an {@link IOException} is not propagated.
     *
     * @param remoteAddress the address to connect to
     */
    private synchronized void attemptConnect(InetSocketAddress remoteAddress) {
        this.closeSocket();
        SocketChannel socketChannel = null;
        boolean connected = false;
        try {
            socketChannel = TcpChannelHub.openSocketChannel();
            if (socketChannel != null && socketChannel.connect(remoteAddress)) {
                this.clientChannel = socketChannel;
                socketChannel.configureBlocking(false);
                socketChannel.socket().setTcpNoDelay(true);
                socketChannel.socket().setReceiveBufferSize(this.tcpBufferSize);
                socketChannel.socket().setSendBufferSize(this.tcpBufferSize);
                this.doHandShaking(socketChannel);
                connected = true;
            }
        } catch (IOException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("unable to connect to " + remoteAddress, (Throwable)e);
            }
        } finally {
            if (!connected) {
                // never publish, and never leak, a channel that failed to connect or hand-shake
                this.clientChannel = null;
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (IOException ignored) {
                        // best effort, the channel is being discarded anyway
                    }
                }
            }
        }
    }

    /**
     * @return the lock that callers hold while using {@link #outWire()} and the
     *         {@code write...} methods (these assert that it is held)
     */
    @NotNull
    public ReentrantLock outBytesLock() {
        return outBytesLock;
    }

    /**
     * Checks an absolute deadline against the current time.
     *
     * @param timeoutTime the deadline as epoch milliseconds, or {@code 0} for "no deadline"
     * @return {@code false} if no deadline was given, {@code true} otherwise
     * @throws RemoteCallTimeoutException if the deadline has passed and the JVM is
     *                                    not running in debug mode
     */
    private boolean checkTimeout(long timeoutTime) {
        if (timeoutTime == 0L) {
            return false;
        }
        final long now = System.currentTimeMillis();
        if (timeoutTime < now && !Jvm.isDebug()) {
            // timeoutTime is an absolute timestamp, so report the overrun rather
            // than printing the timestamp as though it were a duration
            throw new RemoteCallTimeoutException("timeout, deadline=" + timeoutTime + " exceeded by " + (now - timeoutTime) + "ms");
        }
        return true;
    }

    /**
     * Drops the current socket, waits briefly, connects again and then asks the
     * socket consumer to re-apply its subscriptions.
     * <p>
     * The {@code connectOnce} flag is claimed before the monitor is taken; a caller
     * that fails to claim it does not reconnect and simply returns the current
     * channel once it has obtained the monitor.
     *
     * @return the current client channel, possibly {@code null}
     */
    @Nullable
    private SocketChannel reConnect() throws InterruptedException {
        final boolean claimed = this.connectOnce.getAndSet(false);
        synchronized (this) {
            if (claimed) {
                try {
                    this.closeSocket();
                    Jvm.pause((long)500L);
                    this.connect(this.timeoutMs, this.remoteAddress);
                } finally {
                    // release the flag so that a later failure can trigger another reconnect
                    this.connectOnce.set(true);
                }
            } else {
                return this.clientChannel;
            }
        }
        this.tcpSocketConsumer.onReconnect();
        return this.clientChannel;
    }

    /**
     * Returns the current channel or, if there is none, keeps trying to connect
     * to {@code remoteAddress} until it succeeds or the timeout elapses.
     * A successful connection is configured, hand-shaken, published as
     * {@code clientChannel}, and the hub is marked as not closed.
     * Channels belonging to failed attempts are closed before the next retry.
     *
     * @param timeoutMs     how long to keep retrying, in milliseconds
     * @param remoteAddress the address to connect to
     * @return the connected channel
     * @throws RemoteCallTimeoutException (via {@code checkTimeout}) if no connection
     *                                    could be made in time
     */
    @Nullable
    public synchronized SocketChannel connect(long timeoutMs, InetSocketAddress remoteAddress) {
        if (this.clientChannel != null) {
            return this.clientChannel;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("attempting to connect to " + remoteAddress + " ,name=" + this.name);
        }
        final long timeoutAt = System.currentTimeMillis() + timeoutMs;
        while (this.clientChannel == null) {
            this.checkTimeout(timeoutAt);
            SocketChannel socketChannel = null;
            boolean connected = false;
            try {
                socketChannel = TcpChannelHub.openSocketChannel();
                if (socketChannel != null && socketChannel.connect(remoteAddress)) {
                    socketChannel.socket().setTcpNoDelay(true);
                    socketChannel.socket().setReceiveBufferSize(this.tcpBufferSize);
                    socketChannel.socket().setSendBufferSize(this.tcpBufferSize);
                    this.doHandShaking(socketChannel);
                    this.clientChannel = socketChannel;
                    this.tcpSocketConsumer.onMessageReceived();
                    this.closed = false;
                    connected = true;
                }
            } catch (IOException e) {
                this.clientChannel = null;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("unable to connect to " + remoteAddress + ", will retry", (Throwable)e);
                }
            } finally {
                // a channel that did not make it to "connected" must not leak,
                // otherwise every retry would cost a file descriptor
                if (!connected && socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (IOException ignored) {
                        // best effort, the channel is being discarded anyway
                    }
                }
            }
            if (!connected) {
                // back off between attempts instead of spinning on a refused connection
                Jvm.pause((long)100L);
            }
        }
        return this.clientChannel;
    }

    /**
     * Performs a single read from the client channel into a freshly allocated
     * scratch buffer, discarding whatever was read.
     *
     * @throws IOException if the read fails
     */
    private void flushBuffer() throws IOException {
        final ByteBuffer scratch = (ByteBuffer)Bytes.elasticByteBuffer().underlyingObject();
        this.clientChannel.read(scratch);
    }

    /**
     * Sends the handshake document, which carries the user id, over
     * {@code socketChannel}. The user id comes from the session details when
     * available and from the {@code user.name} system property otherwise.
     *
     * @param socketChannel the freshly connected channel to write the handshake to
     * @throws IOException if the write fails
     */
    private synchronized void doHandShaking(SocketChannel socketChannel) throws IOException {
        final SessionDetails sessionDetails = this.sessionDetails();
        this.handShakingWire.clear();
        this.handShakingWire.bytes().clear();
        this.handShakingWire.writeDocument(false, wireOut ->
                wireOut.writeEventName((WireKey)SystemHandler.EventId.userid)
                        .text((CharSequence)(sessionDetails == null ? System.getProperty("user.name") : sessionDetails.userId())));
        // writeSocket() expects an absolute deadline (it is compared with
        // System.currentTimeMillis()), not a duration
        final long deadline = System.currentTimeMillis() + this.timeoutMs;
        this.writeSocket((WireOut)this.handShakingWire, deadline, socketChannel);
    }

    /**
     * @return the session details supplied by the session provider, or
     *         {@code null} when no provider is set
     */
    private SessionDetails sessionDetails() {
        return this.sessionProvider == null ? null : this.sessionProvider.get();
    }

    /**
     * Shuts down and closes the current client channel, if any, clears
     * {@code clientChannel} and notifies the socket consumer that the
     * connection has been closed. Each shutdown step is best effort: a failure
     * of one step does not prevent the following ones.
     */
    protected synchronized void closeSocket() {
        // Work on a snapshot: the reader thread assigns clientChannel without
        // holding this monitor, so re-reading the field could yield null.
        final SocketChannel channel = this.clientChannel;
        if (channel == null) {
            return;
        }
        try {
            channel.socket().shutdownInput();
        } catch (IOException ignored) {
            // best effort, continue tearing the socket down
        }
        try {
            channel.socket().shutdownOutput();
        } catch (IOException ignored) {
            // best effort, continue tearing the socket down
        }
        try {
            channel.socket().close();
        } catch (IOException ignored) {
            // best effort, continue tearing the socket down
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // best effort, the channel is being discarded anyway
        }
        this.clientChannel = null;
        if (this.tcpSocketConsumer != null) {
            this.tcpSocketConsumer.onConnectionClosed();
        }
    }

    /**
     * Marks the hub as closed, stops the socket consumer and closes the socket.
     */
    @Override
    public synchronized void close() {
        closed = true;
        tcpSocketConsumer.stop();
        closeSocket();
    }

    /**
     * Produces a transaction id that is unique for this hub.
     * The id equals {@code time} when that is greater than every id handed out
     * so far; otherwise it is the last id handed out plus one, so ids are
     * strictly increasing even when several calls pass the same (or an older) time.
     *
     * @param time the preferred id, typically the current time in milliseconds
     * @return the unique transaction id
     */
    public long nextUniqueTransaction(long time) {
        return this.transactionID.updateAndGet(last -> last >= time ? last + 1L : time);
    }

    /**
     * Writes the content of {@code wire} to the socket, reconnecting and
     * retrying whenever the channel turns out to be closed. The deadline for
     * the write is {@code startTime + timeoutMs}.
     *
     * @param wire the wire whose bytes are to be sent
     * @throws IllegalStateException if the hub has been closed
     * @throws IORuntimeException    if the write fails with an I/O error; the
     *                               socket is closed first
     */
    public void writeSocket(@NotNull WireOut wire) {
        this.checkNotClosed();
        final long timeoutTime = this.startTime + this.timeoutMs;
        try {
            boolean written = false;
            while (!written) {
                try {
                    this.writeSocket(wire, timeoutTime, this.socketChannel());
                    written = true;
                } catch (ClosedChannelException e) {
                    this.checkTimeout(timeoutTime);
                    this.reConnect();
                }
            }
        } catch (InterruptedException e) {
            // keep the interrupt visible to the caller instead of swallowing it
            Thread.currentThread().interrupt();
            LOG.error("", (Throwable)e);
            this.closeSocket();
        } catch (IOException e) {
            this.closeSocket();
            throw new IORuntimeException("server=" + this.hostname + ":" + this.port, e);
        } catch (Exception e) {
            LOG.error("", (Throwable)e);
            this.closeSocket();
        }
    }

    /**
     * Returns the current client channel, reconnecting for as long as there is none.
     * The field is read once per iteration so that the value that was checked
     * is the value that is returned.
     *
     * @return the connected channel, never {@code null}
     */
    @NotNull
    private SocketChannel socketChannel() throws InterruptedException {
        SocketChannel channel;
        while ((channel = this.clientChannel) == null) {
            this.reConnect();
        }
        return channel;
    }

    /**
     * Blocks until the reply for {@code tid} has arrived and returns the wire holding it.
     * An {@code IORuntimeException} is passed on untouched; for any other
     * exception the socket is closed before the exception is rethrown.
     *
     * @param timeoutTime how long to wait for the reply, in milliseconds
     * @param tid         the transaction id of the request
     * @return the wire containing the reply
     * @throws IllegalStateException if the hub has been closed
     */
    public Wire proxyReply(long timeoutTime, long tid) {
        this.checkNotClosed();
        final Wire reply;
        try {
            reply = this.tcpSocketConsumer.syncBlockingReadSocket(timeoutTime, tid);
        } catch (IORuntimeException e) {
            // the socket state has already been dealt with, just pass it on
            throw e;
        } catch (RuntimeException e) {
            this.closeSocket();
            throw e;
        } catch (Exception e) {
            this.closeSocket();
            throw Jvm.rethrow((Throwable)e);
        }
        return reply;
    }

    /**
     * Writes the bytes accumulated in {@code outWire} to {@code socketChannel}.
     * <p>
     * The write may be skipped, or abandoned part way through, when other
     * threads are queued on the out-bytes lock and the pending data plus the
     * largest chunk seen so far still fits in {@code tcpBufferSize}; the pending
     * bytes are then left in the wire (presumably for the next writer to send
     * along with its own data).
     *
     * @param outWire       the wire whose bytes are to be sent
     * @param timeoutTime   absolute deadline (it is handed to {@code checkTimeout},
     *                      which compares it with the current time)
     * @param socketChannel the channel to write to
     * @throws IOException        if the channel write fails
     * @throws IORuntimeException if the channel reports {@code -1} on write
     */
    private void writeSocket(@NotNull WireOut outWire, long timeoutTime, @NotNull SocketChannel socketChannel) throws IOException {
        Bytes bytes = outWire.bytes();
        long outBytesPosition = bytes.writePosition();
        // another thread is waiting to write and there is still room: leave the data where it is
        if (this.outBytesLock().hasQueuedThreads() && outBytesPosition + this.largestChunkSoFar <= (long)this.tcpBufferSize) {
            return;
        }
        ByteBuffer outBuffer = (ByteBuffer)bytes.underlyingObject();
        // expose everything written so far, [0, writePosition), to the channel
        outBuffer.limit((int)bytes.writePosition());
        outBuffer.position(0);
        if (Jvm.IS_DEBUG) {
            this.logToStandardOutMessageSent(outWire, outBuffer);
        }
        this.upateLargestChunkSoFarSize(outBuffer);
        while (outBuffer.remaining() > 0) {
            this.checkNotClosed();
            int len = socketChannel.write(outBuffer);
            if (len == -1) {
                throw new IORuntimeException("Disconnection to server " + this.hostname + ":" + this.port);
            }
            if (outBuffer.remaining() == 0) break;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Buffer is full");
            }
            // partial write: if another thread is queued and the remainder is small enough,
            // move the unsent bytes to the front of the buffer and return without sending them
            if (outBuffer.remaining() > 0 && this.outBytesLock().hasQueuedThreads() && (long)outBuffer.remaining() + this.largestChunkSoFar <= (long)this.tcpBufferSize) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("continuing -  without all the data being written to the buffer as it will be written by the next thread");
                }
                outBuffer.compact();
                bytes.writeLimit((long)outBuffer.limit());
                bytes.writePosition((long)outBuffer.position());
                return;
            }
            this.checkTimeout(timeoutTime);
        }
        // everything was sent: reset both views of the buffer for the next message
        outBuffer.clear();
        bytes.clear();
    }

    /**
     * Dumps the message about to be sent to standard out when
     * {@code YamlLogging.clientWrites} is enabled; does nothing otherwise.
     * The write position and write limit of the wire's bytes are temporarily
     * set from {@code outBuffer} and restored afterwards.
     *
     * @param wire      the wire holding the outgoing message
     * @param outBuffer the buffer whose position and limit delimit the outgoing data
     */
    private void logToStandardOutMessageSent(@NotNull WireOut wire, @NotNull ByteBuffer outBuffer) {
        if (YamlLogging.clientWrites) {
            final Bytes bytes = wire.bytes();
            final long savedPosition = bytes.writePosition();
            final long savedLimit = bytes.writeLimit();
            try {
                bytes.writeLimit((long)outBuffer.limit());
                bytes.writePosition((long)outBuffer.position());
                try {
                    final String heading = YamlLogging.title.isEmpty() ? "" : "### " + YamlLogging.title + "\n";
                    final String message = "" + YamlLogging.writeMessage + (YamlLogging.writeMessage.isEmpty() ? "" : "\n\n");
                    System.out.println(heading + message + "sends:\n\n" + "```yaml\n" + (wire instanceof TextWire ? Wires.fromSizePrefixedBlobs((Bytes)bytes, (long)bytes.writePosition(), (long)bytes.writeLimit()) : BytesUtil.toHexString((Bytes)bytes, (long)bytes.writePosition(), (long)bytes.writeRemaining())) + "```");
                    YamlLogging.title = "";
                    YamlLogging.writeMessage = "";
                } catch (Exception e) {
                    LOG.error(Bytes.toString((Bytes)bytes), (Throwable)e);
                }
            } finally {
                bytes.writeLimit(savedLimit);
                bytes.writePosition(savedPosition);
            }
        }
    }

    /**
     * Tracks the largest chunk written so far, where the size of a chunk is the
     * difference between the limit of {@code outBuffer} and the limit recorded
     * on the previous call.
     *
     * @param outBuffer the buffer that is about to be written
     */
    private void upateLargestChunkSoFarSize(@NotNull ByteBuffer outBuffer) {
        final int currentLimit = outBuffer.limit();
        final int chunkSize = (int)((long)currentLimit - this.limitOfLast);
        this.largestChunkSoFar = Math.max(this.largestChunkSoFar, (long)chunkSize);
        this.limitOfLast = currentLimit;
    }

    /**
     * @return the shared outbound wire; the caller is expected to hold the
     *         out-bytes lock (checked by an assertion)
     */
    public Wire outWire() {
        assert outBytesLock().isHeldByCurrentThread();
        return outWire;
    }

    /**
     * Records {@code startTime}, allocates a new transaction id derived from it
     * and writes the meta data document for that id to {@code wire}.
     * The caller is expected to hold the out-bytes lock (checked by an assertion).
     *
     * @param startTime the start time of the request
     * @param wire      the wire to write the meta data to
     * @param csp       written when {@code cid} is {@code 0}
     * @param cid       written when it is not {@code 0}
     * @return the transaction id that was allocated
     * @throws IllegalStateException if the hub has been closed
     */
    public long writeMetaDataStartTime(long startTime, @NotNull Wire wire, String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        checkNotClosed();
        this.startTime(startTime);
        final long allocatedTid = nextUniqueTransaction(startTime);
        writeMetaDataForKnownTID(allocatedTid, wire, csp, cid);
        return allocatedTid;
    }

    /**
     * Writes a meta data document to {@code wire} that holds either the
     * {@code cid} (when it is not {@code 0}) or otherwise the {@code csp},
     * followed by the {@code tid}.
     * The caller is expected to hold the out-bytes lock (checked by an assertion).
     *
     * @param tid  the transaction id to write
     * @param wire the wire to write the meta data to
     * @param csp  written when {@code cid} is {@code 0}
     * @param cid  written when it is not {@code 0}
     * @throws IllegalStateException if the hub has been closed
     */
    public void writeMetaDataForKnownTID(long tid, @NotNull Wire wire, @Nullable String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        checkNotClosed();
        wire.writeDocument(true, wireOut -> {
            if (cid != 0L) {
                wireOut.writeEventName((WireKey)CoreFields.cid).int64(cid);
            } else {
                wireOut.writeEventName((WireKey)CoreFields.csp).text((CharSequence)csp);
            }
            wireOut.writeEventName((WireKey)CoreFields.tid).int64(tid);
        });
    }

    /**
     * Writes a meta data document to {@code wire} that holds either the
     * {@code cid} (when it is not {@code 0}) or otherwise the {@code csp};
     * no transaction id is written.
     * The caller is expected to hold the out-bytes lock (checked by an assertion).
     *
     * @param wire the wire to write the header to
     * @param csp  written when {@code cid} is {@code 0}
     * @param cid  written when it is not {@code 0}
     * @throws IllegalStateException if the hub has been closed
     */
    public void writeAsyncHeader(@NotNull Wire wire, String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        checkNotClosed();
        wire.writeDocument(true, wireOut -> {
            if (cid != 0L) {
                wireOut.writeEventName((WireKey)CoreFields.cid).int64(cid);
            } else {
                wireOut.writeEventName((WireKey)CoreFields.csp).text((CharSequence)csp);
            }
        });
    }

    /**
     * Records the start time from which the write deadline
     * ({@code startTime + timeoutMs}) is computed.
     *
     * @param startTime the start time of the current request
     */
    public void startTime(final long startTime) {
        this.startTime = startTime;
    }

    /**
     * @throws IllegalStateException if the hub has been closed
     */
    void checkNotClosed() {
        if (!closed) {
            return;
        }
        throw new IllegalStateException("Closed");
    }

    /**
     * Runs {@code r} while holding the out-bytes lock; the lock is always
     * released afterwards. Exceptions thrown by the task are rethrown through
     * {@code Jvm.rethrow}.
     *
     * @param r the task to run under the lock
     */
    public void lock(@NotNull Task r) {
        final ReentrantLock guard = this.outBytesLock();
        guard.lock();
        try {
            r.run();
        } catch (Exception e) {
            throw Jvm.rethrow((Throwable)e);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Answers a heartbeat sent by the server: reads the timestamp from
     * {@code valueIn} and, under the out-bytes lock, sends it back in a
     * {@code heartbeatReply} event with transaction id {@code 0}.
     *
     * @param valueIn positioned at the timestamp of the server's heartbeat
     */
    private void reflectServerHeartbeatMessage(ValueIn valueIn) {
        final long serverTimestamp = valueIn.int64();
        final Task reply = () -> {
            this.writeMetaDataForKnownTID(0L, this.outWire, null, 0L);
            this.outWire.writeDocument(false, w -> w.writeEventName((WireKey)SystemHandler.EventId.heartbeatReply).int64(serverTimestamp));
            this.writeSocket((WireOut)this.outWire);
        };
        this.lock(reply);
    }

    private class TcpSocketConsumer
    implements EventHandler {
        private volatile ExecutorService executorService;
        @NotNull
        private final Map<Long, Object> map = new ConcurrentHashMap<Long, Object>();
        private volatile boolean isShutdown;
        private Function<Bytes, Wire> wireFunction;
        @Nullable
        private long tid;
        @NotNull
        private ThreadLocal<Wire> syncInWireThreadLocal = ThreadLocal.withInitial(() -> WireType.wire.apply(Bytes.elasticByteBuffer()));
        private Bytes serverHeartBeatHandler = Bytes.elasticByteBuffer();
        private volatile long lastTimeMessageReceived = System.currentTimeMillis();
        private final AtomicBoolean awaitingHeartbeat = new AtomicBoolean();

        /**
         * Re-applies every registered subscription after a reconnect, except
         * for temporary subscriptions, which are left alone.
         */
        private void onReconnect() {
            for (final Object registered : this.map.values()) {
                if (registered instanceof AsyncTemporarySubscription) {
                    continue;
                }
                if (registered instanceof AsyncSubscription) {
                    ((AsyncSubscription)registered).applySubscribe();
                }
            }
        }

        /**
         * Tells everything that is registered that the connection has gone:
         * subscriptions get {@code onClose()}, and threads waiting on a
         * {@code Bytes} reply buffer are woken up.
         */
        public void onConnectionClosed() {
            for (final Object registered : this.map.values()) {
                if (registered instanceof AsyncSubscription) {
                    ((AsyncSubscription)registered).onClose();
                    continue;
                }
                if (registered instanceof Bytes) {
                    // wake any thread blocked in syncBlockingReadSocket()
                    synchronized (registered) {
                        registered.notifyAll();
                    }
                }
            }
        }

        private TcpSocketConsumer(@NotNull Function<Bytes, Wire> wireFunction, String name) {
            this.wireFunction = wireFunction;
            TcpChannelHub.this.eventLoop.addHandler((EventHandler)this);
        }

        /**
         * @return {@code HandlerPriority.MONITOR}, the priority at which this
         *         handler asks the event loop to run it
         */
        public HandlerPriority priority() {
            final HandlerPriority monitor = HandlerPriority.MONITOR;
            return monitor;
        }

        /**
         * Registers the calling thread's reply buffer under {@code tid} and waits
         * on it until it is notified or {@code timeoutTimeMs} milliseconds have passed.
         * <p>
         * NOTE(review): the wait is not performed in a loop and the map entry is
         * not removed here, so a spurious wake-up or a timeout leaves the entry
         * registered — confirm whether callers rely on that.
         *
         * @param timeoutTimeMs maximum time to wait, in milliseconds (passed to {@code Object.wait})
         * @param tid           the transaction id whose reply is awaited
         * @return the thread-local wire holding the reply
         * @throws IORuntimeException   if there is no client channel once the wait ends
         * @throws TimeoutException     if at least {@code timeoutTimeMs} milliseconds elapsed
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private Wire syncBlockingReadSocket(long timeoutTimeMs, long tid) throws InterruptedException, TimeoutException {
            long start = System.currentTimeMillis();
            Wire wire = this.syncInWireThreadLocal.get();
            wire.clear();
            Bytes bytes = wire.bytes();
            ((ByteBuffer)bytes.underlyingObject()).clear();
            Bytes bytes2 = bytes;
            // The buffer is published while its monitor is held, so processData(), which
            // synchronizes on the same buffer, cannot fill and notify before wait() starts.
            synchronized (bytes2) {
                this.map.put(tid, bytes);
                bytes.wait(timeoutTimeMs);
                // onConnectionClosed() also notifies waiters, so check why we were woken
                if (TcpChannelHub.this.clientChannel == null) {
                    IORuntimeException e = new IORuntimeException("Unable to reprocess response, as the server=" + TcpChannelHub.this.hostname + ":" + TcpChannelHub.this.port + " is " + "disconnected");
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(TcpChannelHub.this.hostname + ":" + TcpChannelHub.this.port, (Throwable)e);
                    }
                    throw Jvm.rethrow((Throwable)e);
                }
            }
            TcpChannelHub.logToStandardOutMessageReceived(wire);
            if (System.currentTimeMillis() - start >= timeoutTimeMs) {
                throw new TimeoutException("timeoutTimeMs=" + timeoutTimeMs);
            }
            return wire;
        }

        /**
         * Registers {@code asyncSubscription} under its tid and then applies it.
         *
         * @param asyncSubscription the subscription to register
         */
        private void subscribe(@NotNull AsyncSubscription asyncSubscription) {
            final long subscriptionTid = asyncSubscription.tid();
            this.map.put(subscriptionTid, asyncSubscription);
            asyncSubscription.applySubscribe();
        }

        /**
         * Forgets whatever is registered under {@code tid}.
         *
         * @param tid the transaction id to remove
         */
        public void unsubscribe(final long tid) {
            map.remove(tid);
        }

        /**
         * Starts the reader on a dedicated single-thread executor.
         * An {@code IORuntimeException} escaping the reader is logged at debug
         * level; any other throwable is logged as an error unless the consumer
         * is shutting down.
         *
         * @throws IllegalStateException if the consumer has already been stopped
         */
        private void start() {
            this.checkNotShutdown();
            this.awaitingHeartbeat.set(false);
            final String threadName = "TcpSocketConsumer-" + TcpChannelHub.this.name;
            this.executorService = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory(threadName, Boolean.valueOf(true)));
            this.isShutdown = false;
            this.executorService.submit(() -> {
                try {
                    this.running();
                } catch (IORuntimeException e) {
                    LOG.debug("", (Throwable)e);
                } catch (Throwable e) {
                    // failures while shutting down are expected and not worth reporting
                    if (!this.isShutdown()) {
                        LOG.error("", e);
                    }
                }
            });
        }

        /**
         * @throws IllegalStateException if {@code stop()} has already been called
         */
        private void checkNotShutdown() {
            if (!isShutdown) {
                return;
            }
            throw new IllegalStateException("you can not call this method once stop() has been caleld.");
        }

        /**
         * The body of the reader thread. Until shut down it repeatedly reads a
         * 4-byte header from the socket and then either
         * <ul>
         * <li>hands a data message to {@code processData} together with the most
         * recently seen tid, or</li>
         * <li>reads the rest of a non-data (meta data) message and records the tid
         * it carries.</li>
         * </ul>
         * The inbound wire is cleared after every iteration. An {@link IOException}
         * triggers a reconnect unless the consumer is shutting down.
         */
        private void running() {
            Wire inWire = this.wireFunction.apply(Bytes.elasticByteBuffer());
            assert (inWire != null);
            while (!this.isShutdown()) {
                try {
                    Bytes bytes = inWire.bytes();
                    // read the size-prefix / header of the next message
                    this.blockingRead((WireIn)inWire, 4L);
                    int header = bytes.readVolatileInt(0L);
                    long messageSize = this.size(header);
                    if (Wires.isData((long)header)) {
                        assert (messageSize < Integer.MAX_VALUE);
                        // the body is read by processData(), using the tid of the preceding meta data
                        this.processData(this.tid, Wires.isReady((long)header), header, (int)messageSize, inWire);
                        continue;
                    }
                    // meta data: read it in full and remember its tid for the data message that follows
                    this.blockingRead((WireIn)inWire, messageSize);
                    TcpChannelHub.logToStandardOutMessageReceived(inWire);
                    inWire.readDocument(w -> {
                        this.tid = CoreFields.tid(w);
                    }, null);
                }
                catch (IOException e) {
                    if (this.isShutdown()) continue;
                    try {
                        TcpChannelHub.this.clientChannel = TcpChannelHub.this.reConnect();
                    }
                    catch (InterruptedException e1) {
                        return;
                    }
                }
                finally {
                    // runs on every path, including 'continue' and 'return'
                    this.clear(inWire);
                }
            }
        }

        /**
         * @return {@code true} once {@code stop()} has been called or the
         *         current thread has been interrupted
         */
        private boolean isShutdown() {
            if (isShutdown) {
                return true;
            }
            return Thread.currentThread().isInterrupted();
        }

        /**
         * Resets {@code inWire} and the byte buffer underneath it.
         *
         * @param inWire the wire to reset
         */
        private void clear(@NotNull Wire inWire) {
            inWire.clear();
            final ByteBuffer underlying = (ByteBuffer)inWire.bytes().underlyingObject();
            underlying.clear();
        }

        /**
         * Extracts the message length from a header. With assertions enabled
         * the length must be positive and below {@code 0x40000000}.
         *
         * @param header the 4-byte message header
         * @return the length of the message that follows the header
         */
        private long size(int header) {
            final long length = Wires.lengthOf((long)header);
            assert length > 0L : "Invalid message size " + length;
            assert length < 0x40000000L : "Invalid message size " + length;
            return length;
        }

        /**
         * Reads the body of a data message and delivers it to whoever is
         * registered for {@code tid}.
         * <ul>
         * <li>{@code tid == 0} denotes a server system message and is handled
         * straight away, as nothing is registered for it.</li>
         * <li>A subscription receives the message through {@code onConsumer}.</li>
         * <li>A waiting reply buffer is filled and its waiter notified.</li>
         * </ul>
         * If nothing is registered for the tid, the lookup is retried for up to
         * three seconds before the message is given up on.
         *
         * @param tid         the transaction id taken from the preceding meta data
         * @param isReady     whether the header is marked ready; if so the registration is removed
         * @param header      the 4-byte header that has already been read
         * @param messageSize the length of the body that is still to be read
         * @param inWire      the reader thread's inbound wire
         * @throws IOException if reading from the socket fails
         */
        private void processData(long tid, boolean isReady, int header, int messageSize, @NotNull Wire inWire) throws IOException {
            // Must come before the lookup: nothing is registered under tid 0, so the
            // lookup would only time out and the system message would be dropped.
            if (tid == 0L) {
                this.processServerSystemMessage(header, messageSize);
                return;
            }
            long startTime = 0L;
            Object o = null;
            while (!this.isShutdown()) {
                o = isReady ? this.map.remove(tid) : this.map.get(tid);
                if (o != null) {
                    break;
                }
                // the reply can arrive before the requester has registered, so retry for a while
                final long now = System.currentTimeMillis();
                if (startTime == 0L) {
                    startTime = now;
                }
                if (now - startTime > 3000L) {
                    // NOTE(review): the body is left unread on this path — confirm whether
                    // the connection should be dropped here, as the message suggests.
                    LOG.error("unable to respond to tid=" + tid + ", given that we have received a " + " message we a tid which is unknown, something has become corrupted, " + "so the safest thing to do is to drop the connection to the server and " + "start again.");
                    return;
                }
            }
            if (o == null) {
                // the loop ended because of a shutdown, there is nobody to deliver to
                return;
            }
            if (o instanceof AsyncSubscription) {
                this.blockingRead((WireIn)inWire, messageSize);
                TcpChannelHub.logToStandardOutMessageReceived(inWire);
                this.onMessageReceived();
                ((AsyncSubscription)o).onConsumer((WireIn)inWire);
                return;
            }
            final Bytes bytes = (Bytes)o;
            synchronized (bytes) {
                bytes.clear();
                final ByteBuffer byteBuffer = (ByteBuffer)bytes.underlyingObject();
                byteBuffer.clear();
                // re-create the header in the reply buffer, then read the body right after it
                bytes.writeInt(0L, header);
                byteBuffer.position(4);
                byteBuffer.limit(4 + messageSize);
                this.readBuffer(byteBuffer);
                this.onMessageReceived();
                bytes.readLimit((long)byteBuffer.position());
                bytes.notifyAll();
            }
        }

        /**
         * Reads the body of a server system message into the dedicated
         * heartbeat buffer and, if it is a {@code heartbeat} event, reflects it
         * back to the server.
         *
         * @param header      the 4-byte header that has already been read
         * @param messageSize the length of the body that is still to be read
         * @throws IOException if reading from the socket fails
         */
        private void processServerSystemMessage(int header, int messageSize) throws IOException {
            final Bytes systemBytes = this.serverHeartBeatHandler;
            systemBytes.clear();
            final ByteBuffer target = (ByteBuffer)systemBytes.underlyingObject();
            target.clear();
            // re-create the header in the buffer, then read the body right after it
            systemBytes.writeInt(0L, header);
            target.position(4);
            target.limit(4 + messageSize);
            this.readBuffer(target);
            systemBytes.readLimit((long)target.position());
            final StringBuilder eventName = Wires.acquireStringBuilder();
            final Wire systemWire = WireType.wire.apply(systemBytes);
            systemWire.readDocument(null, d -> {
                final ValueIn valueIn = d.readEventName(eventName);
                if (SystemHandler.EventId.heartbeat.contentEquals(eventName)) {
                    TcpChannelHub.this.reflectServerHeartbeatMessage(valueIn);
                }
            });
        }

        /**
         * Reads exactly {@code numberOfBytes} from the socket and appends them to
         * {@code wire} at its current write position.
         *
         * @param wire          the wire to append to
         * @param numberOfBytes the number of bytes to read
         * @throws IOException if reading from the socket fails
         */
        private void blockingRead(@NotNull WireIn wire, long numberOfBytes) throws IOException {
            final Bytes bytes = wire.bytes();
            final long writeStart = bytes.writePosition();
            // The data lands at the write position, so that is what the capacity has to
            // cover; sizing from the read position leaves the buffer short whenever
            // something (such as the header) has already been written.
            bytes.ensureCapacity(writeStart + numberOfBytes);
            // fetch the buffer only after ensureCapacity(), which may have replaced it
            final ByteBuffer buffer = (ByteBuffer)bytes.underlyingObject();
            final int start = (int)writeStart;
            buffer.position(start);
            buffer.limit((int)((long)start + numberOfBytes));
            this.readBuffer(buffer);
            bytes.readLimit((long)buffer.position());
            this.onMessageReceived();
        }

        /**
         * Reads from the client channel until {@code buffer} has no space
         * remaining, reconnecting when there is no channel or the channel
         * reports end-of-stream.
         *
         * @param buffer the buffer to fill up to its limit
         * @throws IOException        if reading from the socket fails
         * @throws IORuntimeException if the consumer has been shut down, or the
         *                            thread is interrupted while reconnecting (the
         *                            interrupt flag is restored first)
         */
        private void readBuffer(@NotNull ByteBuffer buffer) throws IOException {
            while (buffer.remaining() > 0) {
                try {
                    // read the volatile field once, it can be set to null by another thread
                    final SocketChannel channel = TcpChannelHub.this.clientChannel;
                    if (channel == null || channel.read(buffer) == -1) {
                        TcpChannelHub.this.reConnect();
                    }
                    if (this.isShutdown) {
                        throw new IORuntimeException("The server was shutdown, " + TcpChannelHub.this.hostname + ":" + TcpChannelHub.this.port);
                    }
                } catch (InterruptedException e) {
                    // stop reading rather than swallowing the interrupt and looping on
                    Thread.currentThread().interrupt();
                    throw new IORuntimeException("Interrupted while reading from " + TcpChannelHub.this.hostname + ":" + TcpChannelHub.this.port, e);
                }
            }
        }

        /**
         * Records the current time as the moment the last message was received;
         * {@code action()} uses it to decide when to send a heartbeat or reconnect.
         */
        private void onMessageReceived() {
            final long now = System.currentTimeMillis();
            this.lastTimeMessageReceived = now;
        }

        /**
         * Sends a heartbeat to the server through a temporary subscription and
         * marks the consumer as awaiting the reply. When the reply arrives the
         * flag is cleared and the round trip time is logged at debug level.
         */
        private void sendHeartbeat() {
            this.awaitingHeartbeat.set(true);
            final long sentAtNanos = System.nanoTime();
            final AsyncSubscription heartbeat = new AbstractAsyncTemporarySubscription(TcpChannelHub.this, null){

                @Override
                public void onSubscribe(WireOut wireOut) {
                    wireOut.writeEventName((WireKey)SystemHandler.EventId.heartbeat).int64(System.currentTimeMillis());
                }

                @Override
                public void onConsumer(WireIn inWire) {
                    TcpSocketConsumer.this.awaitingHeartbeat.set(false);
                    final long roundTripMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sentAtNanos);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(MessageFormat.format("{0}:{1}heartbeat round trip time={2}us", TcpChannelHub.this.hostname, TcpChannelHub.this.port, roundTripMicros));
                    }
                    inWire.clear();
                }
            };
            this.subscribe(heartbeat);
        }

        /**
         * Marks the consumer as shut down and stops the reader thread. The
         * executor is asked to shut down gracefully and, if it has not terminated
         * within 10 milliseconds (or the wait is interrupted), it is shut down forcibly.
         */
        private void stop() {
            this.isShutdown = true;
            // work on a snapshot of the volatile field
            final ExecutorService service = this.executorService;
            if (service == null) {
                return;
            }
            service.shutdown();
            try {
                if (!service.awaitTermination(10L, TimeUnit.MILLISECONDS)) {
                    service.shutdownNow();
                }
            } catch (InterruptedException e) {
                // still make sure the reader is stopped, and keep the interrupt visible
                service.shutdownNow();
                Thread.currentThread().interrupt();
                LOG.error("", (Throwable)e);
            }
            this.executorService = null;
        }

        /**
         * Called by the event loop to monitor the connection.
         * Sends a heartbeat when nothing has been received for
         * {@code HEATBEAT_PING_PERIOD} milliseconds and no heartbeat is outstanding,
         * and reconnects when nothing has been received for more than
         * {@code HEATBEAT_TIMEOUT_PERIOD} milliseconds.
         *
         * @return always {@code true}
         * @throws InvalidEventHandlerException once the hub has been closed, which
         *                                      is how this handler signals that it is
         *                                      no longer valid
         */
        public boolean action() throws InvalidEventHandlerException {
            if (TcpChannelHub.this.closed) {
                throw new InvalidEventHandlerException();
            }
            final long millisecondsSinceLastMessageReceived = System.currentTimeMillis() - this.lastTimeMessageReceived;
            if (millisecondsSinceLastMessageReceived >= (long)HEATBEAT_PING_PERIOD && !this.awaitingHeartbeat.get()) {
                this.sendHeartbeat();
            }
            if (TcpChannelHub.this.closed) {
                throw new InvalidEventHandlerException();
            }
            if (millisecondsSinceLastMessageReceived > (long)HEATBEAT_TIMEOUT_PERIOD) {
                // report through the logger instead of printing to standard out
                LOG.warn("reconnecting to " + TcpChannelHub.this.hostname + ":" + TcpChannelHub.this.port
                        + ", millisecondsSinceLastMessageReceived=" + millisecondsSinceLastMessageReceived);
                try {
                    TcpChannelHub.this.reConnect();
                } catch (InterruptedException e) {
                    // keep the interrupt visible to the event loop
                    Thread.currentThread().interrupt();
                    return true;
                }
            }
            if (TcpChannelHub.this.closed) {
                throw new InvalidEventHandlerException();
            }
            return true;
        }
    }

    /**
     * A unit of work that {@code TcpChannelHub.lock(Task)} runs while holding
     * the out-bytes lock.
     */
    @FunctionalInterface
    public interface Task {
        /** Performs the work. */
        void run();
    }
}

