/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bk_v4_2_0.bookkeeper.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bk_v4_2_0.bookkeeper.AsyncCallback;
import org.apache.bk_v4_2_0.bookkeeper.CreateMode;
import org.apache.bk_v4_2_0.bookkeeper.KeeperException;
import org.apache.bk_v4_2_0.bookkeeper.WatchedEvent;
import org.apache.bk_v4_2_0.bookkeeper.Watcher;
import org.apache.bk_v4_2_0.bookkeeper.ZooDefs;
import org.apache.bk_v4_2_0.bookkeeper.client.BKException;
import org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper;
import org.apache.bk_v4_2_0.bookkeeper.conf.ClientConfiguration;
import org.apache.bk_v4_2_0.bookkeeper.util.SafeRunnable;
import org.apache.bk_v4_2_0.bookkeeper.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the bookies listed as children of the path returned by
 * {@code ClientConfiguration.getZkAvailableBookiesPath()} and hands out
 * randomly chosen bookies from that set on request.
 *
 * <p>The instance registers itself as the watcher on every children read, so
 * each watch event triggers a fresh asynchronous read. The current set is kept
 * in {@link #knownBookies}; it is replaced and copied only while holding this
 * object's monitor.
 */
class BookieWatcher
implements Watcher,
AsyncCallback.ChildrenCallback {
    static final Logger logger = LoggerFactory.getLogger(BookieWatcher.class);
    /** Name of the child node that holds the read-only bookies; it is not a bookie address. */
    private static final String READ_ONLY_NODE = "readonly";
    private final String bookieRegistrationPath;
    /** Shared "no existing bookies" argument; callers must never modify it. */
    static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
    /** Delay, in seconds, before a failed children read is retried. */
    public static int ZK_CONNECT_BACKOFF_SEC = 1;
    final BookKeeper bk;
    /** Most recently read set of bookies; access it only while synchronized on this object. */
    HashSet<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
    final ScheduledExecutorService scheduler;
    /** Task scheduled after a failed read; it simply issues another asynchronous read. */
    SafeRunnable reReadTask = new SafeRunnable(){

        @Override
        public void safeRun() {
            BookieWatcher.this.readBookies();
        }
    };
    private final ReadOnlyBookieWatcher readOnlyBookieWatcher;

    /**
     * Creates the watcher and its companion read-only bookie watcher.
     * No children read is issued here; call {@link #readBookies()} or
     * {@link #readBookiesBlocking()} to populate the known set.
     *
     * @param conf      supplies the available-bookies registration path
     * @param scheduler used to schedule the retry after a failed read
     * @param bk        client whose ZooKeeper handle and bookie client are used
     * @throws KeeperException      if setting up the read-only watcher fails
     * @throws InterruptedException if setting up the read-only watcher is interrupted
     */
    public BookieWatcher(ClientConfiguration conf, ScheduledExecutorService scheduler, BookKeeper bk) throws KeeperException, InterruptedException {
        this.bk = bk;
        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
        this.scheduler = scheduler;
        this.readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
    }

    /** Issues an asynchronous children read whose result is delivered to this object. */
    public void readBookies() {
        this.readBookies(this);
    }

    /**
     * Issues an asynchronous children read on the registration path, with this
     * object as the watcher and {@code callback} receiving the result.
     *
     * @param callback receiver of the children list
     */
    public void readBookies(AsyncCallback.ChildrenCallback callback) {
        this.bk.getZkHandle().getChildren(this.bookieRegistrationPath, (Watcher)this, callback, null);
    }

    /** Reacts to any watch event by re-reading the bookie list. */
    @Override
    public void process(WatchedEvent event) {
        this.readBookies();
    }

    /**
     * Handles the result of a children read.
     *
     * <p>On a non-OK result code a retry is scheduled after
     * {@link #ZK_CONNECT_BACKOFF_SEC} seconds and nothing else happens.
     * Otherwise the known set is replaced by the addresses parsed from
     * {@code children}. Bookies that were known before but are neither in the
     * new set nor in the read-only set are passed to the bookie client's
     * {@code closeClients}, if a bookie client exists.
     *
     * @param rc       result code of the read
     * @param path     path that was read (unused)
     * @param ctx      caller context (unused)
     * @param children child node names; the read-only node name is removed from this list in place
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        if (rc != KeeperException.Code.OK.intValue()) {
            this.scheduler.schedule(this.reReadTask, (long)ZK_CONNECT_BACKOFF_SEC, TimeUnit.SECONDS);
            return;
        }
        // The read-only container node lives under the same parent but is not a bookie.
        children.remove(READ_ONLY_NODE);
        HashSet<InetSocketAddress> newBookieAddrs = BookieWatcher.convertToBookieAddresses(children);
        HashSet<InetSocketAddress> deadBookies;
        synchronized (this) {
            deadBookies = new HashSet<InetSocketAddress>(this.knownBookies);
            deadBookies.removeAll(newBookieAddrs);
            // Read-only bookies are gone from the writable list but still reachable.
            deadBookies.removeAll(this.readOnlyBookieWatcher.getReadOnlyBookies());
            this.knownBookies = newBookieAddrs;
        }
        if (this.bk.getBookieClient() != null) {
            this.bk.getBookieClient().closeClients(deadBookies);
        }
    }

    /**
     * Parses each child name into a socket address. Names that fail to parse
     * are logged and skipped.
     *
     * @param children child node names to parse
     * @return a new set holding every address that parsed successfully
     */
    private static HashSet<InetSocketAddress> convertToBookieAddresses(List<String> children) {
        HashSet<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
        for (String bookieAddrString : children) {
            InetSocketAddress bookieAddr;
            try {
                bookieAddr = StringUtils.parseAddr(bookieAddrString);
            }
            catch (IOException e) {
                // Keep the cause so the reason for the parse failure is not lost.
                logger.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie", e);
                continue;
            }
            newBookieAddrs.add(bookieAddr);
        }
        return newBookieAddrs;
    }

    /**
     * Reads the read-only bookies and then the available bookies, blocking
     * until each read has delivered its result.
     *
     * <p>The result is processed by {@link #processResult} before this method
     * returns, so on failure a background retry has also been scheduled.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws KeeperException      if either read completes with a non-OK result code
     */
    public void readBookiesBlocking() throws InterruptedException, KeeperException {
        this.readOnlyBookieWatcher.readROBookiesBlocking();
        final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
        this.readBookies(new AsyncCallback.ChildrenCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                try {
                    BookieWatcher.this.processResult(rc, path, ctx, children);
                    queue.put(rc);
                }
                catch (InterruptedException e) {
                    logger.error("Interruped when trying to read bookies in a blocking fashion", e);
                    // Restore the flag so code further up the callback thread can see the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
        int rc = queue.take();
        if (rc != KeeperException.Code.OK.intValue()) {
            throw KeeperException.create(KeeperException.Code.get(rc));
        }
    }

    /**
     * Picks {@code numBookiesNeeded} bookies at random from the known set.
     *
     * @param numBookiesNeeded number of bookies to return
     * @return the chosen bookies; empty if {@code numBookiesNeeded <= 0}
     * @throws BKException.BKNotEnoughBookiesException if fewer bookies are known than requested
     */
    public ArrayList<InetSocketAddress> getNewBookies(int numBookiesNeeded) throws BKException.BKNotEnoughBookiesException {
        return this.getAdditionalBookies(EMPTY_SET, numBookiesNeeded);
    }

    /**
     * Picks one known bookie at random that is not in {@code existingBookies}.
     *
     * @param existingBookies bookies that must not be returned
     * @return a bookie not contained in {@code existingBookies}
     * @throws BKException.BKNotEnoughBookiesException if every known bookie is already in use
     */
    public InetSocketAddress getAdditionalBookie(List<InetSocketAddress> existingBookies) throws BKException.BKNotEnoughBookiesException {
        return this.getAdditionalBookies(new HashSet<InetSocketAddress>(existingBookies), 1).get(0);
    }

    /**
     * Picks bookies at random from the known set, skipping those in
     * {@code existingBookies}.
     *
     * @param existingBookies            bookies that must not be returned
     * @param numAdditionalBookiesNeeded number of bookies to return
     * @return the chosen bookies; empty if {@code numAdditionalBookiesNeeded <= 0}
     * @throws BKException.BKNotEnoughBookiesException if not enough eligible bookies are known
     */
    public ArrayList<InetSocketAddress> getAdditionalBookies(Set<InetSocketAddress> existingBookies, int numAdditionalBookiesNeeded) throws BKException.BKNotEnoughBookiesException {
        ArrayList<InetSocketAddress> newBookies = new ArrayList<InetSocketAddress>();
        if (numAdditionalBookiesNeeded <= 0) {
            return newBookies;
        }
        ArrayList<InetSocketAddress> allBookies;
        synchronized (this) {
            // Snapshot under the lock; shuffling and filtering happen outside it.
            allBookies = new ArrayList<InetSocketAddress>(this.knownBookies);
        }
        Collections.shuffle(allBookies);
        int remaining = numAdditionalBookiesNeeded;
        for (InetSocketAddress bookie : allBookies) {
            if (existingBookies.contains(bookie)) {
                continue;
            }
            newBookies.add(bookie);
            if (--remaining == 0) {
                return newBookies;
            }
        }
        throw new BKException.BKNotEnoughBookiesException();
    }

    /**
     * Tracks the children of the read-only node under the available-bookies
     * path, re-reading them on every watch event.
     */
    private static class ReadOnlyBookieWatcher
    implements Watcher,
    AsyncCallback.ChildrenCallback {
        private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyBookieWatcher.class);
        /** Most recently read set of read-only bookies; replaced wholesale on each successful read. */
        private HashSet<InetSocketAddress> readOnlyBookies = new HashSet<InetSocketAddress>();
        private BookKeeper bk;
        private String readOnlyBookieRegPath;

        /**
         * Creates the watcher and makes sure the read-only node exists,
         * creating it as a persistent node when it is missing.
         *
         * @param conf supplies the available-bookies registration path
         * @param bk   client whose ZooKeeper handle is used
         * @throws KeeperException      if the existence check or the create fails
         * @throws InterruptedException if the existence check or the create is interrupted
         */
        public ReadOnlyBookieWatcher(ClientConfiguration conf, BookKeeper bk) throws KeeperException, InterruptedException {
            this.bk = bk;
            this.readOnlyBookieRegPath = conf.getZkAvailableBookiesPath() + "/" + READ_ONLY_NODE;
            if (null == bk.getZkHandle().exists(this.readOnlyBookieRegPath, false)) {
                try {
                    bk.getZkHandle().create(this.readOnlyBookieRegPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                catch (KeeperException.NodeExistsException ignored) {
                    // The node appeared between the exists() check and create(); that is the state we want.
                }
            }
        }

        /** Reacts to any watch event by re-reading the read-only bookie list. */
        @Override
        public void process(WatchedEvent event) {
            this.readROBookies();
        }

        /**
         * Reads the read-only bookies and blocks until the result has been processed.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws KeeperException      if the read completes with a non-OK result code
         */
        void readROBookiesBlocking() throws InterruptedException, KeeperException {
            final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
            this.readROBookies(new AsyncCallback.ChildrenCallback(){

                @Override
                public void processResult(int rc, String path, Object ctx, List<String> children) {
                    try {
                        ReadOnlyBookieWatcher.this.processResult(rc, path, ctx, children);
                        queue.put(rc);
                    }
                    catch (InterruptedException e) {
                        LOG.error("Interruped when trying to read readonly bookies in a blocking fashion", e);
                        // Restore the flag so code further up the callback thread can see the interrupt.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            });
            int rc = queue.take();
            if (rc != KeeperException.Code.OK.intValue()) {
                throw KeeperException.create(KeeperException.Code.get(rc));
            }
        }

        /**
         * Issues an asynchronous children read on the read-only path, with this
         * object as the watcher and {@code callback} receiving the result.
         *
         * @param callback receiver of the children list
         */
        void readROBookies(AsyncCallback.ChildrenCallback callback) {
            this.bk.getZkHandle().getChildren(this.readOnlyBookieRegPath, (Watcher)this, callback, null);
        }

        /** Issues an asynchronous children read whose result is delivered to this object. */
        void readROBookies() {
            this.readROBookies(this);
        }

        /**
         * Replaces the read-only set with the addresses parsed from
         * {@code children}; a non-OK result code is logged and the current set
         * is left untouched.
         *
         * @param rc       result code of the read
         * @param path     path that was read (unused)
         * @param ctx      caller context (unused)
         * @param children child node names to parse
         */
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children) {
            if (rc != KeeperException.Code.OK.intValue()) {
                // NOTE(review): unlike the outer watcher, no retry is scheduled here - confirm this is intended.
                LOG.error("Not able to read readonly bookies : ", (Throwable)KeeperException.create(KeeperException.Code.get(rc)));
                return;
            }
            this.readOnlyBookies = BookieWatcher.convertToBookieAddresses(children);
        }

        /**
         * Returns the current read-only set. This is the internal instance,
         * not a copy, so callers must not modify it.
         *
         * @return the most recently read set of read-only bookies
         */
        public HashSet<InetSocketAddress> getReadOnlyBookies() {
            return this.readOnlyBookies;
        }
    }
}

