/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bk_v4_2_0.bookkeeper.bookie;

import com.google.bk_v4_2_0.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bk_v4_2_0.bookkeeper.CreateMode;
import org.apache.bk_v4_2_0.bookkeeper.KeeperException;
import org.apache.bk_v4_2_0.bookkeeper.WatchedEvent;
import org.apache.bk_v4_2_0.bookkeeper.Watcher;
import org.apache.bk_v4_2_0.bookkeeper.ZooDefs;
import org.apache.bk_v4_2_0.bookkeeper.ZooKeeper;
import org.apache.bk_v4_2_0.bookkeeper.bookie.BookieBean;
import org.apache.bk_v4_2_0.bookkeeper.bookie.BookieException;
import org.apache.bk_v4_2_0.bookkeeper.bookie.Cookie;
import org.apache.bk_v4_2_0.bookkeeper.bookie.GarbageCollectorThread;
import org.apache.bk_v4_2_0.bookkeeper.bookie.HandleFactory;
import org.apache.bk_v4_2_0.bookkeeper.bookie.HandleFactoryImpl;
import org.apache.bk_v4_2_0.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bk_v4_2_0.bookkeeper.bookie.Journal;
import org.apache.bk_v4_2_0.bookkeeper.bookie.LedgerDescriptor;
import org.apache.bk_v4_2_0.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bk_v4_2_0.bookkeeper.bookie.LedgerStorage;
import org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration;
import org.apache.bk_v4_2_0.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bk_v4_2_0.bookkeeper.jmx.BKMBeanRegistry;
import org.apache.bk_v4_2_0.bookkeeper.meta.LedgerManager;
import org.apache.bk_v4_2_0.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bk_v4_2_0.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bk_v4_2_0.bookkeeper.util.IOUtils;
import org.apache.bk_v4_2_0.bookkeeper.util.MathUtils;
import org.apache.bk_v4_2_0.bookkeeper.util.StringUtils;
import org.apache.bk_v4_2_0.bookkeeper.util.ZkUtils;
import org.apache.bk_v4_2_0.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.bk_v4_2_0.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Bookie
extends Thread {
    static Logger LOG = LoggerFactory.getLogger(Bookie.class);
    final File journalDirectory;
    final ServerConfiguration conf;
    final SyncThread syncThread;
    final LedgerManagerFactory ledgerManagerFactory;
    final LedgerManager ledgerManager;
    final LedgerStorage ledgerStorage;
    final Journal journal;
    final HandleFactory handles;
    static final long METAENTRY_ID_LEDGER_KEY = -4096L;
    static final long METAENTRY_ID_FENCE_KEY = -8192L;
    private final String bookieRegistrationPath;
    private LedgerDirsManager ledgerDirsManager;
    ZooKeeper zk;
    private volatile boolean running = false;
    private volatile boolean shuttingdown = false;
    private int exitCode = 0;
    BookieBean jmxBookieBean;
    BKMBeanInfo jmxLedgerStorageBean;
    Map<Long, byte[]> masterKeyCache = Collections.synchronizedMap(new HashMap());
    private final String zkBookieRegPath;
    private final AtomicBoolean readOnly = new AtomicBoolean(false);
    static final Future<Boolean> SUCCESS_FUTURE = new Future<Boolean>(){

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public Boolean get() {
            return true;
        }

        @Override
        public Boolean get(long timeout, TimeUnit unit) {
            return true;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return true;
        }
    };

    public static void checkDirectoryStructure(File dir) throws IOException {
        if (!dir.exists()) {
            File parent = dir.getParentFile();
            File preV3versionFile = new File(dir.getParent(), "VERSION");
            final AtomicBoolean oldDataExists = new AtomicBoolean(false);
            parent.list(new FilenameFilter(){

                @Override
                public boolean accept(File dir, String name) {
                    if (name.endsWith(".txn") || name.endsWith(".idx") || name.endsWith(".log")) {
                        oldDataExists.set(true);
                    }
                    return true;
                }
            });
            if (preV3versionFile.exists() || oldDataExists.get()) {
                String err = "Directory layout version is less than 3, upgrade needed";
                LOG.error(err);
                throw new IOException(err);
            }
            if (!dir.mkdirs()) {
                String err = "Unable to create directory " + dir;
                LOG.error(err);
                throw new IOException(err);
            }
        }
    }

    private void checkEnvironment(ZooKeeper zk) throws BookieException, IOException {
        if (zk == null) {
            Bookie.checkDirectoryStructure(this.journalDirectory);
            for (File dir : this.ledgerDirsManager.getAllLedgerDirs()) {
                Bookie.checkDirectoryStructure(dir);
            }
            return;
        }
        try {
            String instanceId = this.getInstanceId(zk);
            boolean newEnv = false;
            Cookie masterCookie = Cookie.generateCookie(this.conf);
            if (null != instanceId) {
                masterCookie.setInstanceId(instanceId);
            }
            try {
                Cookie zkCookie = Cookie.readFromZooKeeper(zk, this.conf);
                masterCookie.verify(zkCookie);
            }
            catch (KeeperException.NoNodeException nne) {
                newEnv = true;
            }
            ArrayList<File> missedCookieDirs = new ArrayList<File>();
            Bookie.checkDirectoryStructure(this.journalDirectory);
            try {
                Cookie journalCookie = Cookie.readFromDirectory(this.journalDirectory);
                journalCookie.verify(masterCookie);
            }
            catch (FileNotFoundException fnf) {
                missedCookieDirs.add(this.journalDirectory);
            }
            for (File dir : this.ledgerDirsManager.getAllLedgerDirs()) {
                Bookie.checkDirectoryStructure(dir);
                try {
                    Cookie c = Cookie.readFromDirectory(dir);
                    c.verify(masterCookie);
                }
                catch (FileNotFoundException fnf) {
                    missedCookieDirs.add(dir);
                }
            }
            if (!newEnv && missedCookieDirs.size() > 0) {
                LOG.error("Cookie exists in zookeeper, but not in all local directories.  Directories missing cookie file are " + missedCookieDirs);
                throw new BookieException.InvalidCookieException();
            }
            if (newEnv) {
                if (missedCookieDirs.size() > 0) {
                    LOG.debug("Directories missing cookie file are {}", missedCookieDirs);
                    masterCookie.writeToDirectory(this.journalDirectory);
                    for (File dir : this.ledgerDirsManager.getAllLedgerDirs()) {
                        masterCookie.writeToDirectory(dir);
                    }
                }
                masterCookie.writeToZooKeeper(zk, this.conf);
            }
        }
        catch (KeeperException ke) {
            LOG.error("Couldn't access cookie in zookeeper", (Throwable)ke);
            throw new BookieException.InvalidCookieException(ke);
        }
        catch (UnknownHostException uhe) {
            LOG.error("Couldn't check cookies, networking is broken", (Throwable)uhe);
            throw new BookieException.InvalidCookieException(uhe);
        }
        catch (IOException ioe) {
            LOG.error("Error accessing cookie on disks", (Throwable)ioe);
            throw new BookieException.InvalidCookieException(ioe);
        }
        catch (InterruptedException ie) {
            LOG.error("Thread interrupted while checking cookies, exiting", (Throwable)ie);
            throw new BookieException.InvalidCookieException(ie);
        }
    }

    public static InetSocketAddress getBookieAddress(ServerConfiguration conf) throws UnknownHostException {
        return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), conf.getBookiePort());
    }

    private String getInstanceId(ZooKeeper zk) throws KeeperException, InterruptedException {
        String instanceId = null;
        try {
            byte[] data = zk.getData(this.conf.getZkLedgersRootPath() + "/" + "INSTANCEID", false, null);
            instanceId = new String(data);
        }
        catch (KeeperException.NoNodeException e) {
            LOG.warn("INSTANCEID not exists in zookeeper. Not considering it for data verification");
        }
        return instanceId;
    }

    public LedgerDirsManager getLedgerDirsManager() {
        return this.ledgerDirsManager;
    }

    public static File getCurrentDirectory(File dir) {
        return new File(dir, "current");
    }

    public static File[] getCurrentDirectories(File[] dirs) {
        File[] currentDirs = new File[dirs.length];
        for (int i = 0; i < dirs.length; ++i) {
            currentDirs[i] = Bookie.getCurrentDirectory(dirs[i]);
        }
        return currentDirs;
    }

    public Bookie(ServerConfiguration conf) throws IOException, KeeperException, InterruptedException, BookieException {
        super("Bookie-" + conf.getBookiePort());
        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
        this.conf = conf;
        this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
        this.ledgerDirsManager = new LedgerDirsManager(conf);
        this.zk = this.instantiateZookeeperClient(conf);
        this.checkEnvironment(this.zk);
        this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
        LOG.info("instantiate ledger manager {}", (Object)this.ledgerManagerFactory.getClass().getName());
        this.ledgerManager = this.ledgerManagerFactory.newLedgerManager();
        this.syncThread = new SyncThread(conf);
        this.ledgerStorage = new InterleavedLedgerStorage(conf, this.ledgerManager, this.ledgerDirsManager, new BookieSafeEntryAdder());
        this.handles = new HandleFactoryImpl(this.ledgerStorage);
        this.journal = new Journal(conf, this.ledgerDirsManager);
        this.zkBookieRegPath = this.bookieRegistrationPath + this.getMyId();
    }

    private String getMyId() throws UnknownHostException {
        return InetAddress.getLocalHost().getHostAddress() + ":" + this.conf.getBookiePort();
    }

    void readJournal() throws IOException, BookieException {
        this.journal.replay(new Journal.JournalScanner(){

            /*
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            @Override
            public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
                long ledgerId = recBuff.getLong();
                long entryId = recBuff.getLong();
                try {
                    LOG.debug("Replay journal - ledger id : {}, entry id : {}.", (Object)ledgerId, (Object)entryId);
                    if (entryId == -4096L) {
                        if (journalVersion < 3) throw new IOException("Invalid journal. Contains journalKey  but layout version (" + journalVersion + ") is too old to hold this");
                        int masterKeyLen = recBuff.getInt();
                        byte[] masterKey = new byte[masterKeyLen];
                        recBuff.get(masterKey);
                        Bookie.this.masterKeyCache.put(ledgerId, masterKey);
                        return;
                    } else if (entryId == -8192L) {
                        if (journalVersion < 4) throw new IOException("Invalid journal. Contains fenceKey  but layout version (" + journalVersion + ") is too old to hold this");
                        byte[] key = Bookie.this.masterKeyCache.get(ledgerId);
                        if (key == null) {
                            key = Bookie.this.ledgerStorage.readMasterKey(ledgerId);
                        }
                        LedgerDescriptor handle = Bookie.this.handles.getHandle(ledgerId, key);
                        handle.setFenced();
                        return;
                    } else {
                        byte[] key = Bookie.this.masterKeyCache.get(ledgerId);
                        if (key == null) {
                            key = Bookie.this.ledgerStorage.readMasterKey(ledgerId);
                        }
                        LedgerDescriptor handle = Bookie.this.handles.getHandle(ledgerId, key);
                        recBuff.rewind();
                        handle.addEntry(recBuff);
                    }
                    return;
                }
                catch (NoLedgerException nsle) {
                    LOG.debug("Skip replaying entries of ledger {} since it was deleted.", (Object)ledgerId);
                    return;
                }
                catch (BookieException be) {
                    throw new IOException(be);
                }
            }
        });
    }

    @Override
    public synchronized void start() {
        this.setDaemon(true);
        LOG.debug("I'm starting a bookie with journal directory {}", (Object)this.journalDirectory.getName());
        try {
            this.readJournal();
        }
        catch (IOException ioe) {
            LOG.error("Exception while replaying journals, shutting down", (Throwable)ioe);
            this.shutdown(5);
            return;
        }
        catch (BookieException be) {
            LOG.error("Exception while replaying journals, shutting down", (Throwable)be);
            this.shutdown(5);
            return;
        }
        super.start();
        this.ledgerDirsManager.addLedgerDirsListener(this.getLedgerDirsListener());
        this.ledgerDirsManager.start();
        this.ledgerStorage.start();
        this.syncThread.start();
        this.running = true;
        try {
            this.registerBookie(this.conf);
        }
        catch (IOException e) {
            LOG.error("Couldn't register bookie with zookeeper, shutting down", (Throwable)e);
            this.shutdown(4);
        }
    }

    private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() {
        return new LedgerDirsManager.LedgerDirsListener(){

            @Override
            public void diskFull(File disk) {
            }

            @Override
            public void diskFailed(File disk) {
                Bookie.this.triggerBookieShutdown(5);
            }

            @Override
            public void allDisksFull() {
                Bookie.this.transitionToReadOnlyMode();
            }

            @Override
            public void fatalError() {
                LOG.error("Fatal error reported by ledgerDirsManager");
                Bookie.this.triggerBookieShutdown(5);
            }
        };
    }

    public void registerJMX(BKMBeanInfo parent) {
        try {
            this.jmxBookieBean = new BookieBean(this);
            BKMBeanRegistry.getInstance().register(this.jmxBookieBean, parent);
            try {
                this.jmxLedgerStorageBean = this.ledgerStorage.getJMXBean();
                BKMBeanRegistry.getInstance().register(this.jmxLedgerStorageBean, this.jmxBookieBean);
            }
            catch (Exception e) {
                LOG.warn("Failed to register with JMX for ledger cache", (Throwable)e);
                this.jmxLedgerStorageBean = null;
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to register with JMX", (Throwable)e);
            this.jmxBookieBean = null;
        }
    }

    public void unregisterJMX() {
        try {
            if (this.jmxLedgerStorageBean != null) {
                BKMBeanRegistry.getInstance().unregister(this.jmxLedgerStorageBean);
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", (Throwable)e);
        }
        try {
            if (this.jmxBookieBean != null) {
                BKMBeanRegistry.getInstance().unregister(this.jmxBookieBean);
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", (Throwable)e);
        }
        this.jmxBookieBean = null;
        this.jmxLedgerStorageBean = null;
    }

    private ZooKeeper instantiateZookeeperClient(ServerConfiguration conf) throws IOException, InterruptedException, KeeperException {
        if (conf.getZkServers() == null) {
            LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
            return null;
        }
        return this.newZookeeper(conf.getZkServers(), conf.getZkTimeout());
    }

    protected void registerBookie(ServerConfiguration conf) throws IOException {
        if (null == this.zk) {
            return;
        }
        String zkBookieRegPath = this.bookieRegistrationPath + StringUtils.addrToString(Bookie.getBookieAddress(conf));
        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
        try {
            Watcher zkPrevRegNodewatcher = new Watcher(){

                @Override
                public void process(WatchedEvent event) {
                    if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
                        prevNodeLatch.countDown();
                    }
                }
            };
            if (null != this.zk.exists(zkBookieRegPath, zkPrevRegNodewatcher)) {
                LOG.info("Previous bookie registration znode: " + zkBookieRegPath + " exists, so waiting zk sessiontimeout: " + conf.getZkTimeout() + "ms for znode deletion");
                if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
                    throw new KeeperException.NodeExistsException(zkBookieRegPath);
                }
            }
            this.zk.create(zkBookieRegPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
        catch (KeeperException ke) {
            LOG.error("ZK exception registering ephemeral Znode for Bookie!", (Throwable)ke);
            throw new IOException(ke);
        }
        catch (InterruptedException ie) {
            LOG.error("ZK exception registering ephemeral Znode for Bookie!", (Throwable)ie);
            throw new IOException(ie);
        }
    }

    @VisibleForTesting
    public void transitionToReadOnlyMode() {
        if (!this.readOnly.compareAndSet(false, true)) {
            return;
        }
        if (!this.conf.isReadOnlyModeEnabled()) {
            LOG.warn("ReadOnly mode is not enabled. Can be enabled by configuring 'readOnlyModeEnabled=true' in configuration.Shutting down bookie");
            this.triggerBookieShutdown(5);
            return;
        }
        LOG.info("Transitioning Bookie to ReadOnly mode, and will serve only read requests from clients!");
        try {
            if (null == this.zk.exists(this.bookieRegistrationPath + "readonly", false)) {
                try {
                    this.zk.create(this.bookieRegistrationPath + "readonly", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                catch (KeeperException.NodeExistsException e) {
                    // empty catch block
                }
            }
            this.zk.create(this.bookieRegistrationPath + "readonly" + "/" + this.getMyId(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            this.zk.delete(this.zkBookieRegPath, -1);
        }
        catch (IOException e) {
            LOG.error("Error in transition to ReadOnly Mode. Shutting down", (Throwable)e);
            this.triggerBookieShutdown(5);
            return;
        }
        catch (KeeperException e) {
            LOG.error("Error in transition to ReadOnly Mode. Shutting down", (Throwable)e);
            this.triggerBookieShutdown(5);
            return;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted Exception while transitioning to ReadOnly Mode.");
            return;
        }
    }

    public boolean isReadOnly() {
        return this.readOnly.get();
    }

    private ZooKeeper newZookeeper(String zkServers, int sessionTimeout) throws IOException, InterruptedException, KeeperException {
        ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(this.conf.getZkTimeout()){

            @Override
            public void process(WatchedEvent event) {
                if (event.getState().equals((Object)Watcher.Event.KeeperState.Expired)) {
                    LOG.error("ZK client connection to the ZK server has expired!");
                    Bookie.this.shutdown(3);
                } else {
                    super.process(event);
                }
            }
        };
        return ZkUtils.createConnectedZookeeperClient(zkServers, w);
    }

    public boolean isRunning() {
        return this.running;
    }

    @Override
    public void run() {
        try {
            this.journal.start();
            this.journal.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (!this.shuttingdown) {
            LOG.error("Journal manager quits unexpectedly.");
            this.triggerBookieShutdown(5);
        }
    }

    void triggerBookieShutdown(final int exitCode) {
        Thread shutdownThread = new Thread(){

            @Override
            public void run() {
                Bookie.this.shutdown(exitCode);
            }
        };
        shutdownThread.start();
        try {
            shutdownThread.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.debug("InterruptedException while waiting for shutdown. Not a problem!!");
        }
    }

    public int shutdown() {
        return this.shutdown(0);
    }

    synchronized int shutdown(int exitCode) {
        block5: {
            try {
                if (!this.running) break block5;
                this.exitCode = exitCode;
                this.shuttingdown = true;
                if (this.zk != null) {
                    this.zk.close();
                }
                this.ledgerDirsManager.shutdown();
                this.journal.shutdown();
                this.join();
                this.syncThread.shutdown();
                this.ledgerStorage.shutdown();
                try {
                    this.ledgerManager.close();
                    this.ledgerManagerFactory.uninitialize();
                }
                catch (IOException ie) {
                    LOG.error("Failed to close active ledger manager : ", (Throwable)ie);
                }
                this.running = false;
            }
            catch (InterruptedException ie) {
                LOG.error("Interrupted during shutting down bookie : ", (Throwable)ie);
            }
        }
        return this.exitCode;
    }

    private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey) throws IOException, BookieException {
        long ledgerId = entry.getLong();
        LedgerDescriptor l = this.handles.getHandle(ledgerId, masterKey);
        if (!this.masterKeyCache.containsKey(ledgerId)) {
            ByteBuffer bb = ByteBuffer.allocate(20 + masterKey.length);
            bb.putLong(ledgerId);
            bb.putLong(-4096L);
            bb.putInt(masterKey.length);
            bb.put(masterKey);
            bb.flip();
            this.journal.logAddEntry(bb, new NopWriteCallback(), null);
            this.masterKeyCache.put(ledgerId, masterKey);
        }
        return l;
    }

    protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry) throws IOException, BookieException {
        byte[] key = this.ledgerStorage.readMasterKey(ledgerId);
        LedgerDescriptor handle = this.handles.getHandle(ledgerId, key);
        handle.addEntry(entry);
    }

    private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx) throws IOException, BookieException {
        long ledgerId = handle.getLedgerId();
        entry.rewind();
        long entryId = handle.addEntry(entry);
        entry.rewind();
        LOG.trace("Adding {}@{}", (Object)entryId, (Object)ledgerId);
        this.journal.logAddEntry(entry, cb, ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recoveryAddEntry(ByteBuffer entry, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException {
        try {
            LedgerDescriptor handle;
            LedgerDescriptor ledgerDescriptor = handle = this.getLedgerForEntry(entry, masterKey);
            synchronized (ledgerDescriptor) {
                this.addEntryInternal(handle, entry, cb, ctx);
            }
        }
        catch (LedgerDirsManager.NoWritableLedgerDirException e) {
            this.transitionToReadOnlyMode();
            throw new IOException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addEntry(ByteBuffer entry, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException {
        try {
            LedgerDescriptor handle;
            LedgerDescriptor ledgerDescriptor = handle = this.getLedgerForEntry(entry, masterKey);
            synchronized (ledgerDescriptor) {
                if (handle.isFenced()) {
                    throw BookieException.create(-101);
                }
                this.addEntryInternal(handle, entry, cb, ctx);
            }
        }
        catch (LedgerDirsManager.NoWritableLedgerDirException e) {
            this.transitionToReadOnlyMode();
            throw new IOException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Future<Boolean> fenceLedger(long ledgerId, byte[] masterKey) throws IOException, BookieException {
        boolean success;
        LedgerDescriptor handle;
        LedgerDescriptor ledgerDescriptor = handle = this.handles.getHandle(ledgerId, masterKey);
        synchronized (ledgerDescriptor) {
            success = handle.setFenced();
        }
        if (success) {
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.putLong(ledgerId);
            bb.putLong(-8192L);
            bb.flip();
            FutureWriteCallback fwc = new FutureWriteCallback();
            LOG.debug("record fenced state for ledger {} in journal.", (Object)ledgerId);
            this.journal.logAddEntry(bb, fwc, null);
            return fwc.getResult();
        }
        return SUCCESS_FUTURE;
    }

    public ByteBuffer readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException {
        LedgerDescriptor handle = this.handles.getReadOnlyHandle(ledgerId);
        LOG.trace("Reading {}@{}", (Object)entryId, (Object)ledgerId);
        return handle.readEntry(entryId);
    }

    public static boolean format(ServerConfiguration conf, boolean isInteractive, boolean force) {
        File[] ledgerDirs;
        File journalDir = conf.getJournalDir();
        if (journalDir.exists() && journalDir.isDirectory() && journalDir.list().length != 0) {
            try {
                boolean confirm = false;
                confirm = !isInteractive ? force : IOUtils.confirmPrompt("Are you sure to format Bookie data..?");
                if (!confirm) {
                    LOG.error("Bookie format aborted!!");
                    return false;
                }
            }
            catch (IOException e) {
                LOG.error("Error during bookie format", (Throwable)e);
                return false;
            }
        }
        if (!Bookie.cleanDir(journalDir)) {
            LOG.error("Formatting journal directory failed");
            return false;
        }
        for (File dir : ledgerDirs = conf.getLedgerDirs()) {
            if (Bookie.cleanDir(dir)) continue;
            LOG.error("Formatting ledger directory " + dir + " failed");
            return false;
        }
        LOG.info("Bookie format completed successfully");
        return true;
    }

    private static boolean cleanDir(File dir) {
        if (dir.exists()) {
            for (File child : dir.listFiles()) {
                boolean delete = FileUtils.deleteQuietly(child);
                if (delete) continue;
                LOG.error("Not able to delete " + child);
                return false;
            }
        } else if (!dir.mkdirs()) {
            LOG.error("Not able to create the directory " + dir);
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws IOException, InterruptedException, BookieException, KeeperException {
        Bookie b = new Bookie(new ServerConfiguration());
        b.start();
        CounterCallback cb = new CounterCallback();
        long start = MathUtils.now();
        for (int i = 0; i < 100000; ++i) {
            ByteBuffer buff = ByteBuffer.allocate(1024);
            buff.putLong(1L);
            buff.putLong(i);
            buff.limit(1024);
            buff.position(0);
            cb.incCount();
            b.addEntry(buff, cb, null, new byte[0]);
        }
        cb.waitZero();
        long end = MathUtils.now();
        System.out.println("Took " + (end - start) + "ms");
    }

    private class BookieSafeEntryAdder
    implements GarbageCollectorThread.SafeEntryAdder {
        private BookieSafeEntryAdder() {
        }

        @Override
        public void safeAddEntry(final long ledgerId, final ByteBuffer buffer, final BookkeeperInternalCallbacks.GenericCallback<Void> cb) {
            Bookie.this.journal.logAddEntry(buffer, new BookkeeperInternalCallbacks.WriteCallback(){

                @Override
                public void writeComplete(int rc, long ledgerId2, long entryId, InetSocketAddress addr, Object ctx) {
                    if (rc != 0) {
                        LOG.error("Error rewriting to journal (ledger {}, entry {})", (Object)ledgerId2, (Object)entryId);
                        cb.operationComplete(rc, null);
                        return;
                    }
                    try {
                        Bookie.this.addEntryByLedgerId(ledgerId, buffer);
                        cb.operationComplete(rc, null);
                    }
                    catch (IOException ioe) {
                        LOG.error("Error adding to ledger storage (ledger " + ledgerId2 + ", entry " + entryId + ")", (Throwable)ioe);
                        cb.operationComplete(-100, null);
                    }
                    catch (BookieException bke) {
                        LOG.error("Bookie error adding to ledger storage (ledger " + ledgerId2 + ", entry " + entryId + ")", (Throwable)bke);
                        cb.operationComplete(bke.getCode(), null);
                    }
                }
            }, null);
        }
    }

    static class CounterCallback
    implements BookkeeperInternalCallbacks.WriteCallback {
        int count;

        CounterCallback() {
        }

        @Override
        public synchronized void writeComplete(int rc, long l, long e, InetSocketAddress addr, Object ctx) {
            --this.count;
            if (this.count == 0) {
                this.notifyAll();
            }
        }

        public synchronized void incCount() {
            ++this.count;
        }

        public synchronized void waitZero() throws InterruptedException {
            while (this.count > 0) {
                this.wait();
            }
        }
    }

    class SyncThread
    extends Thread {
        volatile boolean running;
        final AtomicBoolean flushing;
        final int flushInterval;
        private Object suspensionLock;
        private boolean suspended;

        public SyncThread(ServerConfiguration conf) {
            super("SyncThread");
            this.running = true;
            this.flushing = new AtomicBoolean(false);
            this.suspensionLock = new Object();
            this.suspended = false;
            this.flushInterval = conf.getFlushInterval();
            LOG.debug("Flush Interval : {}", (Object)this.flushInterval);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @VisibleForTesting
        public void suspendSync() {
            Object object = this.suspensionLock;
            synchronized (object) {
                this.suspended = true;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @VisibleForTesting
        public void resumeSync() {
            Object object = this.suspensionLock;
            synchronized (object) {
                this.suspended = false;
                this.suspensionLock.notify();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (this.running) {
                    Object object = this;
                    synchronized (object) {
                        block19: {
                            try {
                                this.wait(this.flushInterval);
                                if (!Bookie.this.ledgerStorage.isFlushRequired()) {
                                }
                                break block19;
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            continue;
                        }
                    }
                    object = this.suspensionLock;
                    synchronized (object) {
                        while (this.suspended) {
                            this.suspensionLock.wait();
                        }
                    }
                    if (!this.flushing.compareAndSet(false, true)) break;
                    Bookie.this.journal.markLog();
                    boolean flushFailed = false;
                    try {
                        Bookie.this.ledgerStorage.flush();
                    }
                    catch (LedgerDirsManager.NoWritableLedgerDirException e) {
                        flushFailed = true;
                        this.flushing.set(false);
                        Bookie.this.transitionToReadOnlyMode();
                    }
                    catch (IOException e) {
                        LOG.error("Exception flushing Ledger", (Throwable)e);
                        flushFailed = true;
                    }
                    if (!flushFailed) {
                        try {
                            Bookie.this.journal.rollLog();
                            Bookie.this.journal.gcJournals();
                        }
                        catch (LedgerDirsManager.NoWritableLedgerDirException e) {
                            this.flushing.set(false);
                            Bookie.this.transitionToReadOnlyMode();
                        }
                    }
                    this.flushing.set(false);
                }
            }
            catch (Throwable t) {
                LOG.error("Exception in SyncThread", t);
                Bookie.this.triggerBookieShutdown(5);
            }
        }

        void shutdown() throws InterruptedException {
            this.running = false;
            if (this.flushing.compareAndSet(false, true)) {
                this.interrupt();
            }
            this.join();
        }
    }

    static class FutureWriteCallback
    implements BookkeeperInternalCallbacks.WriteCallback {
        CountDownLatchFuture<Boolean> result = new CountDownLatchFuture();

        FutureWriteCallback() {
        }

        @Override
        public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished writing entry {} @ ledger {} for {} : {}", new Object[]{entryId, ledgerId, addr, rc});
            }
            this.result.setDone(0 == rc);
        }

        public Future<Boolean> getResult() {
            return this.result;
        }
    }

    static class CountDownLatchFuture<T>
    implements Future<T> {
        T value = null;
        volatile boolean done = false;
        CountDownLatch latch = new CountDownLatch(1);

        CountDownLatchFuture() {
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public T get() throws InterruptedException {
            this.latch.await();
            return this.value;
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException {
            this.latch.await(timeout, unit);
            return this.value;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return this.done;
        }

        void setDone(T value) {
            this.value = value;
            this.done = true;
            this.latch.countDown();
        }
    }

    static class NopWriteCallback
    implements BookkeeperInternalCallbacks.WriteCallback {
        NopWriteCallback() {
        }

        @Override
        public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished writing entry {} @ ledger {} for {} : {}", new Object[]{entryId, ledgerId, addr, rc});
            }
        }
    }

    public static class NoEntryException
    extends IOException {
        private static final long serialVersionUID = 1L;
        private long ledgerId;
        private long entryId;

        public NoEntryException(long ledgerId, long entryId) {
            this("Entry " + entryId + " not found in " + ledgerId, ledgerId, entryId);
        }

        public NoEntryException(String msg, long ledgerId, long entryId) {
            super(msg);
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        public long getLedger() {
            return this.ledgerId;
        }

        public long getEntry() {
            return this.entryId;
        }
    }

    public static class NoLedgerException
    extends IOException {
        private static final long serialVersionUID = 1L;
        private long ledgerId;

        public NoLedgerException(long ledgerId) {
            super("Ledger " + ledgerId + " not found");
            this.ledgerId = ledgerId;
        }

        public long getLedgerId() {
            return this.ledgerId;
        }
    }
}

