/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network.connection;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesUtil;
import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.util.Time;
import net.openhft.chronicle.engine.api.session.SessionProvider;
import net.openhft.chronicle.engine.api.tree.View;
import net.openhft.chronicle.engine.server.internal.SystemHandler;
import net.openhft.chronicle.network.TCPRegistry;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.AsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.YamlLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpChannelHub
implements View,
Closeable {
    // Heartbeat ping threshold in milliseconds, read from the "heartbeat.ping.period" system property
    // (default 5000). The consumer's action() sends a heartbeat once both the time since the last
    // received message and the time since the last heartbeat reach this value.
    public static final int HEATBEAT_PING_PERIOD = Integer.getInteger("heartbeat.ping.period", 5000);
    // Heartbeat timeout in milliseconds, read from the "heartbeat.timeout" system property (default 20000).
    // The consumer's action() closes the socket when no message has been received for longer than this.
    public static final int HEATBEAT_TIMEOUT_PERIOD = Integer.getInteger("heartbeat.timeout", 20000);
    // Not referenced in this file; presumably the byte length of the size header that prefixes each
    // message (the read loop reads 4 bytes first) - TODO confirm.
    public static final int SIZE_OF_SIZE = 4;
    private static final Logger LOG = LoggerFactory.getLogger(TcpChannelHub.class);
    // Set to 10000 by the constructor; bounds the wait in checkConnection() and is passed to the socket writes.
    public final long timeoutMs;
    // Caller-supplied name; used for the heartbeat subscription and in diagnostic messages.
    @NotNull
    protected final String name;
    // Address obtained from TCPRegistry.lookup(description) in the constructor.
    @NotNull
    protected final InetSocketAddress remoteAddress;
    // Set to 65536 by the constructor; applied as the socket send and receive buffer size on connect.
    protected final int tcpBufferSize;
    // Outbound wire handed out by outWire(); the write methods assert that outBytesLock is held.
    final Wire outWire;
    // Created in the constructor; not otherwise used in this file (the consumer allocates its own read wire).
    final Wire inWire;
    // Lock that the write-side methods assert is held by the calling thread.
    private final ReentrantLock outBytesLock = new ReentrantLock();
    // Last value handed out by nextUniqueTransaction().
    @NotNull
    private final AtomicLong transactionID = new AtomicLong(0L);
    // Source of the session details used for the user id sent during hand-shaking.
    private final SessionProvider sessionProvider;
    // Owns the reader thread, the tid -> subscriber map and the connect/reconnect logic.
    @NotNull
    private final TcpSocketConsumer tcpSocketConsumer;
    // Event loop on which the consumer registers itself as a handler after each successful connect.
    private final EventLoop eventLoop;
    // Factory that wraps a Bytes buffer in the wire format used by this hub.
    private final Function<Bytes, Wire> wire;
    // Updated by updateLargestChunkSoFarSize(); never read in this file.
    private long largestChunkSoFar = 0L;
    // The connected channel, or null while disconnected; written by attemptConnect() and closeSocket().
    @Nullable
    private volatile SocketChannel clientChannel;
    // Buffer limit recorded by the previous call to updateLargestChunkSoFarSize().
    private long limitOfLast = 0L;
    // Dedicated wire used only by doHandShaking().
    private final Wire handShakingWire;
    // Registry description passed to TCPRegistry.lookup(); also included in diagnostic messages.
    private final String description;

    /**
     * Describes this hub by its name, remote address and registry description.
     *
     * @return a single-line, human-readable summary of this hub
     */
    @Override
    public String toString() {
        // Fields are comma separated and the quoted description is properly terminated.
        return "TcpChannelHub{name=" + this.name
                + ", remoteAddress=" + this.remoteAddress
                + ", description='" + this.description + '\''
                + '}';
    }

    /**
     * Creates a hub for the endpoint registered under {@code description} and immediately starts the
     * background socket consumer (its constructor submits the read loop to its own executor).
     *
     * @param sessionProvider supplies the session details sent during hand-shaking
     * @param description     key resolved through {@link TCPRegistry#lookup} to the remote address
     * @param eventLoop       event loop the consumer registers with once connected
     * @param wire            factory wrapping a {@link Bytes} buffer in the wire format to use
     * @param name            name of this hub, used for heartbeats and diagnostics
     */
    public TcpChannelHub(@NotNull SessionProvider sessionProvider, @NotNull String description, @NotNull EventLoop eventLoop, @NotNull Function<Bytes, Wire> wire, @NotNull String name) {
        // plain configuration first
        this.sessionProvider = sessionProvider;
        this.description = description;
        this.eventLoop = eventLoop;
        this.wire = wire;
        this.name = name;
        this.tcpBufferSize = 65536;
        this.timeoutMs = 10000L;

        // then everything that is derived from it
        this.remoteAddress = TCPRegistry.lookup(description);
        this.outWire = wire.apply(Bytes.elasticByteBuffer());
        this.inWire = wire.apply(Bytes.elasticByteBuffer());
        this.handShakingWire = wire.apply(Bytes.elasticByteBuffer());

        // must be last: the consumer's thread reads the fields assigned above
        this.tcpSocketConsumer = new TcpSocketConsumer(wire);
    }

    /**
     * Opens a new, not yet connected socket channel with {@code TCP_NODELAY} enabled.
     *
     * @return the freshly opened channel
     * @throws IOException if the channel cannot be opened or configured
     */
    @Nullable
    static SocketChannel openSocketChannel() throws IOException {
        final SocketChannel channel = SocketChannel.open();
        channel.socket().setTcpNoDelay(true);
        return channel;
    }

    /**
     * Called by {@link #closeSocket()} after the channel has been released; tells the socket
     * consumer so that it can inform its subscribers and wake blocked readers.
     */
    private void onDisconnected() {
        final boolean debugging = Jvm.isDebug();
        if (debugging) {
            System.out.println(" disconnected to remoteAddress=" + remoteAddress);
        }
        tcpSocketConsumer.onConnectionClosed();
    }

    /**
     * Hook invoked at the end of a successful connect; currently does nothing.
     */
    private void onConnected() {
        // intentionally empty
    }

    /**
     * Dumps the readable content of {@code wire} to standard out when debugging and
     * {@code YamlLogging.clientReads} are both enabled. Text wires are rendered as size-prefixed
     * blobs, all other wires as a hex dump. The write position and write limit of the underlying
     * bytes are restored afterwards.
     *
     * @param wire the wire holding the message that was just received
     */
    static void logToStandardOutMessageReceived(@NotNull Wire wire) {
        final Bytes bytes = wire.bytes();
        if (!(Jvm.IS_DEBUG && YamlLogging.clientReads)) {
            return;
        }
        final long savedWritePosition = bytes.writePosition();
        final long savedWriteLimit = bytes.writeLimit();
        try {
            final String rendered;
            if (wire instanceof TextWire) {
                rendered = "```yaml\n" + Wires.fromSizePrefixedBlobs(bytes, bytes.readPosition(), bytes.readRemaining());
            } else {
                rendered = "```\n" + BytesUtil.toHexString(bytes, bytes.readPosition(), bytes.readRemaining());
            }
            System.out.println("\nreceives:\n" + rendered + "```\n");
            YamlLogging.title = "";
            YamlLogging.writeMessage = "";
        } catch (Exception e) {
            // logging must never break the read path; report the raw bytes instead
            LOG.error(Bytes.toString(bytes), e);
        } finally {
            bytes.writeLimit(savedWriteLimit);
            bytes.writePosition(savedWritePosition);
        }
    }

    /**
     * Registers an asynchronous subscription with the socket consumer.
     *
     * @param asyncSubscription the subscription to register
     */
    public void subscribe(@NotNull AsyncSubscription asyncSubscription) {
        tcpSocketConsumer.subscribe(asyncSubscription);
    }

    /**
     * Removes whatever is registered with the socket consumer under the given transaction id.
     *
     * @param tid the transaction id to forget
     */
    public void unsubscribe(long tid) {
        tcpSocketConsumer.unsubscribe(tid);
    }

    /**
     * Returns the lock that the write-side methods of this hub assert is held by the calling thread.
     *
     * @return the lock guarding the outbound wire
     */
    @NotNull
    public ReentrantLock outBytesLock() {
        return outBytesLock;
    }

    /**
     * Sends the hand-shaking document on a freshly connected channel. The document carries the
     * user id taken from the current session details, or the {@code user.name} system property
     * when there are no session details.
     *
     * @param socketChannel the connected channel to write to
     * @throws IOException if writing to the channel fails
     */
    private synchronized void doHandShaking(SocketChannel socketChannel) throws IOException {
        final SessionDetails session = sessionDetails();
        handShakingWire.clear();
        handShakingWire.bytes().clear();
        handShakingWire.writeDocument(false, out ->
                out.writeEventName((WireKey) SystemHandler.EventId.userid)
                        .text((CharSequence) (session == null ? System.getProperty("user.name") : session.userId())));
        writeSocket((WireOut) handShakingWire, timeoutMs, socketChannel);
    }

    /**
     * Fetches the current session details from the session provider.
     *
     * @return whatever the provider returns; {@link #doHandShaking} treats {@code null} as "no session"
     */
    private SessionDetails sessionDetails() {
        return sessionProvider.get();
    }

    /**
     * Closes the current channel, if any, on a best-effort basis: both directions are shut down,
     * then the socket and the channel are closed, ignoring I/O errors at every step. Afterwards
     * the channel reference is cleared and {@link #onDisconnected()} is invoked. Does nothing when
     * there is no channel.
     */
    protected synchronized void closeSocket() {
        final SocketChannel channel = this.clientChannel;
        if (channel == null) {
            return;
        }
        try {
            channel.socket().shutdownInput();
        } catch (IOException ignored) {
            // best effort
        }
        try {
            channel.socket().shutdownOutput();
        } catch (IOException ignored) {
            // best effort
        }
        try {
            channel.socket().close();
        } catch (IOException ignored) {
            // best effort
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // best effort
        }
        this.clientChannel = null;
        onDisconnected();
    }

    /**
     * Reports whether a channel is currently held.
     *
     * @return {@code true} when the channel reference is non-null
     */
    public boolean isOpen() {
        final SocketChannel channel = clientChannel;
        return channel != null;
    }

    /**
     * Stops the socket consumer and then waits, polling every 10 ms, until the channel reference
     * has been cleared (the consumer's read loop closes the socket when it terminates).
     * Progress is reported through the class logger rather than standard out.
     */
    public void close() {
        this.tcpSocketConsumer.stop();
        LOG.info("closing {}", this.remoteAddress);
        while (this.clientChannel != null) {
            Jvm.pause(10L);
            // emitted on every poll, hence debug level only
            LOG.debug("waiting for disconnect");
        }
    }

    /**
     * Issues a transaction id that is unique for this hub and strictly greater than every id
     * issued before. The id equals {@code time} when that is larger than the last id issued,
     * otherwise it is the last id plus one.
     *
     * <p>Comparing with {@code >=} (rather than {@code ==}) matters: with an equality test a
     * third call carrying the same {@code time} would move the counter backwards and hand out an
     * id that had already been issued.
     *
     * @param time the preferred id, normally the current time in milliseconds
     * @return the newly issued transaction id
     */
    public long nextUniqueTransaction(long time) {
        long id = time;
        while (true) {
            final long last = this.transactionID.get();
            if (last >= id) {
                id = last + 1L;
            }
            if (this.transactionID.compareAndSet(last, id)) {
                return id;
            }
            // lost a race with another thread: re-read the counter and try again
        }
    }

    /**
     * Writes the content of {@code wire} to the connected channel. The caller must hold
     * {@link #outBytesLock()}. On any failure the error is logged, the socket is closed and the
     * failure is rethrown.
     *
     * @param wire the wire whose bytes are to be sent
     * @throws IORuntimeException if there is currently no channel
     */
    public void writeSocket(@NotNull WireOut wire) {
        assert outBytesLock().isHeldByCurrentThread();
        final SocketChannel channel = this.clientChannel;
        if (channel == null) {
            throw new IORuntimeException("Not Connected " + remoteAddress);
        }
        try {
            writeSocket(wire, timeoutMs, channel);
        } catch (Exception e) {
            LOG.error("", e);
            closeSocket();
            throw Jvm.rethrow(e);
        }
    }

    /**
     * Blocks until the reply for {@code tid} arrives or the timeout elapses.
     * {@link AssertionError} and {@link IORuntimeException} are passed through untouched; any
     * other exception is logged and closes the socket before being rethrown.
     *
     * @param timeoutTime maximum time to wait, in milliseconds
     * @param tid         the transaction id whose reply is awaited
     * @return the wire containing the reply
     */
    public Wire proxyReply(long timeoutTime, long tid) {
        try {
            return tcpSocketConsumer.syncBlockingReadSocket(timeoutTime, tid);
        } catch (AssertionError | IORuntimeException passThrough) {
            throw passThrough;
        } catch (RuntimeException unchecked) {
            LOG.error("", unchecked);
            closeSocket();
            throw unchecked;
        } catch (Exception checked) {
            LOG.error("", checked);
            closeSocket();
            throw Jvm.rethrow(checked);
        }
    }

    /**
     * Writes everything up to the wire's write position to {@code socketChannel}, looping until
     * the buffer is drained, and then clears both the buffer and the wire's bytes.
     *
     * @param outWire       the wire to send; its bytes must be backed by a {@link ByteBuffer}
     * @param timeoutTime   not used by this implementation
     * @param socketChannel the channel to write to
     * @throws IOException        if the channel write fails
     * @throws IORuntimeException if the channel reports {@code -1} bytes written
     */
    private void writeSocket(@NotNull WireOut outWire, long timeoutTime, @NotNull SocketChannel socketChannel) throws IOException {
        final Bytes bytes = outWire.bytes();
        final ByteBuffer buffer = (ByteBuffer) bytes.underlyingObject();
        // expose exactly the bytes written so far: [0, writePosition)
        buffer.limit((int) bytes.writePosition());
        buffer.position(0);
        if (Jvm.IS_DEBUG) {
            logToStandardOutMessageSent(outWire, buffer);
        }
        updateLargestChunkSoFarSize(buffer);
        while (buffer.hasRemaining()) {
            final int written = socketChannel.write(buffer);
            if (written == -1) {
                throw new IORuntimeException("Disconnection to server " + this.description + "/" + TCPRegistry.lookup(this.description) + ",name=" + this.name);
            }
        }
        buffer.clear();
        bytes.clear();
    }

    /**
     * Dumps the message about to be sent to standard out when {@code YamlLogging.clientWrites} is
     * enabled, preceded by the optional YamlLogging title and message, which are reset afterwards.
     * Text wires are rendered as size-prefixed blobs, all other wires as a hex dump.
     *
     * <p>The hex dump starts at the read position, as the receive-side logger does; the previous
     * code passed the remaining length as the offset and so dumped the wrong region.
     *
     * @param wire      the wire holding the outgoing message
     * @param outBuffer the buffer that will be written to the socket (not used here)
     */
    private void logToStandardOutMessageSent(@NotNull WireOut wire, @NotNull ByteBuffer outBuffer) {
        if (!YamlLogging.clientWrites) {
            return;
        }
        Bytes bytes = wire.bytes();
        try {
            final String heading = YamlLogging.title.isEmpty() ? "" : "### " + YamlLogging.title + "\n";
            final String separator = YamlLogging.writeMessage.isEmpty() ? "" : "\n\n";
            final String payload = wire instanceof TextWire
                    ? Wires.fromSizePrefixedBlobs((Bytes) bytes, (long) bytes.readPosition(), (long) bytes.readRemaining())
                    : BytesUtil.toHexString((Bytes) bytes, (long) bytes.readPosition(), (long) bytes.readRemaining());
            System.out.println(heading + YamlLogging.writeMessage + separator + "sends:\n\n" + "```yaml\n" + payload + "```");
            YamlLogging.title = "";
            YamlLogging.writeMessage = "";
        } catch (Exception e) {
            // logging must never break the write path; report the raw bytes instead
            LOG.error(Bytes.toString((Bytes) bytes), (Throwable) e);
        }
    }

    /**
     * Records the difference between this buffer's limit and the limit seen on the previous call
     * as a chunk size, keeps the largest such size seen so far, and remembers the current limit.
     *
     * @param outBuffer the buffer about to be written
     */
    private void updateLargestChunkSoFarSize(@NotNull ByteBuffer outBuffer) {
        final int currentLimit = outBuffer.limit();
        // subtraction is done in long arithmetic and then narrowed, as before
        final int chunkSize = (int) ((long) currentLimit - limitOfLast);
        largestChunkSoFar = Math.max(largestChunkSoFar, (long) chunkSize);
        limitOfLast = currentLimit;
    }

    /**
     * Returns the outbound wire. The caller must hold {@link #outBytesLock()}.
     *
     * @return the wire used to build outgoing messages
     */
    public Wire outWire() {
        assert outBytesLock().isHeldByCurrentThread();
        return outWire;
    }

    /**
     * Answers a server heartbeat: reads the timestamp carried by the heartbeat and, under the
     * output lock, sends it back as a {@code heartbeatReply} preceded by meta data with tid 0.
     *
     * @param valueIn positioned on the heartbeat's 64-bit timestamp
     */
    private void reflectServerHeartbeatMessage(ValueIn valueIn) {
        final long serverTimestamp = valueIn.int64();
        final Task reply = () -> {
            writeMetaDataForKnownTID(0L, outWire, null, 0L);
            outWire.writeDocument(false, out -> out.writeEventName((WireKey) SystemHandler.EventId.heartbeatReply).int64(serverTimestamp));
            writeSocket((WireOut) outWire);
        };
        lock(reply);
    }

    /**
     * Allocates a new transaction id based on {@code startTime} and writes the meta data document
     * for it. The caller must hold {@link #outBytesLock()}.
     *
     * @param startTime preferred transaction id, passed to {@link #nextUniqueTransaction(long)}
     * @param wire      the wire to write the meta data to
     * @param csp       written when {@code cid} is zero
     * @param cid       written instead of {@code csp} when non-zero
     * @return the transaction id that was written
     */
    public long writeMetaDataStartTime(long startTime, @NotNull Wire wire, String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        final long transaction = nextUniqueTransaction(startTime);
        writeMetaDataForKnownTID(transaction, wire, csp, cid);
        return transaction;
    }

    /**
     * Writes a meta data document holding either the {@code cid} (when non-zero) or the
     * {@code csp}, followed by the given transaction id. The caller must hold
     * {@link #outBytesLock()}.
     *
     * @param tid  the transaction id to write
     * @param wire the wire to write to
     * @param csp  written when {@code cid} is zero; may be {@code null}
     * @param cid  written instead of {@code csp} when non-zero
     */
    public void writeMetaDataForKnownTID(long tid, @NotNull Wire wire, @Nullable String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        wire.writeDocument(true, out -> {
            if (cid != 0L) {
                out.writeEventName((WireKey) CoreFields.cid).int64(cid);
            } else {
                out.writeEventName((WireKey) CoreFields.csp).text((CharSequence) csp);
            }
            out.writeEventName((WireKey) CoreFields.tid).int64(tid);
        });
    }

    /**
     * Writes a meta data document holding either the {@code cid} (when non-zero) or the
     * {@code csp}, without a transaction id. The caller must hold {@link #outBytesLock()}.
     *
     * @param wire the wire to write to
     * @param csp  written when {@code cid} is zero
     * @param cid  written instead of {@code csp} when non-zero
     */
    public void writeAsyncHeader(@NotNull Wire wire, String csp, long cid) {
        assert outBytesLock().isHeldByCurrentThread();
        wire.writeDocument(true, out -> {
            if (cid != 0L) {
                out.writeEventName((WireKey) CoreFields.cid).int64(cid);
            } else {
                out.writeEventName((WireKey) CoreFields.csp).text((CharSequence) csp);
            }
        });
    }

    /**
     * Runs {@code r} while holding the output lock. Nothing is run when there is no channel, and
     * any exception thrown by the task is logged at debug level and not propagated.
     *
     * @param r the work to perform under the lock
     */
    public void lock(@NotNull Task r) {
        if (clientChannel == null) {
            return;
        }
        final ReentrantLock guard = outBytesLock();
        guard.lock();
        try {
            r.run();
        } catch (Exception e) {
            LOG.debug("", e);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Waits, polling every 100 ms, until a channel is available.
     *
     * @throws IORuntimeException if the consumer has been shut down, or if no channel becomes
     *                            available within {@code timeoutMs} milliseconds
     */
    public void checkConnection() {
        final long deadline = Time.currentTimeMillis() + timeoutMs;
        while (clientChannel == null) {
            if (tcpSocketConsumer.isShutdown()) {
                throw new IORuntimeException("Shutdown connection to" + remoteAddress);
            }
            if (deadline <= Time.currentTimeMillis()) {
                throw new IORuntimeException("Not connected to " + remoteAddress);
            }
            Jvm.pause(100L);
        }
    }

    private class TcpSocketConsumer
    implements EventHandler {
        // Single-thread executor created by start(); it runs the running() read loop.
        private final ExecutorService executorService;
        // Transaction id -> receiver: either an AsyncSubscription or the Bytes buffer of a thread
        // blocked in syncBlockingReadSocket().
        @NotNull
        private final Map<Long, Object> map = new ConcurrentHashMap<Long, Object>();
        // Set by stop(), and by isShutdown() when it finds the calling thread interrupted.
        private volatile boolean isShutdown;
        // Factory used by running() to create the wire the read loop reads into.
        private Function<Bytes, Wire> wireFunction;
        // Transaction id taken from the most recently read meta data document; the data message that
        // follows is dispatched under this id.
        private long tid;
        // Per-thread wire that receives the reply for a blocking read.
        @NotNull
        private ThreadLocal<Wire> syncInWireThreadLocal = ThreadLocal.withInitial(() -> (Wire)TcpChannelHub.this.wire.apply(Bytes.elasticByteBuffer()));
        // Scratch buffer into which server system (tid 0) messages are read.
        private Bytes serverHeartBeatHandler = Bytes.elasticByteBuffer();
        // Millisecond timestamp refreshed by onMessageReceived(); read by action() for heartbeat decisions.
        private volatile long lastTimeMessageReceived = Time.currentTimeMillis();
        // Millisecond timestamp of the last heartbeat sent by action().
        long lastheartbeatSentTime = 0L;

        /**
         * Re-applies, under the output lock, every registered subscription that is an
         * {@link AsyncSubscription} but not an {@link AsyncTemporarySubscription}.
         */
        private void reconnect() {
            final ReentrantLock guard = TcpChannelHub.this.outBytesLock();
            guard.lock();
            try {
                for (final Object registered : this.map.values()) {
                    final boolean permanent = registered instanceof AsyncSubscription
                            && !(registered instanceof AsyncTemporarySubscription);
                    if (permanent) {
                        ((AsyncSubscription) registered).applySubscribe();
                    }
                }
            } finally {
                guard.unlock();
            }
        }

        /**
         * Informs every registered receiver that the connection has gone: subscriptions get
         * {@code onClose()}, and threads waiting on a reply buffer are woken via {@code notifyAll()}.
         */
        public void onConnectionClosed() {
            for (final Object registered : this.map.values()) {
                if (registered instanceof AsyncSubscription) {
                    ((AsyncSubscription) registered).onClose();
                    continue;
                }
                if (registered instanceof Bytes) {
                    synchronized (registered) {
                        registered.notifyAll();
                    }
                }
            }
        }

        private TcpSocketConsumer(Function<Bytes, Wire> wireFunction) {
            this.wireFunction = wireFunction;
            if (LOG.isDebugEnabled()) {
                LOG.debug("constructor remoteAddress=" + TcpChannelHub.this.remoteAddress);
            }
            this.executorService = this.start();
        }

        /**
         * @return {@link HandlerPriority#MONITOR}, the priority under which this handler is scheduled
         */
        public HandlerPriority priority() {
            final HandlerPriority monitor = HandlerPriority.MONITOR;
            return monitor;
        }

        /**
         * Registers the calling thread's reply buffer under {@code tid} and waits on it for at most
         * {@code timeoutTimeMs} milliseconds. The read loop fills the buffer and notifies it when
         * the reply arrives.
         *
         * @param timeoutTimeMs maximum time to wait, in milliseconds
         * @param tid           the transaction id whose reply is awaited
         * @return the calling thread's wire, holding the reply
         * @throws InterruptedException if the thread is interrupted while waiting
         * @throws TimeoutException     if at least {@code timeoutTimeMs} elapsed before the wait ended
         */
        private Wire syncBlockingReadSocket(long timeoutTimeMs, long tid) throws InterruptedException, TimeoutException {
            final long startedAt = Time.currentTimeMillis();
            final Wire replyWire = this.syncInWireThreadLocal.get();
            replyWire.clear();
            final Bytes replyBytes = replyWire.bytes();
            ((ByteBuffer) replyBytes.underlyingObject()).clear();
            // register while holding the monitor so the reader cannot notify before we wait
            synchronized (replyBytes) {
                this.map.put(tid, replyBytes);
                replyBytes.wait(timeoutTimeMs);
            }
            TcpChannelHub.logToStandardOutMessageReceived(replyWire);
            final long elapsed = Time.currentTimeMillis() - startedAt;
            if (elapsed >= timeoutTimeMs) {
                throw new TimeoutException("timeoutTimeMs=" + timeoutTimeMs);
            }
            return replyWire;
        }

        /**
         * Registers {@code asyncSubscription} under its transaction id. While disconnected the
         * subscription is only stored. When connected it is stored and applied immediately under the
         * output lock; a failure to apply is logged and not propagated.
         *
         * <p>Diagnostics go through the class logger rather than standard out / standard error.
         *
         * @param asyncSubscription the subscription to register
         */
        void subscribe(@NotNull AsyncSubscription asyncSubscription) {
            synchronized (this) {
                if (TcpChannelHub.this.clientChannel == null) {
                    this.map.put(asyncSubscription.tid(), asyncSubscription);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("deferred subscription tid=" + asyncSubscription.tid() + ",asyncSubscription=" + asyncSubscription);
                    }
                    return;
                }
            }
            ReentrantLock reentrantLock = TcpChannelHub.this.outBytesLock();
            reentrantLock.lock();
            try {
                this.map.put(asyncSubscription.tid(), asyncSubscription);
                asyncSubscription.applySubscribe();
            } catch (Exception e) {
                // keep the best-effort contract, but do not lose the stack trace
                LOG.error("failed to apply subscription", e);
            } finally {
                reentrantLock.unlock();
            }
        }

        /**
         * Forgets the receiver registered under {@code tid}, if any.
         *
         * @param tid the transaction id to remove
         */
        public void unsubscribe(long tid) {
            map.remove(tid);
        }

        /**
         * Creates a single-thread executor named after the remote host and port and submits the
         * read loop to it. An {@link IORuntimeException} escaping the loop is logged at debug level;
         * anything else is logged as an error unless the consumer is shutting down.
         *
         * @return the executor running the read loop
         * @throws IllegalStateException if the consumer has already been stopped
         */
        private ExecutorService start() {
            checkNotShutdown();
            final InetSocketAddress remote = TcpChannelHub.this.remoteAddress;
            final String threadName = "TcpChannelHub-" + remote.getHostName() + ":" + remote.getPort();
            final ExecutorService executor = Executors.newSingleThreadExecutor((ThreadFactory) new NamedThreadFactory(threadName, Boolean.valueOf(true)));
            this.isShutdown = false;
            executor.submit(() -> {
                try {
                    running();
                } catch (IORuntimeException e) {
                    LOG.debug("", e);
                } catch (Throwable e) {
                    if (!isShutdown()) {
                        LOG.error("", e);
                    }
                }
            });
            return executor;
        }

        /**
         * Guards operations that are not allowed after {@code stop()}.
         *
         * @throws IllegalStateException if the shutdown flag is set
         */
        private void checkNotShutdown() {
            if (!isShutdown) {
                return;
            }
            throw new IllegalStateException("you can not call this method once stop() has been called.");
        }

        /**
         * The read loop. Until shutdown it (re)connects when there is no channel, reads a 4 byte
         * header, and then either dispatches a data message to the receiver registered for the
         * current tid or reads a meta data document and takes the tid from it. An
         * {@link IOException} closes the socket so that the next iteration reconnects, or ends the
         * loop when shutting down. The socket is always closed when the loop exits.
         *
         * <p>Failures are reported through the class logger (with stack trace) rather than
         * {@code printStackTrace()} / standard out.
         */
        private void running() {
            try {
                Wire inWire = this.wireFunction.apply(Bytes.elasticByteBuffer());
                assert (inWire != null);
                while (!this.isShutdown()) {
                    if (TcpChannelHub.this.clientChannel == null) {
                        this.checkConnectionState();
                    }
                    try {
                        Bytes bytes = inWire.bytes();
                        // every message starts with a 4 byte header carrying its kind and length
                        this.blockingRead((WireIn)inWire, 4L);
                        int header = bytes.readVolatileInt(0L);
                        long messageSize = this.size(header);
                        if (Wires.isData((long)header)) {
                            assert (messageSize < Integer.MAX_VALUE);
                            this.processData(this.tid, Wires.isReady((long)header), header, (int)messageSize, inWire);
                            continue;
                        }
                        // meta data: remember the tid for the data message that follows
                        this.blockingRead((WireIn)inWire, messageSize);
                        TcpChannelHub.logToStandardOutMessageReceived(inWire);
                        inWire.readDocument(w -> {
                            this.tid = CoreFields.tid(w);
                        }, null);
                    }
                    catch (IOException e) {
                        if (!this.isShutdown()) {
                            LOG.warn("reconnecting due to unexpected exception", (Throwable)e);
                            TcpChannelHub.this.closeSocket();
                            continue;
                        }
                        break;
                    }
                    finally {
                        this.clear(inWire);
                    }
                }
            }
            catch (Exception e) {
                if (!this.isShutdown()) {
                    LOG.error("socket consumer terminated unexpectedly", (Throwable)e);
                }
            }
            finally {
                LOG.debug("Shutting down....");
                TcpChannelHub.this.closeSocket();
            }
        }

        /**
         * Reports whether the consumer is shutting down. As a side effect, an interrupted calling
         * thread sets the shutdown flag.
         *
         * @return the (possibly just updated) shutdown flag
         */
        private boolean isShutdown() {
            if (Thread.currentThread().isInterrupted()) {
                // treat an interrupt as a request to shut down
                this.isShutdown = true;
            }
            return this.isShutdown;
        }

        /**
         * Resets the wire and the {@link ByteBuffer} backing its bytes, ready for the next message.
         *
         * @param inWire the wire to reset
         */
        private void clear(@NotNull Wire inWire) {
            inWire.clear();
            final ByteBuffer backing = (ByteBuffer) inWire.bytes().underlyingObject();
            backing.clear();
        }

        /**
         * Extracts the message length from a header and, when assertions are enabled, checks that
         * it is positive and below 0x40000000.
         *
         * @param header the 4 byte message header
         * @return the message length in bytes
         */
        private long size(int header) {
            final long length = Wires.lengthOf((long) header);
            assert length > 0L : "Invalid message size " + length;
            assert length < 0x40000000L : "Invalid message size " + length;
            return length;
        }

        /**
         * Dispatches one data message whose header has already been read.
         *
         * <ul>
         * <li>tid 0 is a server system message and is handled first, without consulting the
         * receiver map (nothing is registered under 0, so looking it up would only stall).</li>
         * <li>Otherwise the receiver registered for {@code tid} is looked up, retrying for up to
         * three seconds in case the reply overtook the registration. If it is still unknown the
         * message cannot be routed and its payload is still unread on the stream, so the socket is
         * closed and the read loop reconnects.</li>
         * <li>A subscription receives the message via {@code onConsumer}; a blocked synchronous
         * reader has the message copied into its buffer and is notified.</li>
         * </ul>
         *
         * @param tid         transaction id taken from the preceding meta data
         * @param isReady     whether the header marks the message as ready; when set, a receiver that
         *                    is not a permanent subscription is removed from the map
         * @param header      the raw 4 byte header
         * @param messageSize payload length in bytes
         * @param inWire      the read loop's wire, used for subscription delivery
         * @throws IOException if reading the payload from the channel fails
         */
        private void processData(long tid, boolean isReady, int header, int messageSize, @NotNull Wire inWire) throws IOException {
            if (tid == 0L) {
                this.processServerSystemMessage(header, messageSize);
                return;
            }
            long startTime = 0L;
            Object o = null;
            while (!this.isShutdown()) {
                o = this.map.get(tid);
                if (isReady && (!(o instanceof AsyncSubscription) || o instanceof AsyncTemporarySubscription)) {
                    this.map.remove(tid);
                }
                if (o != null) break;
                if (startTime == 0L) {
                    startTime = Time.currentTimeMillis();
                }
                if (Time.currentTimeMillis() - startTime <= 3000L) continue;
                LOG.error("unable to respond to tid=" + tid + ", given that we have received a " + " message we a tid which is unknown, something has become corrupted, " + "so the safest thing to do is to drop the connection to the server and " + "start again.");
                // actually drop the connection: the unread payload would otherwise be parsed as a header
                TcpChannelHub.this.closeSocket();
                return;
            }
            if (o == null) {
                // the loop ended because of shutdown before a receiver was found
                return;
            }
            if (o instanceof AsyncSubscription) {
                this.blockingRead((WireIn)inWire, messageSize);
                TcpChannelHub.logToStandardOutMessageReceived(inWire);
                this.onMessageReceived();
                ((AsyncSubscription)o).onConsumer((WireIn)inWire);
            } else {
                Bytes bytes = (Bytes)o;
                synchronized (bytes) {
                    bytes.clear();
                    bytes.ensureCapacity((long)(4 + messageSize));
                    ByteBuffer byteBuffer = (ByteBuffer)bytes.underlyingObject();
                    byteBuffer.clear();
                    // put the header back in front so the buffer can be read as a document
                    bytes.writeInt(0L, header);
                    byteBuffer.position(4);
                    byteBuffer.limit(4 + messageSize);
                    this.readBuffer(byteBuffer);
                    this.onMessageReceived();
                    bytes.readLimit((long)byteBuffer.position());
                    bytes.notifyAll();
                }
            }
        }

        /**
         * Reads a server system message into the dedicated heartbeat buffer and, if its event name
         * is {@code heartbeat}, reflects it back to the server.
         *
         * @param header      the raw 4 byte header, written back in front of the payload
         * @param messageSize payload length in bytes
         * @throws IOException if reading the payload from the channel fails
         */
        private void processServerSystemMessage(int header, int messageSize) throws IOException {
            final Bytes systemBytes = this.serverHeartBeatHandler;
            systemBytes.clear();
            final ByteBuffer backing = (ByteBuffer) systemBytes.underlyingObject();
            backing.clear();
            // header first, payload after it, so the buffer can be read as a document
            systemBytes.writeInt(0L, header);
            backing.position(4);
            backing.limit(4 + messageSize);
            readBuffer(backing);
            systemBytes.readLimit((long) backing.position());
            final StringBuilder eventName = Wires.acquireStringBuilder();
            final Wire document = (Wire) TcpChannelHub.this.wire.apply(systemBytes);
            document.readDocument(null, data -> {
                final ValueIn value = data.readEventName(eventName);
                if (SystemHandler.EventId.heartbeat.contentEquals(eventName)) {
                    TcpChannelHub.this.reflectServerHeartbeatMessage(value);
                }
            });
        }

        /**
         * Reads exactly {@code numberOfBytes} from the channel, appending them at the wire's
         * current write position, then extends the read limit to cover them and records that a
         * message was received.
         *
         * @param wire          the wire to append to; its bytes must be backed by a {@link ByteBuffer}
         * @param numberOfBytes how many bytes to read
         * @throws IOException if the channel is closed or the read fails
         */
        private void blockingRead(@NotNull WireIn wire, long numberOfBytes) throws IOException {
            final Bytes target = wire.bytes();
            target.ensureCapacity(target.writePosition() + numberOfBytes);
            // fetch the backing buffer only after the capacity has been ensured
            final ByteBuffer backing = (ByteBuffer) target.underlyingObject();
            final int from = (int) target.writePosition();
            final int to = (int) ((long) from + numberOfBytes);
            backing.position(from);
            backing.limit(to);
            readBuffer(backing);
            target.readLimit((long) backing.position());
            onMessageReceived();
        }

        /**
         * Fills {@code buffer} from the current channel, looping until no space remains.
         *
         * @param buffer the buffer to fill up to its limit
         * @throws IOException if there is no channel, the channel reaches end-of-stream, or the
         *                     consumer is shut down while reading
         */
        private void readBuffer(@NotNull ByteBuffer buffer) throws IOException {
            while (buffer.hasRemaining()) {
                final SocketChannel channel = TcpChannelHub.this.clientChannel;
                if (channel == null) {
                    throw new IOException("Disconnection to server channel is closed" + describeEndpoint());
                }
                final int read = channel.read(buffer);
                if (read == -1) {
                    throw new IOException("Disconnection to server read=-1 " + describeEndpoint());
                }
                if (this.isShutdown) {
                    throw new IOException("The server was shutdown, " + describeEndpoint());
                }
            }
        }

        /** Builds the "description/address ,name=..." suffix shared by the read failure messages. */
        private String describeEndpoint() {
            return TcpChannelHub.this.description + "/" + TCPRegistry.lookup(TcpChannelHub.this.description) + " ,name=" + TcpChannelHub.this.name;
        }

        /**
         * Records the current time as the moment the last message was received; the heartbeat
         * logic in {@code action()} measures silence from this timestamp.
         */
        private void onMessageReceived() {
            final long now = Time.currentTimeMillis();
            this.lastTimeMessageReceived = now;
        }

        /**
         * Sends a heartbeat to the server as a temporary subscription and, when the reply arrives,
         * logs the round trip time at debug level.
         *
         * <p>The log statement uses SLF4J placeholders; the previous {@code String.format} call was
         * given MessageFormat-style {@code {0}} markers, so none of the values were ever substituted.
         */
        private void sendHeartbeat() {
            final long sentAtNanos = System.nanoTime();
            this.subscribe(new AbstractAsyncTemporarySubscription(TcpChannelHub.this, null, TcpChannelHub.this.name){

                @Override
                public void onSubscribe(WireOut wireOut) {
                    wireOut.writeEventName((WireKey)SystemHandler.EventId.heartbeat).int64(Time.currentTimeMillis());
                }

                @Override
                public void onConsumer(@NotNull WireIn inWire) {
                    long roundTripTimeMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sentAtNanos);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{}:{} heartbeat round trip time={}us, name={}",
                                TcpChannelHub.this.description,
                                TCPRegistry.lookup((String)TcpChannelHub.this.description),
                                roundTripTimeMicros,
                                TcpChannelHub.this.name);
                    }
                    inWire.clear();
                }
            });
        }

        /**
         * Marks the consumer as shut down and stops its executor: an orderly shutdown is requested
         * first and, if the read loop has not ended within 500 ms, its thread is interrupted.
         *
         * <p>If the calling thread is interrupted while waiting, the executor is shut down
         * immediately and the caller's interrupt status is restored so that the interruption is
         * not lost.
         */
        private void stop() {
            this.isShutdown = true;
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                    this.executorService.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                this.executorService.shutdownNow();
                LOG.error("", (Throwable)e);
                // preserve the interrupt for the caller
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Event loop callback driving the heartbeat. Sends a heartbeat when both the time since the
         * last received message and the time since the last heartbeat have reached
         * {@code HEATBEAT_PING_PERIOD}; closes the socket when nothing has been received for longer
         * than {@code HEATBEAT_TIMEOUT_PERIOD}.
         *
         * @return always {@code true}
         * @throws InvalidEventHandlerException when there is no channel, or after the socket has
         *                                      been closed because of a heartbeat timeout
         */
        public boolean action() throws InvalidEventHandlerException {
            if (TcpChannelHub.this.clientChannel == null) {
                throw new InvalidEventHandlerException();
            }
            final long now = Time.currentTimeMillis();
            final long silentFor = now - this.lastTimeMessageReceived;
            final long sinceLastPing = now - this.lastheartbeatSentTime;
            final boolean pingDue = silentFor >= (long) HEATBEAT_PING_PERIOD
                    && sinceLastPing >= (long) HEATBEAT_PING_PERIOD;
            if (pingDue) {
                this.lastheartbeatSentTime = Time.currentTimeMillis();
                sendHeartbeat();
            }
            final long overdueBy = silentFor - (long) HEATBEAT_TIMEOUT_PERIOD;
            if (overdueBy > 0L) {
                LOG.warn("reconnecting due to heartbeat failure");
                TcpChannelHub.this.closeSocket();
                throw new InvalidEventHandlerException();
            }
            return true;
        }

        /**
         * Connects if there is currently no channel; otherwise does nothing.
         *
         * @throws IOException if the consumer is shut down while connecting
         */
        private void checkConnectionState() throws IOException {
            if (TcpChannelHub.this.clientChannel == null) {
                attemptConnect();
            }
        }

        /**
         * Connects to the remote address, retrying until it succeeds or the consumer is shut down.
         * After the TCP connect the socket is configured, the hand-shake is sent, the channel is
         * published, this handler is added to the event loop and the permanent subscriptions are
         * re-applied.
         *
         * <p>Every channel that does not end up as the published channel is closed: previously a
         * new channel was opened on each retry, and after a failed hand-shake, without the old one
         * ever being closed. Failures are logged with their stack trace instead of being printed to
         * standard error.
         *
         * @throws IOException if the consumer is shut down before a connection is established
         */
        private void attemptConnect() throws IOException {
            while (true) {
                if (this.isShutdown()) {
                    throw new IOException("shutdown..");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("attemptConnect remoteAddress=" + TcpChannelHub.this.remoteAddress);
                }
                SocketChannel socketChannel = null;
                try {
                    socketChannel = this.openConnectedChannel();
                    if (socketChannel == null) {
                        // shut down while connecting; the check at the top of the loop reports it
                        continue;
                    }
                    socketChannel.socket().setTcpNoDelay(true);
                    socketChannel.socket().setReceiveBufferSize(TcpChannelHub.this.tcpBufferSize);
                    socketChannel.socket().setSendBufferSize(TcpChannelHub.this.tcpBufferSize);
                    // restart the heartbeat clock for the new connection
                    this.onMessageReceived();
                    TcpChannelHub.this.doHandShaking(socketChannel);
                    synchronized (this) {
                        TcpChannelHub.this.clientChannel = socketChannel;
                    }
                    TcpChannelHub.this.eventLoop.addHandler((EventHandler)this);
                    if (LOG.isInfoEnabled()) {
                        LOG.info("successfully connected to remoteAddress=" + TcpChannelHub.this.remoteAddress);
                    }
                    this.reconnect();
                    TcpChannelHub.this.onConnected();
                    return;
                }
                catch (Exception e) {
                    LOG.error("failed to connect remoteAddress=" + TcpChannelHub.this.remoteAddress + " so will reconnect", (Throwable)e);
                    // closes the channel if it was already published ...
                    TcpChannelHub.this.closeSocket();
                    // ... and this covers the case where it was not (closing twice is harmless)
                    this.closeQuietly(socketChannel);
                }
            }
        }

        /**
         * Opens channels and tries to connect them, pausing one second after each refusal, until
         * one connects or the consumer is shut down. Channels that fail to connect are closed.
         *
         * @return a connected channel, or {@code null} if the consumer was shut down first
         * @throws IOException if opening or connecting fails for a reason other than a refusal
         */
        @Nullable
        private SocketChannel openConnectedChannel() throws IOException {
            while (!this.isShutdown()) {
                final SocketChannel candidate = TcpChannelHub.openSocketChannel();
                boolean connected = false;
                try {
                    connected = candidate != null && candidate.connect(TcpChannelHub.this.remoteAddress);
                }
                catch (ConnectException refused) {
                    // handled below together with connect() returning false
                }
                finally {
                    if (!connected) {
                        this.closeQuietly(candidate);
                    }
                }
                if (connected) {
                    return candidate;
                }
                LOG.error("Connection refused to remoteAddress=" + TcpChannelHub.this.remoteAddress);
                Jvm.pause(1000L);
            }
            return null;
        }

        /** Closes {@code channel} if it is non-null, ignoring any I/O error. */
        private void closeQuietly(@Nullable SocketChannel channel) {
            if (channel == null) {
                return;
            }
            try {
                channel.close();
            }
            catch (IOException ignored) {
                // best effort: the channel is being discarded anyway
            }
        }
    }

    /**
     * A unit of work executed by {@code lock(Task)} while the output lock is held.
     */
    public interface Task {
        /** Performs the work. */
        void run();
    }
}

