/*
 * Decompiled with CFR 0.152.
 */
package net.sf.eBus.client;

import java.io.IOException;
import java.net.ProtocolException;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.eBus.client.EAbstractConnection;
import net.sf.eBus.client.ERemoteApp;
import net.sf.eBus.config.EConfigure;
import net.sf.eBus.messages.EMessage;
import net.sf.eBus.messages.EMessageHeader;
import net.sf.eBus.messages.InvalidMessageException;
import net.sf.eBus.messages.UnknownMessageException;
import net.sf.eBus.messages.type.DataType;
import net.sf.eBus.net.AbstractAsyncSocket;
import net.sf.eBus.net.AsyncChannel;
import net.sf.eBus.net.AsyncSecureSocket;
import net.sf.eBus.net.AsyncSocket;
import net.sf.eBus.net.BufferWriter;
import net.sf.eBus.net.SocketListener;
import net.sf.eBus.util.HexDump;
import net.sf.eBus.util.TimerTask;

final class ETCPConnection
extends EAbstractConnection
implements SocketListener {
    /** Bind port value zero. NOTE(review): presumably means "let the OS choose the local port" — confirm. */
    public static final int ANY_PORT = 0;
    /** Default reconnect delay (60,000; treated as milliseconds by the reconnect log messages). */
    public static final long DEFAULT_RECONNECT_DELAY = 60000L;
    /** Default heartbeat delay; zero or less means the heartbeat timer is never started. */
    public static final long DEFAULT_HEARTBEAT_DELAY = 0L;
    /** Default delay to wait for a heartbeat reply (60,000 milliseconds). */
    public static final long DEFAULT_HEARTBEAT_REPLY_DELAY = 60000L;
    /** Logger shared by all TCP connections. */
    private static final Logger sLogger = Logger.getLogger(ETCPConnection.class.getName());
    /** When {@code true}, a lost connection schedules a reconnect attempt. */
    private volatile boolean mReconnectFlag = false;
    /** Delay used when scheduling the reconnect timer. */
    private final AtomicLong mReconnectDelay = new AtomicLong(60000L);
    /** Pending reconnect (or pause) timer; {@code null} when none is scheduled. */
    private volatile TimerTask mReconnectTimer = null;
    /** Delay before a heartbeat is sent; heartbeating is off when this is not positive. */
    private final AtomicLong mHeartbeatDelay = new AtomicLong(0L);
    /** Pending heartbeat timer; {@code null} when none is scheduled. */
    private volatile TimerTask mHeartbeatTimer = null;
    /** Set to {@code true} when a heartbeat was sent; cleared when input arrives or the reply timer expires. */
    private volatile boolean mHeartbeatReplyFlag = false;
    /** Delay to wait for input after sending a heartbeat. */
    private final AtomicLong mHeartbeatReplyDelay = new AtomicLong(60000L);
    /** Pending heartbeat reply timer; {@code null} when none is scheduled. */
    private volatile TimerTask mHeartbeatReplyTimer = null;
    /** Current connection state; starts out closed. */
    private volatile ConnectState mState = ConnectState.CLOSED;

    /**
     * Creates a connection bound to the given remote application. Instances
     * are obtained through the static {@code create} factory methods.
     *
     * @param remoteApp passed straight to the superclass constructor.
     */
    private ETCPConnection(final ERemoteApp remoteApp) {
        super(remoteApp);
    }

    /**
     * Reports the current reconnect setting.
     *
     * @return {@code true} if the reconnect flag is set.
     */
    @Override
    public boolean willReconnect() {
        return mReconnectFlag;
    }

    /**
     * Reports whether this connection type supports pausing.
     *
     * @return always {@code true}.
     */
    @Override
    public boolean willPause() {
        final boolean pauseSupported = true;
        return pauseSupported;
    }

    /**
     * Reports whether a connect attempt is in progress.
     *
     * @return {@code true} when the state is {@code CONNECTING}.
     */
    @Override
    boolean isConnecting() {
        return ConnectState.CONNECTING == mState;
    }

    /**
     * Closes the connection gracefully. Timers are stopped and reconnecting
     * is disabled. If outbound messages are still queued the connection
     * moves to {@code CLOSING} and the writer is told to close once drained;
     * otherwise the state becomes {@code CLOSED} and an open socket is closed.
     * Does nothing when already closing or closed.
     */
    @Override
    void close() {
        if (mState == ConnectState.CLOSING || mState == ConnectState.CLOSED) {
            return;
        }

        closeConnection();

        if (mOutputWriter.hasMessages()) {
            // Pending output: defer the socket close until the queue drains.
            mState = ConnectState.CLOSING;
            mOutputWriter.setClosing();
            return;
        }

        mState = ConnectState.CLOSED;
        if (!mAsocket.isOpen()) {
            return;
        }
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: closing connection.", mAsocket.remoteSocketAddress()));
        }
        mAsocket.close();
    }

    /**
     * Closes the connection immediately, without waiting for queued output.
     * Timers are stopped, reconnecting is disabled and the state is set to
     * {@code CLOSED}. If the socket is still open it is closed at once and
     * the output writer is notified.
     * Does nothing when the connection is already closed.
     */
    @Override
    void closeNow() {
        if (mState == ConnectState.CLOSED) {
            return;
        }

        closeConnection();

        // Record the closed state explicitly. Without this the state could
        // stay CONNECTED or RECONNECTING after an immediate close, so send()
        // and handleInput() would keep treating the connection as live.
        mState = ConnectState.CLOSED;

        if (mAsocket.isOpen()) {
            if (sLogger.isLoggable(Level.FINE)) {
                sLogger.fine(String.format("%s: closing connection now.", mAsocket.remoteSocketAddress()));
            }
            mAsocket.closeNow();
            mOutputWriter.closed();
        }
    }

    /**
     * Opens a connection to the remote address given in {@code config}.
     * <p>
     * The bind port, reconnect and heartbeat settings are stored
     * <em>before</em> the socket open is attempted, so that a failed first
     * attempt reconnects with the configured delay and a later successful
     * connect uses the configured heartbeat settings rather than the
     * defaults.
     * </p>
     *
     * @param config remote connection configuration.
     * @return {@code true} if the socket connected immediately; {@code false}
     * if the connect is still in progress or failed and a reconnect was
     * scheduled.
     * @throws IOException if the open fails and reconnecting is not
     * configured. The state is reset to {@code CLOSED} in that case.
     */
    @Override
    boolean open(EConfigure.RemoteConnection config) throws IOException {
        boolean retcode = false;
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: connecting.", config.address()));
        }

        // Store the configuration up front; the failure path below and the
        // reconnect/heartbeat timers all read these fields.
        mBindPort = config.bindPort();
        mReconnectFlag = config.reconnectFlag();
        mReconnectDelay.set(config.reconnectTime());
        mHeartbeatDelay.set(config.heartbeatDelay());
        mHeartbeatReplyDelay.set(config.heartbeatReplyDelay());

        mState = ConnectState.CONNECTING;
        try {
            retcode = ((AbstractAsyncSocket)mAsocket).open((SocketAddress)config.address(), config.bindPort());
            if (retcode) {
                mState = ConnectState.CONNECTED;
                startHeartbeatTimer();
            }
        }
        catch (IOException ioex) {
            sLogger.log(Level.WARNING, String.format("%s: open failed.", config.address()), ioex);
            if (!config.reconnectFlag()) {
                // No retry will happen, so do not leave the state at CONNECTING.
                mState = ConnectState.CLOSED;
                throw ioex;
            }
            startReconnectTimer();
        }
        return retcode;
    }

    /**
     * Wraps an already-open channel. The connection is marked
     * {@code CONNECTED}, reconnecting is switched off, and heartbeat settings
     * are taken from the service configuration. The heartbeat timer is
     * started here only for plain TCP connections.
     *
     * @param socket channel to encapsulate; must not be {@code null}.
     * @param config service configuration.
     * @throws IOException if the socket wrapper fails to open on
     * {@code socket}.
     * @throws NullPointerException if {@code socket} is {@code null}.
     */
    @Override
    void open(SelectableChannel socket, EConfigure.Service config) throws IOException {
        Objects.requireNonNull(socket, "socket is null");

        mState = ConnectState.CONNECTED;

        // This connection does not reconnect.
        mReconnectFlag = false;
        mReconnectDelay.set(0L);

        mHeartbeatDelay.set(config.heartbeatDelay());
        mHeartbeatReplyDelay.set(config.heartbeatReplyDelay());

        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: encapsulating socket.", mAsocket.remoteSocketAddress()));
        }

        final AbstractAsyncSocket asyncSocket = (AbstractAsyncSocket)mAsocket;
        asyncSocket.open(socket);

        final boolean plainTcp = config.connectionType() == EConfigure.ConnectionType.TCP;
        if (plainTcp) {
            startHeartbeatTimer();
        }
    }

    /**
     * Posts a message to the output writer and, if the writer accepts it,
     * asks the socket to transmit. The outbound counters are bumped only
     * when the socket send call returns normally.
     *
     * @param header message to send.
     * @throws IllegalArgumentException if {@code header} is {@code null}.
     * @throws IllegalStateException if the connection is not connected.
     * @throws IOException if the socket send fails.
     */
    @Override
    void send(EMessageHeader header) throws IOException {
        if (header == null) {
            throw new IllegalArgumentException("null header");
        }
        if (mState != ConnectState.CONNECTED) {
            throw new IllegalStateException("not connected");
        }

        if (sLogger.isLoggable(Level.FINEST)) {
            sLogger.finest(String.format("%s: sending message to remote eBus: from ID=%d, to ID=%d%n%s", mAsocket.remoteSocketAddress(), header.fromFeedId(), header.toFeedId(), header.message()));
        } else if (sLogger.isLoggable(Level.FINER)) {
            sLogger.finer(String.format("%s: sending %s message to remote eBus: from ID=%d, to ID=%d.", mAsocket.remoteSocketAddress(), header.messageClass(), header.fromFeedId(), header.toFeedId()));
        }

        if (!mOutputWriter.post(header)) {
            return;
        }

        try {
            ((AbstractAsyncSocket)mAsocket).send((BufferWriter)mOutputWriter);
            ++this.mMsgOutCount;
            ++sTotalOutCount;
        }
        catch (BufferOverflowException ignored) {
            // Deliberately ignored; handleOutputAvailable() retries the
            // same writer later.
        }
    }

    /**
     * Closes an open socket immediately and, when reconnecting is enabled,
     * schedules a reconnect; otherwise the connection becomes
     * {@code CLOSED}. Does nothing if the socket is not open.
     */
    @Override
    void closeAndReconnect() {
        if (!mAsocket.isOpen()) {
            return;
        }

        stopHeartbeatTimer();
        stopReconnectTimer();

        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: reconnecting after %,d milliseconds.", mAsocket.remoteSocketAddress(), mReconnectDelay.get()));
        }

        mAsocket.closeNow();

        if (!mReconnectFlag) {
            mState = ConnectState.CLOSED;
        } else {
            mState = ConnectState.RECONNECTING;
            startReconnectTimer();
        }
    }

    /**
     * Closes an open socket immediately and schedules a reconnect after the
     * given pause. Does nothing if the socket is not open.
     *
     * @param delay how long to stay disconnected before reconnecting.
     */
    @Override
    void closeAndPause(Duration delay) {
        if (!mAsocket.isOpen()) {
            return;
        }

        stopHeartbeatTimer();
        stopReconnectTimer();

        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: pausing connection for %,d milliseconds.", mAsocket.remoteSocketAddress(), delay.toMillis()));
        }

        mAsocket.closeNow();
        mState = ConnectState.RECONNECTING;
        startPauseTimer(delay);
    }

    /**
     * Cancels any pending reconnect/pause timer and attempts to reconnect
     * right away.
     */
    @Override
    void resumeNow() {
        final boolean logFine = sLogger.isLoggable(Level.FINE);
        if (logFine) {
            sLogger.fine(String.format("%s: resuming connection now.", mAsocket.remoteSocketAddress()));
        }
        stopReconnectTimer();
        reconnect();
    }

    /**
     * Socket listener callback: runs the connected-state processing.
     *
     * @param socket the socket reporting the open (unused).
     */
    public final void handleOpen(AbstractAsyncSocket socket) {
        connected();
    }

    /**
     * Socket listener callback for received bytes. Input is processed only
     * while connected; in any other state the buffer's contents are skipped
     * by moving its position to its limit.
     *
     * @param buffer received bytes.
     * @param asocket the socket reporting the input (unused).
     */
    public void handleInput(ByteBuffer buffer, AbstractAsyncSocket asocket) {
        if (mState != ConnectState.CONNECTED) {
            // position + remaining is the limit: discard everything.
            buffer.position(buffer.limit());
            return;
        }
        processInput(buffer);
    }

    /**
     * Socket listener callback: output space is available again, so hand the
     * output writer back to the socket. A buffer overflow is ignored; an I/O
     * failure is logged as a warning.
     *
     * @param s socket with available output space.
     */
    public final void handleOutputAvailable(AbstractAsyncSocket s) {
        final BufferWriter writer = (BufferWriter)mOutputWriter;
        try {
            s.send(writer);
        }
        catch (BufferOverflowException ignored) {
            // Deliberately ignored, as in the original code.
        }
        catch (IOException sendFailure) {
            sLogger.log(Level.WARNING, String.format("%s: message transmit failed.", mAsocket.remoteSocketAddress()), sendFailure);
        }
    }

    /**
     * Socket listener callback: runs the disconnect processing.
     *
     * @param t the cause passed on to the disconnect handling.
     * @param asocket the socket reporting the close (unused).
     */
    public final void handleClose(Throwable t, AbstractAsyncSocket asocket) {
        disconnected(t);
    }

    /**
     * Heartbeat timer expiration: sends a heartbeat and arms the heartbeat
     * reply timer.
     * <ul>
     *   <li>The expiration is logged at {@code FINER}; the guard now checks
     *       the same level that is logged.</li>
     *   <li>If the heartbeat cannot be queued because the output buffer is
     *       full, no reply is expected: the reply flag is cleared and the
     *       heartbeat timer is re-armed to try again later. This keeps the
     *       unchecked exception from escaping the timer callback.</li>
     *   <li>An I/O failure is logged and treated as a disconnect.</li>
     * </ul>
     *
     * @param task the expired timer task (unused).
     */
    private void heartbeatTimeout(TimerTask task) {
        if (sLogger.isLoggable(Level.FINER)) {
            sLogger.finer(String.format("%s: heartbeat timer expired.", mAsocket.remoteSocketAddress()));
        }

        mHeartbeatTimer = null;
        // With this flag set, startHeartbeatTimer() arms the reply timer.
        mHeartbeatReplyFlag = true;

        try {
            if (sLogger.isLoggable(Level.FINEST)) {
                sLogger.finest(String.format("%s: sending heartbeat.", mAsocket.remoteSocketAddress()));
            }
            ((AbstractAsyncSocket)mAsocket).send(HEARTBEAT_DATA, 0, HEARTBEAT_DATA.length);
            startHeartbeatTimer();
        }
        catch (BufferOverflowException bufex) {
            if (sLogger.isLoggable(Level.FINE)) {
                sLogger.log(Level.FINE, String.format("%s: heartbeat not sent, output buffer full.", mAsocket.remoteSocketAddress()), bufex);
            }
            // Nothing was sent, so do not wait for a reply; retry later.
            mHeartbeatReplyFlag = false;
            startHeartbeatTimer();
        }
        catch (IOException ioex) {
            sLogger.log(Level.WARNING, String.format("%s: heartbeat send failed.", mAsocket.remoteSocketAddress()), ioex);
            disconnected(ioex);
        }
    }

    /**
     * Heartbeat reply timer expiration: no input arrived in time, so the
     * socket is closed immediately and the disconnect processing runs with a
     * "no heartbeat reply" cause.
     *
     * @param task the expired timer task (unused).
     */
    private void heartbeatReplyTimeout(TimerTask task) {
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: heartbeat reply timer expired.", mAsocket.remoteSocketAddress()));
        }

        mHeartbeatReplyTimer = null;
        mHeartbeatReplyFlag = false;

        mAsocket.closeNow();

        final IOException cause = new IOException("no heartbeat reply");
        disconnected(cause);
    }

    /**
     * Reconnect (or pause) timer expiration: clears the timer reference and
     * attempts to reconnect.
     *
     * @param task the expired timer task (unused).
     */
    private void reconnectTimeout(TimerTask task) {
        final boolean logFine = sLogger.isLoggable(Level.FINE);
        if (logFine) {
            sLogger.fine(String.format("%s: reconnect timer expired.", mAsocket.remoteSocketAddress()));
        }
        mReconnectTimer = null;
        reconnect();
    }

    /**
     * @return the current reconnect delay.
     */
    public long reconnectDelay() {
        final long delay = mReconnectDelay.get();
        return delay;
    }

    /**
     * @return the current heartbeat delay.
     */
    public long heartbeatDelay() {
        final long delay = mHeartbeatDelay.get();
        return delay;
    }

    /**
     * @return the current heartbeat reply delay.
     */
    public long heartbeatReplyDelay() {
        final long delay = mHeartbeatReplyDelay.get();
        return delay;
    }

    /**
     * Creates and initializes a connection for an outbound remote
     * connection configuration.
     *
     * @param config remote connection configuration.
     * @param remoteApp remote application that owns the connection.
     * @return the initialized, not yet opened connection.
     */
    static ETCPConnection create(EConfigure.RemoteConnection config, ERemoteApp remoteApp) {
        final ETCPConnection connection = new ETCPConnection(remoteApp);
        connection.initialize(config);
        return connection;
    }

    /**
     * Creates and initializes a connection for a service configuration.
     *
     * @param config service configuration.
     * @param remoteApp remote application that owns the connection.
     * @return the initialized, not yet opened connection.
     */
    static ETCPConnection create(EConfigure.Service config, ERemoteApp remoteApp) {
        final ETCPConnection connection = new ETCPConnection(remoteApp);
        connection.initialize(config);
        return connection;
    }

    /**
     * Schedules a reconnect attempt after the configured reconnect delay.
     * If reconnecting is disabled or the connection is closing, the state
     * becomes {@code CLOSED} and no timer is started. Any pending reconnect
     * timer is cancelled first.
     */
    private void startReconnectTimer() {
        if (!mReconnectFlag || mState == ConnectState.CLOSING) {
            mState = ConnectState.CLOSED;
            return;
        }

        final long delay = mReconnectDelay.get();
        mState = ConnectState.RECONNECTING;

        if (mReconnectTimer != null) {
            mReconnectTimer.cancel();
            mReconnectTimer = null;
        }

        // The lambda reads the field when it fires, not when it is created.
        mReconnectTimer = new TimerTask(task -> this.reconnectTimeout(this.mReconnectTimer));
        sTimer.schedule((java.util.TimerTask)mReconnectTimer, delay);

        if (sLogger.isLoggable(Level.FINEST)) {
            sLogger.finest(String.format("%s: started reconnect timer, delay %,d millis.", mAsocket.remoteSocketAddress(), delay));
        }
    }

    /**
     * Schedules a reconnect attempt after the given pause, regardless of the
     * reconnect flag. Any pending reconnect timer is cancelled first.
     *
     * @param delay pause duration before the reconnect attempt.
     */
    private void startPauseTimer(Duration delay) {
        mState = ConnectState.RECONNECTING;

        if (mReconnectTimer != null) {
            mReconnectTimer.cancel();
            mReconnectTimer = null;
        }

        // The lambda reads the field when it fires, not when it is created.
        mReconnectTimer = new TimerTask(task -> this.reconnectTimeout(this.mReconnectTimer));
        sTimer.schedule((java.util.TimerTask)mReconnectTimer, delay.toMillis());

        if (!sLogger.isLoggable(Level.FINEST)) {
            return;
        }
        sLogger.finest(String.format("%s: started pause timer, delay %,d millis.", mAsocket.remoteSocketAddress(), delay.toMillis()));
    }

    /**
     * Cancels and clears the pending reconnect timer, if there is one.
     */
    private void stopReconnectTimer() {
        if (mReconnectTimer == null) {
            return;
        }

        mReconnectTimer.cancel();
        mReconnectTimer = null;

        if (sLogger.isLoggable(Level.FINEST)) {
            sLogger.finest(String.format("%s: reconnect timer stopped.", mAsocket.remoteSocketAddress()));
        }
    }

    /**
     * Connected-state processing: tries to enable {@code TCP_NODELAY}
     * (failure is ignored), marks the connection {@code CONNECTED}, starts
     * the heartbeat timer and invokes the first connection callback on the
     * remote application. A callback failure is logged, not propagated.
     */
    private void connected() {
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: connected.", mAsocket.remoteSocketAddress()));
        }

        try {
            mAsocket.setOption(StandardSocketOptions.TCP_NODELAY, (Object)true);
        }
        catch (IOException ignored) {
            // Best effort: the connection is used even without TCP_NODELAY.
        }

        mState = ConnectState.CONNECTED;
        startHeartbeatTimer();

        try {
            // invokeExact depends on the static argument types; this
            // statement is kept exactly as it was.
            ETCPConnection abConn = this;
            CONN_CB[0].invokeExact(this.mRemoteApp, abConn);
        }
        catch (Throwable callbackFailure) {
            sLogger.log(Level.WARNING, CONN_CB_METHOD_NAMES[0] + "exception", callbackFailure);
        }
    }

    /**
     * Disconnect processing: stops all timers, invokes the second connection
     * callback on the remote application (a failure is logged, not
     * propagated) and then lets {@code startReconnectTimer()} decide whether
     * to reconnect or move to {@code CLOSED}.
     *
     * @param t the cause of the disconnect; only used for logging here.
     */
    private void disconnected(Throwable t) {
        if (sLogger.isLoggable(Level.FINER)) {
            sLogger.log(Level.FINER, String.format("%s: closed.", mAsocket.remoteSocketAddress()), t);
        }

        stopReconnectTimer();
        stopHeartbeatTimer();

        try {
            // invokeExact depends on the static argument types; this
            // statement is kept exactly as it was.
            ETCPConnection abConn = this;
            CONN_CB[1].invokeExact(this.mRemoteApp, abConn);
        }
        catch (Throwable callbackFailure) {
            sLogger.log(Level.WARNING, CONN_CB_METHOD_NAMES[1] + "exception", callbackFailure);
        }

        startReconnectTimer();
    }

    /**
     * Reports an input processing error. The error is only logged, at
     * {@code FINE}.
     *
     * @param t the input processing error.
     */
    private void errorCallback(Throwable t) {
        if (!sLogger.isLoggable(Level.FINE)) {
            return;
        }
        sLogger.log(Level.FINE, String.format("%s: input processing error", mAsocket.remoteSocketAddress()), t);
    }

    /**
     * Attempts to re-open the socket to its remote address unless the
     * connection is closing or closed. An immediate connect runs the
     * connected processing; a pending connect moves to {@code CONNECTING};
     * an I/O failure schedules another reconnect attempt.
     */
    private void reconnect() {
        if (mState == ConnectState.CLOSING || mState == ConnectState.CLOSED) {
            return;
        }

        final SocketAddress remoteAddress = mAsocket.remoteSocketAddress();
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: attempting to reconnect.", remoteAddress));
        }

        try {
            final boolean connectedNow = ((AbstractAsyncSocket)mAsocket).open(remoteAddress, mBindPort);
            if (connectedNow) {
                connected();
            } else {
                mState = ConnectState.CONNECTING;
            }
        }
        catch (IOException connectFailure) {
            if (sLogger.isLoggable(Level.FINE)) {
                sLogger.log(Level.FINE, String.format("%s: connect attempt failed.", remoteAddress), connectFailure);
            }
            startReconnectTimer();
        }
    }

    /**
     * Shared close preparation: stops the heartbeat timers, disables
     * reconnecting and cancels any pending reconnect timer.
     */
    private void closeConnection() {
        stopHeartbeatTimer();

        // A deliberately closed connection must not reconnect.
        mReconnectFlag = false;
        stopReconnectTimer();
    }

    /**
     * Arms the heartbeat timer or the heartbeat reply timer. Nothing is
     * started when the heartbeat delay is not positive. With the reply flag
     * clear, the heartbeat timer is started if none is pending; with it set,
     * the reply timer is started (using the reply delay) if none is pending.
     */
    private void startHeartbeatTimer() {
        final long heartbeatDelay = mHeartbeatDelay.get();
        if (heartbeatDelay <= 0L) {
            // Heartbeating is disabled.
            return;
        }

        if (!mHeartbeatReplyFlag) {
            if (mHeartbeatTimer == null) {
                mHeartbeatTimer = new TimerTask(task -> this.heartbeatTimeout(this.mHeartbeatTimer));
                sTimer.schedule((java.util.TimerTask)mHeartbeatTimer, heartbeatDelay);
                if (sLogger.isLoggable(Level.FINEST)) {
                    sLogger.finest(String.format("%s: started heartbeat timer, delay %,d millis.", mAsocket.remoteSocketAddress(), heartbeatDelay));
                }
            }
        } else if (mHeartbeatReplyTimer == null) {
            final long replyDelay = mHeartbeatReplyDelay.get();
            mHeartbeatReplyTimer = new TimerTask(task -> this.heartbeatReplyTimeout(this.mHeartbeatReplyTimer));
            sTimer.schedule((java.util.TimerTask)mHeartbeatReplyTimer, replyDelay);
            if (sLogger.isLoggable(Level.FINEST)) {
                sLogger.finest(String.format("%s: started heartbeat reply timer, delay %,d millis.", mAsocket.remoteSocketAddress(), replyDelay));
            }
        }
    }

    /**
     * Cancels and clears both the heartbeat timer and the heartbeat reply
     * timer.
     * <p>
     * Both references must be reset to {@code null}:
     * {@code startHeartbeatTimer()} only arms a timer whose field is
     * {@code null}, so a cancelled but still referenced heartbeat timer
     * would stop heartbeating from ever restarting.
     * </p>
     */
    private void stopHeartbeatTimer() {
        if (mHeartbeatTimer != null) {
            mHeartbeatTimer.cancel();
            mHeartbeatTimer = null;
        }
        if (mHeartbeatReplyTimer != null) {
            mHeartbeatReplyTimer.cancel();
            mHeartbeatReplyTimer = null;
        }
    }

    /**
     * Parses all complete messages in {@code buffer} and forwards them to
     * the remote application.
     * <p>
     * Each message starts with a 4-byte size. Two negative size values are
     * reserved: {@code -15000} marks a received heartbeat, which is answered
     * with a heartbeat reply after parsing, and {@code -8000} is skipped.
     * Any other size must lie in {@code [16, Short.MAX_VALUE]}. A partial
     * message at the end is left in the buffer by resetting to the last mark.
     * </p>
     * <p>
     * Receiving input counts as proof of life: the heartbeat timers are
     * stopped on entry and the heartbeat timer is re-armed on normal exit.
     * </p>
     * <p>
     * On an invalid message size the connection is closed, the rest of the
     * buffer is discarded and the method returns at once. No further bytes
     * are parsed, and the heartbeat timer is not re-armed on the closed
     * connection.
     * </p>
     * <p>
     * Decompiler note (CFR): "Removed try catching itself - possible
     * behaviour change."
     * </p>
     *
     * @param buffer received bytes, positioned at the first unread byte.
     */
    private void processInput(ByteBuffer buffer) {
        int messageSize = -1;
        boolean hbReplyFlag = false;
        int keyId = Integer.MIN_VALUE;
        this.stopHeartbeatTimer();
        this.mHeartbeatReplyFlag = false;
        if (sLogger.isLoggable(Level.FINEST)) {
            int rem = buffer.remaining();
            int pos = buffer.position();
            // Peek at the size field only when all 4 bytes are present; an
            // absolute getInt on fewer bytes throws IndexOutOfBoundsException.
            int ms = rem >= 4 ? buffer.getInt(pos) : -1;
            byte[] data = new byte[rem];
            buffer.mark();
            buffer.get(data);
            buffer.reset();
            sLogger.finest(String.format("%s: %,d bytes available (start=%,d, msg size=%,d):%n%s", this.mAsocket.remoteSocketAddress(), rem, pos, ms, HexDump.dump((byte[])data, (int)0, (int)rem, (String)"  ")));
        } else if (sLogger.isLoggable(Level.FINER)) {
            sLogger.finer(String.format("%s: %,d bytes available.", this.mAsocket.remoteSocketAddress(), buffer.remaining()));
        }
        int startPosition = buffer.position();
        int remaining = buffer.remaining();
        buffer.mark();
        // Continue while a size field is readable and the whole message
        // (size field included) is already in the buffer.
        while (remaining >= 4 && (messageSize = buffer.getInt()) <= remaining) {
            if (sLogger.isLoggable(Level.FINEST)) {
                sLogger.finest(String.format("%s: %,d bytes remaining, position is %,d, message size is %,d bytes.", this.mAsocket.remoteSocketAddress(), remaining, startPosition, messageSize));
            }
            if (messageSize == -15000) {
                // Heartbeat received: reply once parsing is finished.
                hbReplyFlag = true;
            } else if (messageSize != -8000) {
                if (messageSize < 16 || messageSize > Short.MAX_VALUE) {
                    this.errorCallback(new ProtocolException("invalid message size - " + Integer.toString(messageSize)));
                    this.mAsocket.closeNow();
                    this.disconnected(new IOException("invalid message size"));
                    // The stream is unusable and the socket is closed: drop
                    // the remaining bytes and stop, so the disconnect
                    // callback is not repeated and no heartbeat timer is
                    // started for the closed connection.
                    buffer.position(buffer.limit());
                    return;
                }
                try {
                    keyId = buffer.getInt();
                    EAbstractConnection.MessageReader reader = (EAbstractConnection.MessageReader)this.mInputReaders.get(keyId);
                    EMessageHeader header = reader.extractMessage(buffer, this.mAsocket.remoteSocketAddress());
                    reader.forwardMessage(header, this.mRemoteApp);
                }
                catch (NullPointerException nullex) {
                    // Logged as an unsupported key ID.
                    sLogger.log(Level.WARNING, "received unsupported key ID " + keyId, nullex);
                }
                catch (BufferUnderflowException | InvalidMessageException | UnknownMessageException jex) {
                    this.errorCallback(jex);
                }
                finally {
                    // Always skip to the next message, whether or not this
                    // one decoded.
                    buffer.position(startPosition += messageSize);
                }
            }
            startPosition = buffer.position();
            remaining = buffer.remaining();
            buffer.mark();
        }
        // Rewind to the start of the first incomplete message.
        buffer.reset();
        if (hbReplyFlag) {
            try {
                if (sLogger.isLoggable(Level.FINEST)) {
                    sLogger.finest(String.format("%s: sending heartbeat reply.", this.mAsocket.remoteSocketAddress()));
                }
                ((AbstractAsyncSocket)this.mAsocket).send(HEARTBEAT_REPLY_DATA, 0, HEARTBEAT_REPLY_DATA.length);
            }
            catch (IOException | BufferOverflowException ignored) {
                // Best effort: a failed reply is ignored here.
            }
        }
        this.startHeartbeatTimer();
    }

    /**
     * Called by the message writer after it has drained its queue while
     * closing. If the connection is {@code CLOSING}, it becomes
     * {@code CLOSED} and an open socket is closed.
     */
    private void outboundQueueEmpty() {
        if (mState != ConnectState.CLOSING) {
            return;
        }

        mState = ConnectState.CLOSED;
        if (!mAsocket.isOpen()) {
            return;
        }

        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine(String.format("%s: closing connection.", mAsocket.remoteSocketAddress()));
        }
        mAsocket.close();
    }

    /**
     * Builds the socket matching the configured connection type, creates the
     * output writer and runs the superclass initialization. For a connection
     * type other than TCP or secure TCP no socket is created.
     *
     * @param config remote connection configuration.
     */
    private void initialize(EConfigure.RemoteConnection config) {
        switch (config.connectionType()) {
            case TCP:
                mAsocket = createPlainTextTCP(config);
                break;
            case SECURE_TCP:
                mAsocket = createSecureTCP(config);
                break;
            default:
                // No socket for any other connection type.
                break;
        }
        mOutputWriter = new MessageWriter(config.messageQueueSize(), this);
        super.initialize();
    }

    /**
     * Builds a plain text TCP socket from the remote connection
     * configuration, with this connection as its listener.
     *
     * @param config remote connection configuration.
     * @return the built socket.
     */
    private AsyncChannel createPlainTextTCP(EConfigure.RemoteConnection config) {
        AsyncSocket.SocketBuilder socketBuilder = AsyncSocket.builder();
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine("Creating plain text TCP connection.");
        }
        return socketBuilder
            .inputBufferSize(config.inputBufferSize())
            .outputBufferSize(config.outputBufferSize())
            .byteOrder(config.byteOrder())
            .selector(config.selector())
            .listener((SocketListener)this)
            .build();
    }

    /**
     * Builds a secure TCP socket from the remote connection configuration,
     * using its SSL context and with this connection as its listener.
     *
     * @param config remote connection configuration.
     * @return the built socket.
     */
    private AsyncChannel createSecureTCP(EConfigure.RemoteConnection config) {
        AsyncSecureSocket.SecureSocketBuilder socketBuilder = AsyncSecureSocket.builder();
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine("Creating secure TCP connection.");
        }
        return socketBuilder
            .sslContext(config.sslContext())
            .inputBufferSize(config.inputBufferSize())
            .outputBufferSize(config.outputBufferSize())
            .byteOrder(config.byteOrder())
            .selector(config.selector())
            .listener((SocketListener)this)
            .build();
    }

    /**
     * Builds the socket matching the configured connection type, creates the
     * output writer and runs the superclass initialization. For a connection
     * type other than TCP or secure TCP no socket is created.
     *
     * @param config service configuration.
     */
    private void initialize(EConfigure.Service config) {
        switch (config.connectionType()) {
            case TCP:
                mAsocket = createPlainTextTCP(config);
                break;
            case SECURE_TCP:
                mAsocket = createSecureTCP(config);
                break;
            default:
                // No socket for any other connection type.
                break;
        }
        mOutputWriter = new MessageWriter(config.messageQueueSize(), this);
        super.initialize();
    }

    /**
     * Builds a plain text TCP socket from the service configuration, using
     * its connection selector and with this connection as its listener.
     *
     * @param config service configuration.
     * @return the built socket.
     */
    private AsyncChannel createPlainTextTCP(EConfigure.Service config) {
        AsyncSocket.SocketBuilder socketBuilder = AsyncSocket.builder();
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine("Creating plain text TCP connection.");
        }
        return socketBuilder
            .inputBufferSize(config.inputBufferSize())
            .outputBufferSize(config.outputBufferSize())
            .byteOrder(config.byteOrder())
            .selector(config.connectionSelector())
            .listener((SocketListener)this)
            .build();
    }

    /**
     * Builds a secure TCP socket from the service configuration, using its
     * SSL context and connection selector, with this connection as its
     * listener.
     *
     * @param config service configuration.
     * @return the built socket.
     */
    private AsyncChannel createSecureTCP(EConfigure.Service config) {
        AsyncSecureSocket.SecureSocketBuilder socketBuilder = AsyncSecureSocket.builder();
        if (sLogger.isLoggable(Level.FINE)) {
            sLogger.fine("Creating secure TCP connection.");
        }
        return socketBuilder
            .sslContext(config.sslContext())
            .inputBufferSize(config.inputBufferSize())
            .outputBufferSize(config.outputBufferSize())
            .byteOrder(config.byteOrder())
            .selector(config.connectionSelector())
            .listener((SocketListener)this)
            .build();
    }

    /**
     * Immutable holder for the throwable associated with a disconnect.
     */
    static final class DisconnectEvent {
        /** The wrapped throwable; stored as given. */
        private final Throwable _cause;

        /**
         * @param t the throwable to wrap.
         */
        private DisconnectEvent(final Throwable t) {
            _cause = t;
        }

        /**
         * @return the throwable given at construction.
         */
        Throwable exception() {
            return _cause;
        }
    }

    /**
     * Marker class with no state or behavior of its own.
     */
    static final class ConnectEvent {
    }

    /**
     * Buffer writer that serializes queued message headers into the socket's
     * output buffer.
     */
    private static final class MessageWriter
    extends EAbstractConnection.AbstractMessageWriter
    implements BufferWriter {
        private static final Logger _sublogger = Logger.getLogger(MessageWriter.class.getName());

        /**
         * @param maxSize passed to the superclass constructor (the callers
         * pass the configured message queue size).
         * @param connection the connection this writer belongs to.
         */
        public MessageWriter(int maxSize, ETCPConnection connection) {
            super(maxSize, connection);
        }

        /**
         * Writes queued messages into {@code buffer} until the queue is
         * empty. Each message is written as a 4-byte total size (back-filled
         * after serialization), class ID, from feed ID, to feed ID and the
         * serialized message. A message is removed from the queue only after
         * it has been written completely. When the queue is drained and the
         * closing flag is set, the connection is told the queue is empty.
         *
         * @param buffer output buffer to fill.
         * @throws BufferOverflowException if fewer than 16 bytes remain
         * before a message is started. NOTE(review): serialization may also
         * overflow mid-message; the buffer is marked at the message start
         * first — confirm the caller resets to that mark.
         */
        public void fill(ByteBuffer buffer) throws BufferOverflowException {
            final boolean logFinest = _sublogger.isLoggable(Level.FINEST);
            if (logFinest) {
                _sublogger.finest(String.format("%s queue: sending messages (size=%,d, remaining=%,d).", this._connection.remoteSocketAddress(), this._transmitQueueSize.get(), buffer.remaining()));
            }

            while (!this._transmitQueue.isEmpty()) {
                if (buffer.remaining() < 16) {
                    throw new BufferOverflowException();
                }

                // Peek rather than poll: the message stays queued until it
                // is fully serialized.
                EMessageHeader nextHeader = (EMessageHeader)this._transmitQueue.peek();
                DataType serializer = DataType.findType(nextHeader.messageClass());

                buffer.mark();
                int sizeOffset = buffer.position();

                // Leave room for the size field; it is filled in below.
                buffer.position(sizeOffset + 4);
                buffer.putInt(nextHeader.classId());
                buffer.putInt(nextHeader.fromFeedId());
                buffer.putInt(nextHeader.toFeedId());
                serializer.serialize(nextHeader.message(), buffer);
                buffer.putInt(sizeOffset, buffer.position() - sizeOffset);

                this._transmitQueue.poll();

                // System messages do not decrement the queue size counter.
                int queueSize;
                if (nextHeader.messageType() == EMessage.MessageType.SYSTEM) {
                    queueSize = this._transmitQueueSize.get();
                } else {
                    queueSize = this._transmitQueueSize.decrementAndGet();
                }
                ++this._transmitCount;

                if (_sublogger.isLoggable(Level.FINEST)) {
                    _sublogger.finest(String.format("%s: queued message sent (size=%,d, transmited=%,d, discarded=%,d).", this._connection.remoteSocketAddress(), queueSize, this._transmitCount, this._discardCount));
                }
            }

            if (this._closingFlag) {
                ((ETCPConnection)this._connection).outboundQueueEmpty();
            }
        }
    }

    private static enum ConnectState {
        CLOSED,
        CONNECTING,
        CONNECTED,
        CLOSING,
        RECONNECTING;

    }
}

