/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bk_v4_2_0.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bk_v4_2_0.bookkeeper.conf.ClientConfiguration;
import org.apache.bk_v4_2_0.bookkeeper.proto.BookieProtocol;
import org.apache.bk_v4_2_0.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bk_v4_2_0.bookkeeper.util.MathUtils;
import org.apache.bk_v4_2_0.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bk_v4_2_0.bookkeeper.util.SafeRunnable;
import org.jboss.bk_v4_2_0.netty.bootstrap.ClientBootstrap;
import org.jboss.bk_v4_2_0.netty.buffer.ChannelBuffer;
import org.jboss.bk_v4_2_0.netty.buffer.ChannelBuffers;
import org.jboss.bk_v4_2_0.netty.channel.Channel;
import org.jboss.bk_v4_2_0.netty.channel.ChannelFuture;
import org.jboss.bk_v4_2_0.netty.channel.ChannelFutureListener;
import org.jboss.bk_v4_2_0.netty.channel.ChannelHandlerContext;
import org.jboss.bk_v4_2_0.netty.channel.ChannelPipeline;
import org.jboss.bk_v4_2_0.netty.channel.ChannelPipelineCoverage;
import org.jboss.bk_v4_2_0.netty.channel.ChannelPipelineFactory;
import org.jboss.bk_v4_2_0.netty.channel.ChannelStateEvent;
import org.jboss.bk_v4_2_0.netty.channel.Channels;
import org.jboss.bk_v4_2_0.netty.channel.ExceptionEvent;
import org.jboss.bk_v4_2_0.netty.channel.MessageEvent;
import org.jboss.bk_v4_2_0.netty.channel.SimpleChannelHandler;
import org.jboss.bk_v4_2_0.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.bk_v4_2_0.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.bk_v4_2_0.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.bk_v4_2_0.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.bk_v4_2_0.netty.handler.timeout.ReadTimeoutException;
import org.jboss.bk_v4_2_0.netty.handler.timeout.ReadTimeoutHandler;
import org.jboss.bk_v4_2_0.netty.util.HashedWheelTimer;
import org.jboss.bk_v4_2_0.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages a single Netty channel to one bookie address.
 *
 * <p>The client connects lazily (see {@link #connectIfNeededAndDoOp}), frames add and read
 * requests onto the channel, and keeps one pending completion per (ledgerId, entryId) key.
 * Responses and failures are delivered to the registered callbacks through the
 * {@link OrderedSafeExecutor}, keyed by ledger id.
 *
 * <p>Connection state transitions are performed while holding this object's monitor;
 * {@code channel} and {@code state} are additionally {@code volatile} so they can be
 * read without the lock.
 */
@ChannelPipelineCoverage(value="one")
public class PerChannelBookieClient
extends SimpleChannelHandler
implements ChannelPipelineFactory {
    static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
    static final long maxMemory = Runtime.getRuntime().maxMemory() / 5L;
    /** Largest frame accepted by the length-based frame decoder installed in the pipeline. */
    public static final int MAX_FRAME_LENGTH = 0x200000;

    /** Result code passed to callbacks on success. */
    private static final int RC_OK = 0;
    /**
     * Result code passed to callbacks when the connection cannot be established or a request
     * cannot be written.
     * NOTE(review): presumably mirrors BKException.Code.BookieHandleNotAvailableException - confirm.
     */
    private static final int RC_BOOKIE_UNAVAILABLE = -8;

    InetSocketAddress addr;
    AtomicLong totalBytesOutstanding;
    ClientSocketChannelFactory channelFactory;
    OrderedSafeExecutor executor;
    /** Created lazily in {@link #getPipeline()} and stopped in {@link #closeInternal(boolean)}. */
    private Timer readTimeoutTimer;
    /** Add requests that have been issued but not yet answered or failed. */
    ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
    /** Read requests that have been issued but not yet answered or failed. */
    ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
    /** Operations waiting for an in-flight connection attempt; guarded by this object's monitor. */
    Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<Void>>();
    volatile Channel channel = null;
    private volatile ConnectionState state;
    private final ClientConfiguration conf;

    /**
     * Creates a client for {@code addr} using a default {@link ClientConfiguration}.
     */
    public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding);
    }

    /**
     * Creates a client for {@code addr}. No connection is opened here; the client starts in
     * the {@code DISCONNECTED} state.
     *
     * @param conf                  configuration supplying TCP no-delay and read timeout settings
     * @param executor              executor on which callbacks are run, ordered by ledger id
     * @param channelFactory        Netty factory used to open the socket channel
     * @param addr                  address of the bookie
     * @param totalBytesOutstanding shared counter stored on the instance (not updated by this class)
     */
    public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
        this.conf = conf;
        this.addr = addr;
        this.executor = executor;
        this.totalBytesOutstanding = totalBytesOutstanding;
        this.channelFactory = channelFactory;
        this.state = ConnectionState.DISCONNECTED;
        this.readTimeoutTimer = null;
    }

    /**
     * Starts an asynchronous connection attempt to the bookie. When the attempt finishes, the
     * connection state is updated under the monitor and every operation queued in
     * {@code pendingOps} is completed with the outcome.
     */
    private void connect() {
        LOG.info("Connecting to bookie: {}", (Object)this.addr);
        ClientBootstrap bootstrap = new ClientBootstrap(this.channelFactory);
        bootstrap.setPipelineFactory(this);
        bootstrap.setOption("tcpNoDelay", this.conf.getClientTcpNoDelay());
        bootstrap.setOption("keepAlive", true);
        ChannelFuture future = bootstrap.connect(this.addr);
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> oldPendingOps;
                int rc;
                synchronized (PerChannelBookieClient.this) {
                    if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.CONNECTING) {
                        LOG.info("Successfully connected to bookie: " + PerChannelBookieClient.this.addr);
                        rc = RC_OK;
                        PerChannelBookieClient.this.channel = future.getChannel();
                        PerChannelBookieClient.this.state = ConnectionState.CONNECTED;
                    } else if (future.isSuccess() && (PerChannelBookieClient.this.state == ConnectionState.CLOSED || PerChannelBookieClient.this.state == ConnectionState.DISCONNECTED)) {
                        LOG.error("Closed before connection completed, clean up: " + PerChannelBookieClient.this.addr);
                        future.getChannel().close();
                        rc = RC_BOOKIE_UNAVAILABLE;
                        PerChannelBookieClient.this.channel = null;
                    } else if (PerChannelBookieClient.this.state == ConnectionState.CONNECTED) {
                        // A concurrent attempt already installed a live channel. Keep it, and
                        // close the redundant channel (if this attempt opened one) so it does
                        // not leak; previously this path dropped the live channel reference.
                        if (future.isSuccess() && future.getChannel() != PerChannelBookieClient.this.channel) {
                            future.getChannel().close();
                        }
                        LOG.info("Already connected to bookie: " + PerChannelBookieClient.this.addr + ", discarding redundant connection attempt");
                        rc = RC_OK;
                    } else {
                        LOG.error("Could not connect to bookie: " + PerChannelBookieClient.this.addr);
                        rc = RC_BOOKIE_UNAVAILABLE;
                        PerChannelBookieClient.this.channel = null;
                        if (PerChannelBookieClient.this.state != ConnectionState.CLOSED) {
                            PerChannelBookieClient.this.state = ConnectionState.DISCONNECTED;
                        }
                    }
                    // Swap the queue under the lock so callbacks run without holding it.
                    oldPendingOps = PerChannelBookieClient.this.pendingOps;
                    PerChannelBookieClient.this.pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<Void>>();
                }
                for (BookkeeperInternalCallbacks.GenericCallback<Void> pendingOp : oldPendingOps) {
                    pendingOp.operationComplete(rc, null);
                }
            }
        });
    }

    /**
     * Runs {@code op} once a connection is available.
     *
     * <p>If the channel is already connected, {@code op} is completed immediately with 0. If
     * this client has been permanently closed, {@code op} is completed immediately with -8.
     * Otherwise {@code op} is queued; a connection attempt is started unless one is already in
     * flight, and {@code op} is completed when that attempt finishes.
     *
     * @param op operation to complete with the connection outcome
     */
    void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<Void> op) {
        boolean completeOpNow = false;
        int opRc = RC_OK;
        // Lock-free fast path; both fields are volatile.
        if (this.channel != null && this.state == ConnectionState.CONNECTED) {
            completeOpNow = true;
        } else {
            synchronized (this) {
                // Re-check under the lock: the state may have changed since the fast path.
                if (this.channel != null && this.state == ConnectionState.CONNECTED) {
                    completeOpNow = true;
                    opRc = RC_OK;
                } else if (this.state == ConnectionState.CLOSED) {
                    completeOpNow = true;
                    opRc = RC_BOOKIE_UNAVAILABLE;
                } else {
                    this.pendingOps.add(op);
                    if (this.state == ConnectionState.CONNECTING) {
                        // Another caller already started the attempt; its listener drains the queue.
                        return;
                    }
                    this.state = ConnectionState.CONNECTING;
                }
            }
            if (!completeOpNow) {
                this.connect();
            }
        }
        if (completeOpNow) {
            op.operationComplete(opRc, null);
        }
    }

    /**
     * Sends an add-entry request and registers {@code cb} to be invoked with the outcome.
     *
     * <p>The request is a 28-byte header (4-byte length, 4-byte packet header, 20 bytes of
     * master key) followed by {@code toSend}. Any failure while building or writing the request
     * is reported to {@code cb} via {@link #errorOutAddKey(CompletionKey)}.
     *
     * @param ledgerId  ledger the entry belongs to
     * @param masterKey key whose first 20 bytes are written into the header
     * @param entryId   id of the entry
     * @param toSend    entry payload; its readable bytes are appended after the header
     * @param cb        callback invoked with the result
     * @param ctx       opaque context handed back to {@code cb}
     * @param options   flags narrowed to a short and placed in the packet header
     */
    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, int options) {
        final int entrySize = toSend.readableBytes();
        final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
        this.addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
        // 4 (length) + 4 (packet header) + 20 (master key)
        int totalHeaderSize = 28;
        try {
            // Read the volatile field once; a null channel raises an NPE that is handled below.
            Channel c = this.channel;
            ChannelBuffer header = c.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
            // The length field does not count its own 4 bytes.
            header.writeInt(totalHeaderSize - 4 + entrySize);
            header.writeInt(new BookieProtocol.PacketHeader(2, 1, (short)options).toInt());
            header.writeBytes(masterKey, 0, 20);
            ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
            ChannelFuture future = c.write(wrappedBuffer);
            future.addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOG.isDebugEnabled()) {
                            // Use the future's own channel: the field may have been nulled since the write.
                            LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + future.getChannel().getRemoteAddress() + " entry length: " + entrySize);
                        }
                    } else {
                        PerChannelBookieClient.this.errorOutAddKey(completionKey);
                    }
                }
            });
        }
        catch (Throwable e) {
            LOG.warn("Add entry operation failed", e);
            this.errorOutAddKey(completionKey);
        }
    }

    /**
     * Sends a read-entry request carrying the master key and registers {@code cb} for the
     * outcome.
     *
     * <p>The request is 44 bytes: 4-byte length, 4-byte packet header, 8-byte ledger id,
     * 8-byte entry id and 20 bytes of master key. Any failure while building or writing the
     * request is reported to {@code cb} via {@link #errorOutReadKey(CompletionKey)}, matching
     * {@link #readEntry}.
     *
     * @param ledgerId  ledger to read from
     * @param masterKey key whose first 20 bytes are written into the request
     * @param entryId   id of the entry to read
     * @param cb        callback invoked with the result
     * @param ctx       opaque context handed back to {@code cb}
     */
    public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey, final long entryId, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
        final CompletionKey key = new CompletionKey(ledgerId, entryId);
        this.readCompletions.put(key, new ReadCompletion(cb, ctx));
        // 4 (length) + 4 (packet header) + 8 (ledger id) + 8 (entry id) + 20 (master key)
        int totalHeaderSize = 44;
        try {
            // Read the volatile field once; a null channel raises an NPE that is handled below.
            Channel c = this.channel;
            ChannelBuffer tmpEntry = c.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
            tmpEntry.writeInt(totalHeaderSize - 4);
            tmpEntry.writeInt(new BookieProtocol.PacketHeader(2, 2, 1).toInt());
            tmpEntry.writeLong(ledgerId);
            tmpEntry.writeLong(entryId);
            tmpEntry.writeBytes(masterKey, 0, 20);
            ChannelFuture future = c.write(tmpEntry);
            future.addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + future.getChannel().getRemoteAddress());
                        }
                    } else {
                        PerChannelBookieClient.this.errorOutReadKey(key);
                    }
                }
            });
        }
        catch (Throwable e) {
            // Without this the registered completion would never be answered.
            LOG.warn("Read entry and fence operation failed", e);
            this.errorOutReadKey(key);
        }
    }

    /**
     * Sends a read-entry request and registers {@code cb} for the outcome.
     *
     * <p>The request is 24 bytes: 4-byte length, 4-byte packet header, 8-byte ledger id and
     * 8-byte entry id. Any failure while building or writing the request is reported to
     * {@code cb} via {@link #errorOutReadKey(CompletionKey)}.
     *
     * @param ledgerId ledger to read from
     * @param entryId  id of the entry to read
     * @param cb       callback invoked with the result
     * @param ctx      opaque context handed back to {@code cb}
     */
    public void readEntry(final long ledgerId, final long entryId, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
        final CompletionKey key = new CompletionKey(ledgerId, entryId);
        this.readCompletions.put(key, new ReadCompletion(cb, ctx));
        // 4 (length) + 4 (packet header) + 8 (ledger id) + 8 (entry id)
        int totalHeaderSize = 24;
        try {
            // Read the volatile field once; a null channel raises an NPE that is handled below.
            Channel c = this.channel;
            ChannelBuffer tmpEntry = c.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
            tmpEntry.writeInt(totalHeaderSize - 4);
            tmpEntry.writeInt(new BookieProtocol.PacketHeader(2, 2, 0).toInt());
            tmpEntry.writeLong(ledgerId);
            tmpEntry.writeLong(entryId);
            ChannelFuture future = c.write(tmpEntry);
            future.addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + future.getChannel().getRemoteAddress());
                        }
                    } else {
                        PerChannelBookieClient.this.errorOutReadKey(key);
                    }
                }
            });
        }
        catch (Throwable e) {
            LOG.warn("Read entry operation failed", e);
            this.errorOutReadKey(key);
        }
    }

    /**
     * Closes the current channel but leaves the client reusable: a later
     * {@link #connectIfNeededAndDoOp} may reconnect.
     */
    public void disconnect() {
        this.closeInternal(false);
    }

    /**
     * Closes the channel permanently; later operations complete immediately with -8.
     */
    public void close() {
        this.closeInternal(true);
    }

    /**
     * Updates the connection state, closes the channel (waiting for the close to finish) and
     * stops the read-timeout timer.
     *
     * @param permanent {@code true} to move to {@code CLOSED}; {@code false} to move to
     *                  {@code DISCONNECTED} unless already {@code CLOSED}
     */
    private void closeInternal(boolean permanent) {
        synchronized (this) {
            if (permanent) {
                this.state = ConnectionState.CLOSED;
            } else if (this.state != ConnectionState.CLOSED) {
                this.state = ConnectionState.DISCONNECTED;
            }
        }
        // Read the volatile field once so it cannot become null between the check and the call.
        Channel c = this.channel;
        if (c != null) {
            c.close().awaitUninterruptibly();
        }
        Timer timer = this.readTimeoutTimer;
        if (timer != null) {
            timer.stop();
            this.readTimeoutTimer = null;
        }
    }

    /**
     * Describes the remote end of the current channel for log messages.
     *
     * @return the channel's remote address as a string, or {@code "null"} when there is no
     *         channel or the channel reports no remote address
     */
    private String remoteAddressForLog() {
        Channel c = this.channel;
        if (c == null) {
            return "null";
        }
        return String.valueOf(c.getRemoteAddress());
    }

    /**
     * Fails the pending read for {@code key}, if it is still registered, by invoking its
     * callback with -8 on the ordered executor.
     *
     * @param key key of the read completion to fail
     */
    void errorOutReadKey(final CompletionKey key) {
        this.executor.submitOrdered(key.ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                ReadCompletion readCompletion = PerChannelBookieClient.this.readCompletions.remove(key);
                if (readCompletion != null) {
                    // The channel may already be gone; never let logging prevent the callback.
                    String bAddress = PerChannelBookieClient.this.remoteAddressForLog();
                    LOG.error("Could not write  request for reading entry: " + key.entryId + " ledger-id: " + key.ledgerId + " bookie: " + bAddress);
                    readCompletion.cb.readEntryComplete(RC_BOOKIE_UNAVAILABLE, key.ledgerId, key.entryId, null, readCompletion.ctx);
                }
            }
        });
    }

    /**
     * Fails the pending add for {@code key}, if it is still registered, by invoking its
     * callback with -8 on the ordered executor.
     *
     * @param key key of the add completion to fail
     */
    void errorOutAddKey(final CompletionKey key) {
        this.executor.submitOrdered(key.ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                AddCompletion addCompletion = PerChannelBookieClient.this.addCompletions.remove(key);
                if (addCompletion != null) {
                    String bAddress = PerChannelBookieClient.this.remoteAddressForLog();
                    LOG.error("Could not write request for adding entry: " + key.entryId + " ledger-id: " + key.ledgerId + " bookie: " + bAddress);
                    addCompletion.cb.writeComplete(RC_BOOKIE_UNAVAILABLE, key.ledgerId, key.entryId, PerChannelBookieClient.this.addr, addCompletion.ctx);
                    LOG.error("Invoked callback method: " + key.entryId);
                }
            }
        });
    }

    /**
     * Fails every add and read that is currently pending on this client.
     */
    void errorOutOutstandingEntries() {
        for (CompletionKey key : this.addCompletions.keySet()) {
            this.errorOutAddKey(key);
        }
        for (CompletionKey key : this.readCompletions.keySet()) {
            this.errorOutReadKey(key);
        }
    }

    /**
     * Builds the pipeline for a new channel: a read-timeout handler, a length-prefixed frame
     * decoder (4-byte length at offset 0, stripped from the decoded frame, frames limited to
     * {@link #MAX_FRAME_LENGTH}) and this object as the main handler.
     *
     * @return the pipeline to install on the channel
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        if (this.readTimeoutTimer == null) {
            this.readTimeoutTimer = new HashedWheelTimer();
        }
        pipeline.addLast("readTimeout", new ReadTimeoutHandler(this.readTimeoutTimer, this.conf.getReadTimeout()));
        pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
        pipeline.addLast("mainhandler", this);
        return pipeline;
    }

    /**
     * Invoked when the channel disconnects: fails all pending requests, closes the current
     * channel and moves to {@code DISCONNECTED} unless the client is already {@code CLOSED}.
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        LOG.info("Disconnected from bookie: " + this.addr);
        this.errorOutOutstandingEntries();
        Channel c = this.channel;
        if (c != null) {
            c.close();
        }
        synchronized (this) {
            if (this.state != ConnectionState.CLOSED) {
                this.state = ConnectionState.DISCONNECTED;
            }
        }
    }

    /**
     * Handles exceptions raised in the pipeline.
     *
     * <p>Frame-decoding errors are logged. A read timeout fails every pending request whose
     * own deadline has passed. Other {@link IOException}s are ignored here; anything else is
     * logged as unexpected.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        Throwable t = e.getCause();
        if (t instanceof CorruptedFrameException || t instanceof TooLongFrameException) {
            LOG.error("Corrupted fram received from bookie: " + e.getChannel().getRemoteAddress());
            return;
        }
        if (t instanceof ReadTimeoutException) {
            for (CompletionKey key : this.addCompletions.keySet()) {
                if (key.shouldTimeout()) {
                    this.errorOutAddKey(key);
                }
            }
            for (CompletionKey key : this.readCompletions.keySet()) {
                if (key.shouldTimeout()) {
                    this.errorOutReadKey(key);
                }
            }
            return;
        }
        if (t instanceof IOException) {
            // Deliberately silent: write failures are already reported by the write listeners.
            return;
        }
        LOG.error("Unexpected exception caught by bookie client channel handler", t);
    }

    /**
     * Parses a response frame and dispatches it on the ordered executor.
     *
     * <p>A response starts with a 4-byte packet header, a 4-byte result code, an 8-byte ledger
     * id and an 8-byte entry id; whatever remains in the buffer is handed to the read handler.
     * Messages that are not {@link ChannelBuffer}s are passed upstream unchanged.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (!(e.getMessage() instanceof ChannelBuffer)) {
            ctx.sendUpstream(e);
            return;
        }
        final ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
        final BookieProtocol.PacketHeader header;
        final int rc;
        final long ledgerId;
        final long entryId;
        try {
            header = BookieProtocol.PacketHeader.fromInt(buffer.readInt());
            rc = buffer.readInt();
            ledgerId = buffer.readLong();
            entryId = buffer.readLong();
        }
        catch (IndexOutOfBoundsException ex) {
            // The frame is shorter than the fixed response prefix.
            LOG.error("Unparseable response from bookie: " + this.addr, (Throwable)ex);
            return;
        }
        this.executor.submitOrdered(ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                switch (header.getOpCode()) {
                    case 1: {
                        PerChannelBookieClient.this.handleAddResponse(ledgerId, entryId, rc);
                        break;
                    }
                    case 2: {
                        PerChannelBookieClient.this.handleReadResponse(ledgerId, entryId, rc, buffer);
                        break;
                    }
                    default: {
                        LOG.error("Unexpected response, type: " + header.getOpCode() + " received from bookie: " + PerChannelBookieClient.this.addr + " , ignoring");
                    }
                }
            }
        });
    }

    /**
     * Completes the pending add for (ledgerId, entryId) with the translated result code.
     * A response with no matching pending add is logged and ignored.
     *
     * @param ledgerId ledger id from the response
     * @param entryId  entry id from the response
     * @param rc       result code as received from the bookie
     */
    void handleAddResponse(long ledgerId, long entryId, int rc) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for add request from bookie: " + this.addr + " for ledger: " + ledgerId + " entry: " + entryId + " rc: " + rc);
        }
        // Translate the wire status into the code handed to the callback.
        // NOTE(review): the numbers are inlined constants, presumably BookieProtocol status
        // codes mapped to BKException.Code values - confirm against those classes.
        switch (rc) {
            case 0: {
                rc = RC_OK;
                break;
            }
            case 103: {
                rc = -16;
                break;
            }
            case 104: {
                rc = -101;
                break;
            }
            case 102: {
                rc = -102;
                break;
            }
            case 105: {
                rc = -104;
                break;
            }
            default: {
                LOG.error("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + this.addr + " with code: " + rc);
                rc = -12;
            }
        }
        AddCompletion ac = this.addCompletions.remove(new CompletionKey(ledgerId, entryId));
        if (ac == null) {
            LOG.error("Unexpected add response received from bookie: " + this.addr + " for ledger: " + ledgerId + ", entry: " + entryId + " , ignoring");
            return;
        }
        ac.cb.writeComplete(rc, ledgerId, entryId, this.addr, ac.ctx);
    }

    /**
     * Completes the pending read for (ledgerId, entryId) with the translated result code and
     * a slice of the remaining response bytes. If no completion is registered under the
     * response's entry id, one registered under entry id -1 for the same ledger is used.
     * A response with no matching pending read is logged and ignored.
     *
     * @param ledgerId ledger id from the response
     * @param entryId  entry id from the response
     * @param rc       result code as received from the bookie
     * @param buffer   response buffer positioned after the fixed response prefix
     */
    void handleReadResponse(long ledgerId, long entryId, int rc, ChannelBuffer buffer) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for read request from bookie: " + this.addr + " for ledger: " + ledgerId + " entry: " + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes());
        }
        // Translate the wire status into the code handed to the callback.
        // NOTE(review): the numbers are inlined constants, presumably BookieProtocol status
        // codes mapped to BKException.Code values - confirm against those classes.
        int callbackRc;
        if (rc == 0) {
            callbackRc = RC_OK;
        } else if (rc == 2 || rc == 1) {
            callbackRc = -13;
        } else if (rc == 103) {
            callbackRc = -16;
        } else if (rc == 102) {
            callbackRc = -102;
        } else {
            LOG.error("Read for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + this.addr + " with code: " + rc);
            callbackRc = -1;
        }
        ReadCompletion readCompletion = this.readCompletions.remove(new CompletionKey(ledgerId, entryId));
        if (readCompletion == null) {
            // NOTE(review): presumably a request issued with entry id -1 is answered with the
            // concrete entry id - confirm against the protocol.
            readCompletion = this.readCompletions.remove(new CompletionKey(ledgerId, -1L));
        }
        if (readCompletion == null) {
            LOG.error("Unexpected read response received from bookie: " + this.addr + " for ledger: " + ledgerId + ", entry: " + entryId + " , ignoring");
            return;
        }
        readCompletion.cb.readEntryComplete(callbackRc, ledgerId, entryId, buffer.slice(), readCompletion.ctx);
    }

    /**
     * Creates a completion key bound to this client's configuration.
     */
    CompletionKey newCompletionKey(long ledgerId, long entryId) {
        return new CompletionKey(ledgerId, entryId);
    }

    /**
     * Identifies a pending request by ledger id and entry id. Equality and hashing use only
     * those two ids; the deadline recorded at construction is not part of the identity.
     */
    class CompletionKey {
        long ledgerId;
        long entryId;
        /** Time, on the {@code MathUtils.now()} clock, after which the request is considered timed out. */
        final long timeoutAt;

        CompletionKey(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            // Multiply as long so a large configured timeout cannot overflow int arithmetic.
            // NOTE(review): assumes getReadTimeout() is in seconds and now() in milliseconds - confirm.
            this.timeoutAt = MathUtils.now() + PerChannelBookieClient.this.conf.getReadTimeout() * 1000L;
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof is already false for null.
            if (!(obj instanceof CompletionKey)) {
                return false;
            }
            CompletionKey that = (CompletionKey)obj;
            return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
        }

        @Override
        public int hashCode() {
            return ((int)this.ledgerId << 16) ^ (int)this.entryId;
        }

        @Override
        public String toString() {
            return String.format("LedgerEntry(%d, %d)", this.ledgerId, this.entryId);
        }

        /**
         * @return {@code true} once the deadline recorded at construction has been reached
         */
        public boolean shouldTimeout() {
            return this.timeoutAt <= MathUtils.now();
        }
    }

    /**
     * Callback and context for a pending add request.
     */
    static class AddCompletion {
        final BookkeeperInternalCallbacks.WriteCallback cb;
        final Object ctx;

        /**
         * @param cb   callback to invoke when the add completes
         * @param size accepted for signature compatibility; not stored
         * @param ctx  opaque context handed back to {@code cb}
         */
        public AddCompletion(BookkeeperInternalCallbacks.WriteCallback cb, long size, Object ctx) {
            this.cb = cb;
            this.ctx = ctx;
        }
    }

    /**
     * Callback and context for a pending read request.
     */
    static class ReadCompletion {
        final BookkeeperInternalCallbacks.ReadEntryCallback cb;
        final Object ctx;

        public ReadCompletion(BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
            this.cb = cb;
            this.ctx = ctx;
        }
    }

    /**
     * Lifecycle of the connection. {@code CLOSED} is terminal: no code path in this class
     * leaves it once set.
     */
    private static enum ConnectionState {
        DISCONNECTED,
        CONNECTING,
        CONNECTED,
        CLOSED;

    }
}

