/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bk_v4_2_0.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bk_v4_2_0.bookkeeper.conf.ClientConfiguration;
import org.apache.bk_v4_2_0.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bk_v4_2_0.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bk_v4_2_0.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bk_v4_2_0.bookkeeper.util.SafeRunnable;
import org.jboss.bk_v4_2_0.netty.buffer.ChannelBuffer;
import org.jboss.bk_v4_2_0.netty.buffer.ChannelBuffers;
import org.jboss.bk_v4_2_0.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.bk_v4_2_0.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookieClient {
    static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
    AtomicLong totalBytesOutstanding = new AtomicLong();
    OrderedSafeExecutor executor;
    ClientSocketChannelFactory channelFactory;
    ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient> channels = new ConcurrentHashMap();
    private final ClientConfiguration conf;
    private volatile boolean closed;
    private ReentrantReadWriteLock closeLock;

    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
        this.conf = conf;
        this.channelFactory = channelFactory;
        this.executor = executor;
        this.closed = false;
        this.closeLock = new ReentrantReadWriteLock();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PerChannelBookieClient lookupClient(InetSocketAddress addr) {
        PerChannelBookieClient channel = this.channels.get(addr);
        if (channel == null) {
            this.closeLock.readLock().lock();
            try {
                if (this.closed) {
                    PerChannelBookieClient perChannelBookieClient = null;
                    return perChannelBookieClient;
                }
                channel = new PerChannelBookieClient(this.conf, this.executor, this.channelFactory, addr, this.totalBytesOutstanding);
                PerChannelBookieClient prevChannel = this.channels.putIfAbsent(addr, channel);
                if (prevChannel != null) {
                    channel = prevChannel;
                }
            }
            finally {
                this.closeLock.readLock().unlock();
            }
        }
        return channel;
    }

    public void closeClients(Set<InetSocketAddress> addrs) {
        final HashSet<PerChannelBookieClient> clients = new HashSet<PerChannelBookieClient>();
        for (InetSocketAddress a : addrs) {
            PerChannelBookieClient c = this.channels.get(a);
            if (c == null) continue;
            clients.add(c);
        }
        if (clients.size() == 0) {
            return;
        }
        this.executor.submit(new SafeRunnable(){

            @Override
            public void safeRun() {
                for (PerChannelBookieClient c : clients) {
                    c.disconnect();
                }
            }
        });
    }

    public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId, final ChannelBuffer toSend, final BookkeeperInternalCallbacks.WriteCallback cb, final Object ctx, final int options) {
        final PerChannelBookieClient client = this.lookupClient(addr);
        if (client == null) {
            cb.writeComplete(-8, ledgerId, entryId, addr, ctx);
            return;
        }
        client.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<Void>(){

            @Override
            public void operationComplete(final int rc, Void result) {
                if (rc != 0) {
                    BookieClient.this.executor.submitOrdered(ledgerId, new SafeRunnable(){

                        @Override
                        public void safeRun() {
                            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
                        }
                    });
                    return;
                }
                client.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
            }
        });
    }

    public void readEntryAndFenceLedger(InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId, final BookkeeperInternalCallbacks.ReadEntryCallback cb, final Object ctx) {
        final PerChannelBookieClient client = this.lookupClient(addr);
        if (client == null) {
            cb.readEntryComplete(-8, ledgerId, entryId, null, ctx);
            return;
        }
        client.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<Void>(){

            @Override
            public void operationComplete(final int rc, Void result) {
                if (rc != 0) {
                    BookieClient.this.executor.submitOrdered(ledgerId, new SafeRunnable(){

                        @Override
                        public void safeRun() {
                            cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
                        }
                    });
                    return;
                }
                client.readEntryAndFenceLedger(ledgerId, masterKey, entryId, cb, ctx);
            }
        });
    }

    public void readEntry(InetSocketAddress addr, final long ledgerId, final long entryId, final BookkeeperInternalCallbacks.ReadEntryCallback cb, final Object ctx) {
        final PerChannelBookieClient client = this.lookupClient(addr);
        if (client == null) {
            cb.readEntryComplete(-8, ledgerId, entryId, null, ctx);
            return;
        }
        client.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<Void>(){

            @Override
            public void operationComplete(final int rc, Void result) {
                if (rc != 0) {
                    BookieClient.this.executor.submitOrdered(ledgerId, new SafeRunnable(){

                        @Override
                        public void safeRun() {
                            cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
                        }
                    });
                    return;
                }
                client.readEntry(ledgerId, entryId, cb, ctx);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        this.closeLock.writeLock().lock();
        try {
            this.closed = true;
            for (PerChannelBookieClient channel : this.channels.values()) {
                channel.close();
            }
        }
        finally {
            this.closeLock.writeLock().unlock();
        }
    }

    public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
        if (args.length != 3) {
            System.err.println("USAGE: BookieClient bookieHost port ledger#");
            return;
        }
        BookkeeperInternalCallbacks.WriteCallback cb = new BookkeeperInternalCallbacks.WriteCallback(){

            @Override
            public void writeComplete(int rc, long ledger, long entry, InetSocketAddress addr, Object ctx) {
                Counter counter = (Counter)ctx;
                counter.dec();
                if (rc != 0) {
                    System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
                }
            }
        };
        Counter counter = new Counter();
        byte[] hello = "hello".getBytes();
        long ledger = Long.parseLong(args[2]);
        NioClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
        BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
        InetSocketAddress addr = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
        for (int i = 0; i < 100000; ++i) {
            counter.inc();
            bc.addEntry(addr, ledger, new byte[0], i, ChannelBuffers.wrappedBuffer(hello), cb, counter, 0);
        }
        counter.wait(0);
        System.out.println("Total = " + counter.total());
        channelFactory.releaseExternalResources();
        executor.shutdown();
    }

    private static class Counter {
        int i;
        int total;

        private Counter() {
        }

        synchronized void inc() {
            ++this.i;
            ++this.total;
        }

        synchronized void dec() {
            --this.i;
            this.notifyAll();
        }

        synchronized void wait(int limit) throws InterruptedException {
            while (this.i > limit) {
                this.wait();
            }
        }

        synchronized int total() {
            return this.total;
        }
    }
}

