/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.ClientCnxnLimitException;
import org.apache.zookeeper.server.NettyServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.ZooKeeperSaslServer;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.command.CommandExecutor;
import org.apache.zookeeper.server.command.FourLetterCommands;
import org.apache.zookeeper.server.command.NopCommand;
import org.apache.zookeeper.server.command.SetTraceMaskCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServerCnxn
extends ServerCnxn {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);
    private final Channel channel;
    private CompositeByteBuf queuedBuffer;
    private final AtomicBoolean throttled = new AtomicBoolean(false);
    private ByteBuffer bb;
    private final ByteBuffer bbLen = ByteBuffer.allocate(4);
    private long sessionId;
    private int sessionTimeout;
    private Certificate[] clientChain;
    private volatile boolean closingChannel;
    private final NettyServerCnxnFactory factory;
    private boolean initialized;
    public int readIssuedAfterReadComplete;
    private volatile HandshakeState handshakeState = HandshakeState.NONE;
    private final GenericFutureListener<Future<Void>> onSendBufferDoneListener = f -> {
        if (f.isSuccess()) {
            this.packetSent();
        }
    };

    NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
        super(zks);
        this.channel = channel;
        this.closingChannel = false;
        this.factory = factory;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
        }
        InetAddress addr = ((InetSocketAddress)channel.remoteAddress()).getAddress();
        this.addAuthInfo(new Id("ip", addr.getHostAddress()));
    }

    @Override
    public void close(ServerCnxn.DisconnectReason reason) {
        this.disconnectReason = reason;
        this.close();
    }

    public void close() {
        this.closingChannel = true;
        LOG.debug("close called for session id: 0x{}", (Object)Long.toHexString(this.sessionId));
        this.setStale();
        this.factory.unregisterConnection(this);
        if (!this.factory.cnxns.remove(this)) {
            LOG.debug("cnxns size:{}", (Object)this.factory.cnxns.size());
            if (this.channel.isOpen()) {
                this.channel.close();
            }
            return;
        }
        LOG.debug("close in progress for session id: 0x{}", (Object)Long.toHexString(this.sessionId));
        this.factory.removeCnxnFromSessionMap(this);
        this.factory.removeCnxnFromIpMap(this, ((InetSocketAddress)this.channel.remoteAddress()).getAddress());
        if (this.zkServer != null) {
            this.zkServer.removeCnxn(this);
        }
        if (this.channel.isOpen()) {
            this.channel.writeAndFlush((Object)Unpooled.EMPTY_BUFFER).addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) {
                    future.channel().close().addListener(f -> NettyServerCnxn.this.releaseQueuedBuffer());
                }
            });
        } else {
            ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1L);
            this.channel.eventLoop().execute(this::releaseQueuedBuffer);
        }
    }

    @Override
    public long getSessionId() {
        return this.sessionId;
    }

    @Override
    public int getSessionTimeout() {
        return this.sessionTimeout;
    }

    @Override
    public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
        }
        WatcherEvent e = event.getWrapper();
        try {
            this.sendResponse(h, e, "notification");
        }
        catch (IOException e1) {
            LOG.debug("Problem sending to {}", (Object)this.getRemoteSocketAddress(), (Object)e1);
            this.close();
        }
    }

    @Override
    public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) throws IOException {
        if (this.closingChannel || !this.channel.isOpen()) {
            return;
        }
        this.sendBuffer(this.serialize(h, r, tag, cacheKey, stat, opCode));
        this.decrOutstandingAndCheckThrottle(h);
    }

    @Override
    public void setSessionId(long sessionId) {
        this.sessionId = sessionId;
        this.factory.addSession(sessionId, this);
    }

    @Override
    public void sendBuffer(ByteBuffer ... buffers) {
        if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
            this.close(ServerCnxn.DisconnectReason.CLIENT_CLOSED_CONNECTION);
            return;
        }
        this.channel.writeAndFlush((Object)Unpooled.wrappedBuffer((ByteBuffer[])buffers)).addListener(this.onSendBufferDoneListener);
    }

    private boolean checkFourLetterWord(Channel channel, ByteBuf message, int len) {
        if (!FourLetterCommands.isKnown(len)) {
            return false;
        }
        String cmd = FourLetterCommands.getCommandString(len);
        channel.config().setAutoRead(false);
        this.packetReceived(4L);
        PrintWriter pwriter = new PrintWriter(new BufferedWriter(new SendBufferWriter()));
        if (!FourLetterCommands.isEnabled(cmd)) {
            LOG.debug("Command {} is not executed because it is not in the whitelist.", (Object)cmd);
            NopCommand nopCmd = new NopCommand(pwriter, this, cmd + " is not executed because it is not in the whitelist.");
            nopCmd.start();
            return true;
        }
        LOG.info("Processing {} command from {}", (Object)cmd, (Object)channel.remoteAddress());
        if (len == FourLetterCommands.setTraceMaskCmd) {
            ByteBuffer mask = ByteBuffer.allocate(8);
            message.readBytes(mask);
            mask.flip();
            long traceMask = mask.getLong();
            ZooTrace.setTextTraceLevel(traceMask);
            SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, this, traceMask);
            setMask.start();
            return true;
        }
        CommandExecutor commandExecutor = new CommandExecutor();
        return commandExecutor.execute(this, pwriter, len, this.zkServer, this.factory);
    }

    private void checkIsInEventLoop(String callerMethodName) {
        if (!this.channel.eventLoop().inEventLoop()) {
            throw new IllegalStateException(callerMethodName + "() called from non-EventLoop thread");
        }
    }

    private void appendToQueuedBuffer(ByteBuf buf) {
        this.checkIsInEventLoop("appendToQueuedBuffer");
        if (this.queuedBuffer.numComponents() == this.queuedBuffer.maxNumComponents()) {
            this.queuedBuffer.consolidate();
        }
        this.queuedBuffer.addComponent(true, buf);
        ServerMetrics.getMetrics().NETTY_QUEUED_BUFFER.add(this.queuedBuffer.capacity());
    }

    void processMessage(ByteBuf buf) {
        this.checkIsInEventLoop("processMessage");
        LOG.debug("0x{} queuedBuffer: {}", (Object)Long.toHexString(this.sessionId), (Object)this.queuedBuffer);
        if (LOG.isTraceEnabled()) {
            LOG.trace("0x{} buf {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)buf));
        }
        if (this.throttled.get()) {
            LOG.debug("Received message while throttled");
            if (this.queuedBuffer == null) {
                LOG.debug("allocating queue");
                this.queuedBuffer = this.channel.alloc().compositeBuffer();
            }
            this.appendToQueuedBuffer(buf.retainedDuplicate());
            if (LOG.isTraceEnabled()) {
                LOG.trace("0x{} queuedBuffer {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)this.queuedBuffer));
            }
        } else {
            LOG.debug("not throttled");
            if (this.queuedBuffer != null) {
                this.appendToQueuedBuffer(buf.retainedDuplicate());
                this.processQueuedBuffer();
            } else {
                this.receiveMessage(buf);
                if (!this.closingChannel && buf.isReadable()) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Before copy {}", (Object)buf);
                    }
                    if (this.queuedBuffer == null) {
                        this.queuedBuffer = this.channel.alloc().compositeBuffer();
                    }
                    this.appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Copy is {}", (Object)this.queuedBuffer);
                        LOG.trace("0x{} queuedBuffer {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)this.queuedBuffer));
                    }
                }
            }
        }
    }

    void processQueuedBuffer() {
        this.checkIsInEventLoop("processQueuedBuffer");
        if (this.queuedBuffer != null) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("processing queue 0x{} queuedBuffer {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)this.queuedBuffer));
            }
            this.receiveMessage((ByteBuf)this.queuedBuffer);
            if (this.closingChannel) {
                LOG.debug("Processed queue - channel closed, dropping remaining bytes");
            } else if (!this.queuedBuffer.isReadable()) {
                LOG.debug("Processed queue - no bytes remaining");
                this.releaseQueuedBuffer();
            } else {
                LOG.debug("Processed queue - bytes remaining");
                this.queuedBuffer.discardReadComponents();
            }
        } else {
            LOG.debug("queue empty");
        }
    }

    private void releaseQueuedBuffer() {
        this.checkIsInEventLoop("releaseQueuedBuffer");
        if (this.queuedBuffer != null) {
            this.queuedBuffer.release();
            this.queuedBuffer = null;
        }
    }

    private void receiveMessage(ByteBuf message) {
        this.checkIsInEventLoop("receiveMessage");
        try {
            while (message.isReadable() && !this.throttled.get()) {
                if (this.bb != null) {
                    ByteBuffer dat;
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bb len {} {}", new Object[]{message.readableBytes(), this.bb.remaining(), this.bb});
                        dat = this.bb.duplicate();
                        dat.flip();
                        LOG.trace("0x{} bb {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)dat)));
                    }
                    if (this.bb.remaining() > message.readableBytes()) {
                        int newLimit = this.bb.position() + message.readableBytes();
                        this.bb.limit(newLimit);
                    }
                    message.readBytes(this.bb);
                    this.bb.limit(this.bb.capacity());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable {} bb len {} {}", new Object[]{message.readableBytes(), this.bb.remaining(), this.bb});
                        dat = this.bb.duplicate();
                        dat.flip();
                        LOG.trace("after readbytes 0x{} bb {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)dat)));
                    }
                    if (this.bb.remaining() != 0) continue;
                    this.bb.flip();
                    this.packetReceived(4 + this.bb.remaining());
                    ZooKeeperServer zks = this.zkServer;
                    if (zks == null || !zks.isRunning()) {
                        throw new IOException("ZK down");
                    }
                    if (this.initialized) {
                        zks.processPacket(this, this.bb);
                    } else {
                        LOG.debug("got conn req request from {}", (Object)this.getRemoteSocketAddress());
                        zks.processConnectRequest(this, this.bb);
                        this.initialized = true;
                    }
                    this.bb = null;
                    continue;
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message readable {} bblenrem {}", (Object)message.readableBytes(), (Object)this.bbLen.remaining());
                    ByteBuffer dat = this.bbLen.duplicate();
                    dat.flip();
                    LOG.trace("0x{} bbLen {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)dat)));
                }
                if (message.readableBytes() < this.bbLen.remaining()) {
                    this.bbLen.limit(this.bbLen.position() + message.readableBytes());
                }
                message.readBytes(this.bbLen);
                this.bbLen.limit(this.bbLen.capacity());
                if (this.bbLen.remaining() != 0) continue;
                this.bbLen.flip();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("0x{} bbLen {}", (Object)Long.toHexString(this.sessionId), (Object)ByteBufUtil.hexDump((ByteBuf)Unpooled.wrappedBuffer((ByteBuffer)this.bbLen)));
                }
                int len = this.bbLen.getInt();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("0x{} bbLen len is {}", (Object)Long.toHexString(this.sessionId), (Object)len);
                }
                this.bbLen.clear();
                if (!this.initialized && this.checkFourLetterWord(this.channel, message, len)) {
                    return;
                }
                if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                    throw new IOException("Len error " + len);
                }
                this.zkServer.checkRequestSizeWhenReceivingMessage(len);
                this.bb = ByteBuffer.allocate(len);
            }
        }
        catch (IOException e) {
            LOG.warn("Closing connection to {}", (Object)this.getRemoteSocketAddress(), (Object)e);
            this.close(ServerCnxn.DisconnectReason.IO_EXCEPTION);
        }
        catch (ClientCnxnLimitException e) {
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1L);
            LOG.debug("Closing connection to {}", (Object)this.getRemoteSocketAddress(), (Object)e);
            this.close(ServerCnxn.DisconnectReason.CLIENT_RATE_LIMIT);
        }
    }

    @Override
    public void disableRecv(boolean waitDisableRecv) {
        if (this.throttled.compareAndSet(false, true)) {
            LOG.debug("Throttling - disabling recv {}", (Object)this);
            this.channel.pipeline().fireUserEventTriggered((Object)ReadEvent.DISABLE);
        }
    }

    @Override
    public void enableRecv() {
        if (this.throttled.compareAndSet(true, false)) {
            LOG.debug("Sending unthrottle event {}", (Object)this);
            this.channel.pipeline().fireUserEventTriggered((Object)ReadEvent.ENABLE);
        }
    }

    @Override
    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public int getInterestOps() {
        if (this.channel == null || !this.channel.isOpen()) {
            return 0;
        }
        int interestOps = 0;
        if (!this.throttled.get()) {
            interestOps |= 1;
        }
        if (!this.channel.isWritable()) {
            interestOps |= 4;
        }
        return interestOps;
    }

    @Override
    public InetSocketAddress getRemoteSocketAddress() {
        return (InetSocketAddress)this.channel.remoteAddress();
    }

    @Override
    public void sendCloseSession() {
        this.sendBuffer(ServerCnxnFactory.closeConn);
    }

    @Override
    protected ServerStats serverStats() {
        if (this.zkServer == null) {
            return null;
        }
        return this.zkServer.serverStats();
    }

    @Override
    public boolean isSecure() {
        return this.factory.secure;
    }

    @Override
    public Certificate[] getClientCertificateChain() {
        if (this.clientChain == null) {
            return null;
        }
        return Arrays.copyOf(this.clientChain, this.clientChain.length);
    }

    @Override
    public void setClientCertificateChain(Certificate[] chain) {
        this.clientChain = chain == null ? null : Arrays.copyOf(chain, chain.length);
    }

    Channel getChannel() {
        return this.channel;
    }

    public int getQueuedReadableBytes() {
        this.checkIsInEventLoop("getQueuedReadableBytes");
        if (this.queuedBuffer != null) {
            return this.queuedBuffer.readableBytes();
        }
        return 0;
    }

    public void setHandshakeState(HandshakeState state) {
        this.handshakeState = state;
    }

    public HandshakeState getHandshakeState() {
        return this.handshakeState;
    }

    static enum ReadEvent {
        DISABLE,
        ENABLE;

    }

    private class SendBufferWriter
    extends Writer {
        private StringBuffer sb = new StringBuffer();

        private SendBufferWriter() {
        }

        private void checkFlush(boolean force) {
            if (force && this.sb.length() > 0 || this.sb.length() > 2048) {
                NettyServerCnxn.this.sendBuffer(ByteBuffer.wrap(this.sb.toString().getBytes()));
                this.sb.setLength(0);
            }
        }

        @Override
        public void close() throws IOException {
            if (this.sb == null) {
                return;
            }
            this.checkFlush(true);
            this.sb = null;
        }

        @Override
        public void flush() throws IOException {
            this.checkFlush(true);
        }

        @Override
        public void write(char[] cbuf, int off, int len) throws IOException {
            this.sb.append(cbuf, off, len);
            this.checkFlush(false);
        }
    }

    public static enum HandshakeState {
        NONE,
        STARTED,
        FINISHED;

    }
}

