/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bk_v4_2_0.bookkeeper.meta;

import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bk_v4_2_0.bookkeeper.AsyncCallback;
import org.apache.bk_v4_2_0.bookkeeper.CreateMode;
import org.apache.bk_v4_2_0.bookkeeper.KeeperException;
import org.apache.bk_v4_2_0.bookkeeper.ZooDefs;
import org.apache.bk_v4_2_0.bookkeeper.ZooKeeper;
import org.apache.bk_v4_2_0.bookkeeper.client.LedgerMetadata;
import org.apache.bk_v4_2_0.bookkeeper.conf.AbstractConfiguration;
import org.apache.bk_v4_2_0.bookkeeper.meta.LedgerManager;
import org.apache.bk_v4_2_0.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bk_v4_2_0.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bk_v4_2_0.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MSException;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetaStore;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetastoreCallback;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetastoreCursor;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetastoreException;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetastoreFactory;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetastoreScannableTable;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetastoreTable;
import org.apache.bk_v4_2_0.bookkeeper.metastore.MetastoreTableItem;
import org.apache.bk_v4_2_0.bookkeeper.metastore.Value;
import org.apache.bk_v4_2_0.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bk_v4_2_0.bookkeeper.replication.ReplicationException;
import org.apache.bk_v4_2_0.bookkeeper.util.StringUtils;
import org.apache.bk_v4_2_0.bookkeeper.util.ZkUtils;
import org.apache.bk_v4_2_0.bookkeeper.versioning.Version;
import org.apache.bk_v4_2_0.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MSLedgerManagerFactory
extends LedgerManagerFactory {
    /** Logger used by this factory and by the nested classes declared in this file. */
    static Logger LOG = LoggerFactory.getLogger(MSLedgerManagerFactory.class);
    /** Layout version constant (1): the value {@code initialize} accepts and {@code getCurrentVersion} returns. */
    public static final int CUR_VERSION = 1;
    /** Name of the metastore table that {@code MsLedgerManager} opens for ledger metadata. */
    public static final String TABLE_NAME = "LEDGER";
    /** Name of the value field under which the serialized ledger metadata is stored and read back. */
    public static final String META_FIELD = ".META";
    /** Configuration handed to {@code initialize}; passed on to the managers this factory creates. */
    AbstractConfiguration conf;
    /** ZooKeeper handle handed to {@code initialize}; passed on to the managers this factory creates. */
    ZooKeeper zk;
    /** Metastore created and initialized in {@code initialize}; closed in {@code uninitialize}. */
    MetaStore metastore;

    /**
     * Reports the layout version implemented by this factory.
     *
     * @return {@link #CUR_VERSION}
     */
    @Override
    public int getCurrentVersion() {
        return CUR_VERSION;
    }

    /**
     * Binds this factory to a configuration and ZooKeeper handle and brings up the metastore.
     *
     * <p>The metastore implementation is looked up by the class name found in the
     * configuration and is initialized with the version it reports for itself.
     *
     * @param conf configuration; also supplies the metastore implementation class name
     * @param zk ZooKeeper handle stored for the managers created later
     * @param factoryVersion layout version the caller expects; must be 1
     * @return this factory
     * @throws IOException if {@code factoryVersion} is not 1, or if creating or initializing
     *         the metastore fails for any reason (the failure is attached as the cause)
     */
    @Override
    public LedgerManagerFactory initialize(AbstractConfiguration conf, ZooKeeper zk, int factoryVersion) throws IOException {
        if (factoryVersion != 1) {
            throw new IOException("Incompatible layout version found : " + factoryVersion);
        }
        this.conf = conf;
        this.zk = zk;
        final String implClassName = conf.getMetastoreImplClass();
        try {
            this.metastore = MetastoreFactory.createMetaStore(implClassName);
            // The store is initialized with whatever version it reports itself.
            final int reportedVersion = this.metastore.getVersion();
            this.metastore.init(conf, reportedVersion);
        } catch (Throwable cause) {
            throw new IOException("Failed to initialize metastore " + implClassName + " : ", cause);
        }
        return this;
    }

    /**
     * Closes the metastore created by {@code initialize}.
     *
     * <p>Does nothing when no metastore was ever assigned (for example when
     * {@code initialize} was not called or failed before creating one), so that
     * cleanup code can call this unconditionally.
     *
     * @throws IOException declared by the overridden method; not thrown by this body directly
     */
    @Override
    public void uninitialize() throws IOException {
        if (this.metastore == null) {
            return;
        }
        this.metastore.close();
    }

    /**
     * Converts a metastore key into a ledger id.
     *
     * @param key decimal string form of the ledger id, or {@code null}
     * @return the parsed id, or {@code null} when {@code key} is {@code null}
     * @throws NumberFormatException if {@code key} is not a valid decimal long
     */
    static Long key2LedgerId(String key) {
        if (key == null) {
            return null;
        }
        // Long.valueOf(String) parses in radix 10.
        return Long.valueOf(key);
    }

    /**
     * Converts a ledger id into the key used in the ledger table.
     *
     * @param lid ledger id, or {@code null}
     * @return the result of {@code StringUtils.getZKStringId} for the id, or {@code null}
     *         when {@code lid} is {@code null}
     */
    static String ledgerId2Key(Long lid) {
        if (lid == null) {
            return null;
        }
        return StringUtils.getZKStringId(lid);
    }

    /**
     * Renders a ledger range in interval notation, e.g. {@code "[ 1 ~ 5 )"}.
     *
     * @param firstLedger lower bound (printed as {@code null} when absent)
     * @param firstInclusive whether the lower bound is inclusive ({@code [}) or exclusive ({@code (})
     * @param lastLedger upper bound (printed as {@code null} when absent)
     * @param lastInclusive whether the upper bound is inclusive ({@code ]}) or exclusive ({@code )})
     * @return the formatted range
     */
    static String rangeToString(Long firstLedger, boolean firstInclusive, Long lastLedger, boolean lastInclusive) {
        final String opening = firstInclusive ? "[ " : "( ";
        final String closing = lastInclusive ? " ]" : " )";
        return opening + firstLedger + " ~ " + lastLedger + closing;
    }

    /**
     * Collects the ledger ids encoded in the keys of the given table items.
     *
     * <p>Items whose key cannot be turned into a ledger id are skipped with a warning.
     * That covers keys that are not decimal numbers and also {@code null} keys:
     * {@code key2LedgerId} maps a {@code null} key to {@code null}, and adding
     * {@code null} to a naturally ordered {@link TreeSet} would throw
     * {@link NullPointerException} and abort the whole batch.
     *
     * @param entries items to read keys from; consumed by this call
     * @return the parsed ledger ids in ascending order (possibly empty)
     */
    static SortedSet<Long> entries2Ledgers(Iterator<MetastoreTableItem> entries) {
        SortedSet<Long> ledgers = new TreeSet<Long>();
        while (entries.hasNext()) {
            MetastoreTableItem item = entries.next();
            String key = item.getKey();
            Long ledgerId;
            try {
                ledgerId = MSLedgerManagerFactory.key2LedgerId(key);
            } catch (NumberFormatException nfe) {
                LOG.warn("Found invalid ledger key {}", (Object)key);
                continue;
            }
            if (ledgerId == null) {
                LOG.warn("Found invalid ledger key {}", (Object)key);
                continue;
            }
            ledgers.add(ledgerId);
        }
        return ledgers;
    }

    /**
     * Creates a ledger manager backed by this factory's metastore.
     *
     * @return a new {@code MsLedgerManager} sharing this factory's configuration,
     *         ZooKeeper handle and metastore
     */
    @Override
    public LedgerManager newLedgerManager() {
        final MsLedgerManager manager = new MsLedgerManager(this.conf, this.zk, this.metastore);
        return manager;
    }

    /**
     * Creates the under-replication manager for this factory.
     *
     * <p>This is the ZooKeeper-based implementation, built from this factory's
     * configuration and ZooKeeper handle; the metastore is not involved.
     *
     * @return a new {@code ZkLedgerUnderreplicationManager}
     */
    @Override
    public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        final ZkLedgerUnderreplicationManager manager = new ZkLedgerUnderreplicationManager(this.conf, this.zk);
        return manager;
    }

    private static class AsyncSetProcessor<T> {
        ScheduledExecutorService scheduler;

        public AsyncSetProcessor(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
        }

        public void process(Set<T> data, final BookkeeperInternalCallbacks.Processor<T> processor, final AsyncCallback.VoidCallback finalCb, final Object context, final int successRc, final int failureRc) {
            if (data == null || data.size() == 0) {
                finalCb.processResult(successRc, null, context);
                return;
            }
            final Iterator<T> iter = data.iterator();
            AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback(){

                @Override
                public void processResult(int rc, String path, Object ctx) {
                    if (rc != successRc) {
                        finalCb.processResult(failureRc, null, context);
                        return;
                    }
                    if (!iter.hasNext()) {
                        finalCb.processResult(successRc, null, context);
                        return;
                    }
                    final Object dataToProcess = iter.next();
                    final 1 stub = this;
                    AsyncSetProcessor.this.scheduler.submit(new Runnable(){

                        @Override
                        public final void run() {
                            processor.process(dataToProcess, stub);
                        }
                    });
                }
            };
            T firstElement = iter.next();
            processor.process(firstElement, stubCallback);
        }
    }

    static class MsLedgerManager
    implements LedgerManager {
        /** ZooKeeper handle; used to create and delete the id-generation znodes. */
        final ZooKeeper zk;
        /** Configuration this manager was constructed with. */
        final AbstractConfiguration conf;
        /** Metastore from which {@code ledgerTable} is obtained. */
        final MetaStore metastore;
        /** Scannable table holding ledger metadata, keyed by the string form of the ledger id. */
        final MetastoreScannableTable ledgerTable;
        /** Number of entries requested per cursor read; taken from the configuration. */
        final int maxEntriesPerScan;
        /** Name of the id-generation znode (not referenced elsewhere in this file). */
        static final String IDGEN_ZNODE = "ms-idgen";
        /** Suffix appended to the ledgers root path to form the sequential-znode path prefix. */
        static final String IDGENERATION_PREFIX = "/ms-idgen/ID-";
        /** Full znode path prefix for id generation: ledgers root path + {@code IDGENERATION_PREFIX}. */
        private final String idGenPath;
        /** Single-thread executor created in the constructor; runs the ledger scan steps. */
        ScheduledExecutorService scheduler;

        MsLedgerManager(AbstractConfiguration conf, ZooKeeper zk, MetaStore metastore) {
            this.conf = conf;
            this.zk = zk;
            this.metastore = metastore;
            try {
                this.ledgerTable = metastore.createScannableTable(MSLedgerManagerFactory.TABLE_NAME);
            }
            catch (MetastoreException mse) {
                LOG.error("Failed to instantiate table LEDGER in metastore " + metastore.getName());
                throw new RuntimeException("Failed to instantiate table LEDGER in metastore " + metastore.getName());
            }
            this.maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
            this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
        }

        /**
         * Shuts down the scan executor and closes the ledger table.
         *
         * <p>A failure while shutting down the executor is logged and does not prevent
         * the table from being closed.
         */
        @Override
        public void close() {
            try {
                this.scheduler.shutdown();
            } catch (Exception shutdownFailure) {
                LOG.warn("Error when closing MsLedgerManager : ", shutdownFailure);
            }
            this.ledgerTable.close();
        }

        /**
         * Allocates a new ledger id and stores the given metadata under it.
         *
         * <p>The id is generated by creating an ephemeral sequential znode under
         * {@code idGenPath} and parsing the sequence suffix of the created node. The
         * metadata is then written to the ledger table with {@code Version.NEW}, and the
         * id-generation znode is deleted; a failed delete is only logged.
         *
         * <p>Result codes passed to {@code ledgerCb}: 0 with the new ledger id on success;
         * -9 when the id could not be generated or parsed; -17 when the metastore reports
         * a bad version; -18 for any other metastore failure.
         * NOTE(review): these literals presumably mirror BKException codes — confirm.
         *
         * @param metadata metadata to store; its version is updated on success
         * @param ledgerCb receives the result code and, on success, the new ledger id
         */
        @Override
        public void createLedger(final LedgerMetadata metadata, final BookkeeperInternalCallbacks.GenericCallback<Long> ledgerCb) {
            ZkUtils.createFullPathOptimistic(this.zk, this.idGenPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback(){

                @Override
                public void processResult(int rc, String path, Object ctx, final String idPathName) {
                    if (rc != KeeperException.Code.OK.intValue()) {
                        LOG.error("Could not generate new ledger id", (Throwable)KeeperException.create(KeeperException.Code.get(rc), path));
                        ledgerCb.operationComplete(-9, null);
                        return;
                    }
                    final long lid;
                    try {
                        lid = MsLedgerManager.this.getLedgerIdFromGenPath(idPathName);
                    }
                    catch (IOException e) {
                        // Log the node that was actually created (idPathName), not the requested
                        // prefix (path): the created name is the value that failed to parse.
                        LOG.error("Could not extract ledger-id from id gen path:" + idPathName, (Throwable)e);
                        ledgerCb.operationComplete(-9, null);
                        return;
                    }
                    MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>(){

                        @Override
                        public void complete(int rc, Version version, Object ctx) {
                            if (MSException.Code.BadVersion.getCode() == rc) {
                                ledgerCb.operationComplete(-17, null);
                                return;
                            }
                            if (MSException.Code.OK.getCode() != rc) {
                                ledgerCb.operationComplete(-18, null);
                                return;
                            }
                            LOG.debug("Create ledger {} with version {} successfuly.", new Object[]{lid, version});
                            metadata.setVersion(version);
                            ledgerCb.operationComplete(0, lid);
                        }
                    };
                    // Version.NEW makes the put conditional on the key not existing yet.
                    MsLedgerManager.this.ledgerTable.put(MSLedgerManagerFactory.ledgerId2Key(lid), new Value().setField(MSLedgerManagerFactory.META_FIELD, metadata.serialize()), Version.NEW, msCallback, null);
                    // The znode only served to obtain a sequence number; remove it (best effort).
                    MsLedgerManager.this.zk.delete(idPathName, -1, new AsyncCallback.VoidCallback(){

                        @Override
                        public void processResult(int rc, String path, Object ctx) {
                            if (rc != KeeperException.Code.OK.intValue()) {
                                LOG.warn("Exception during deleting znode for id generation : ", (Throwable)KeeperException.create(KeeperException.Code.get(rc), path));
                            } else {
                                LOG.debug("Deleting znode for id generation : {}", (Object)idPathName);
                            }
                        }
                    }, null);
                }
            }, null);
        }

        /**
         * Extracts the ledger id from the name of an id-generation znode.
         *
         * <p>The id is the text following the last occurrence of
         * {@code IDGENERATION_PREFIX}; when the prefix does not occur, the whole name is
         * parsed. Every malformed input is reported as {@link IOException}: a
         * {@code null} name, a name that ends with the prefix, or a non-numeric suffix.
         *
         * @param nodeName full path of the created sequential znode
         * @return the ledger id encoded in the znode name
         * @throws IOException if no ledger id can be parsed from {@code nodeName}
         */
        private long getLedgerIdFromGenPath(String nodeName) throws IOException {
            if (nodeName == null) {
                throw new IOException("Could not extract ledger id from a null id generation path");
            }
            // lastIndexOf/substring instead of split: split drops trailing empty strings
            // and can return an empty array, which made the old indexing throw.
            int prefixAt = nodeName.lastIndexOf(IDGENERATION_PREFIX);
            String idPart = prefixAt < 0 ? nodeName : nodeName.substring(prefixAt + IDGENERATION_PREFIX.length());
            try {
                return Long.parseLong(idPart);
            }
            catch (NumberFormatException e) {
                throw new IOException("Could not extract ledger id from id generation path " + nodeName, e);
            }
        }

        /**
         * Removes the metadata entry of a ledger from the ledger table.
         *
         * <p>Result codes passed to {@code cb}: 0 on success, -7 when the metastore reports
         * that the key does not exist, -18 for any other metastore result.
         *
         * @param ledgerId id of the ledger whose entry is removed
         * @param version version passed to the table's conditional remove
         * @param cb receives the result code; the value is always {@code null}
         */
        @Override
        public void removeLedgerMetadata(final long ledgerId, Version version, final BookkeeperInternalCallbacks.GenericCallback<Void> cb) {
            final MetastoreCallback<Void> onRemoved = new MetastoreCallback<Void>(){

                @Override
                public void complete(int rc, Void value, Object ctx) {
                    final int result;
                    if (rc == MSException.Code.NoKey.getCode()) {
                        LOG.warn("Ledger entry does not exist in meta table: ledgerId={}", (Object)ledgerId);
                        result = -7;
                    } else if (rc == MSException.Code.OK.getCode()) {
                        result = 0;
                    } else {
                        result = -18;
                    }
                    cb.operationComplete(result, null);
                }
            };
            final String key = MSLedgerManagerFactory.ledgerId2Key(ledgerId);
            this.ledgerTable.remove(key, version, onRemoved, null);
        }

        /**
         * Reads and parses the metadata of a ledger.
         *
         * <p>Result codes passed to {@code readCb}: 0 with the parsed metadata on success;
         * -7 when the metastore reports that the key does not exist; -18 when the read
         * fails for another reason or the stored bytes cannot be parsed.
         *
         * @param ledgerId id of the ledger to read
         * @param readCb receives the result code and, on success, the metadata
         */
        @Override
        public void readLedgerMetadata(final long ledgerId, final BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata> readCb) {
            final String key = MSLedgerManagerFactory.ledgerId2Key(ledgerId);
            final MetastoreCallback<Versioned<Value>> onRead = new MetastoreCallback<Versioned<Value>>(){

                @Override
                public void complete(int rc, Versioned<Value> value, Object ctx) {
                    if (rc == MSException.Code.NoKey.getCode()) {
                        LOG.error("No ledger metadata found for ledger " + ledgerId + " : ", (Throwable)MSException.create(MSException.Code.get(rc), "No key " + key + " found."));
                        readCb.operationComplete(-7, null);
                        return;
                    }
                    if (rc != MSException.Code.OK.getCode()) {
                        LOG.error("Could not read metadata for ledger " + ledgerId + " : ", (Throwable)MSException.create(MSException.Code.get(rc), "Failed to get key " + key));
                        readCb.operationComplete(-18, null);
                        return;
                    }
                    final LedgerMetadata parsed;
                    try {
                        // The stored version travels with the value and becomes the metadata version.
                        parsed = LedgerMetadata.parseConfig(value.getValue().getField(MSLedgerManagerFactory.META_FIELD), value.getVersion());
                    } catch (IOException parseFailure) {
                        LOG.error("Could not parse ledger metadata for ledger " + ledgerId + " : ", parseFailure);
                        readCb.operationComplete(-18, null);
                        return;
                    }
                    readCb.operationComplete(0, parsed);
                }
            };
            this.ledgerTable.get(key, onRead, MetastoreTable.ALL_FIELDS);
        }

        /**
         * Conditionally overwrites the stored metadata of a ledger.
         *
         * <p>The put is conditional on the version currently held by {@code metadata}.
         * Result codes passed to {@code cb}: 0 on success (the metadata's version is then
         * updated to the one returned by the metastore); -17 on a version mismatch; -7
         * when the key does not exist; -18 for any other metastore result.
         *
         * @param ledgerId id of the ledger to update
         * @param metadata metadata to serialize and store
         * @param cb receives the result code; the value is always {@code null}
         */
        @Override
        public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata, final BookkeeperInternalCallbacks.GenericCallback<Void> cb) {
            final Value serialized = new Value().setField(MSLedgerManagerFactory.META_FIELD, metadata.serialize());
            LOG.debug("Writing ledger {} metadata, version {}", new Object[]{ledgerId, metadata.getVersion()});
            final String key = MSLedgerManagerFactory.ledgerId2Key(ledgerId);
            final MetastoreCallback<Version> onWritten = new MetastoreCallback<Version>(){

                @Override
                public void complete(int rc, Version version, Object ctx) {
                    final int result;
                    if (rc == MSException.Code.BadVersion.getCode()) {
                        LOG.info("Bad version provided to updat metadata for ledger {}", (Object)ledgerId);
                        result = -17;
                    } else if (rc == MSException.Code.NoKey.getCode()) {
                        LOG.warn("Ledger {} doesn't exist when writing its ledger metadata.", (Object)ledgerId);
                        result = -7;
                    } else if (rc == MSException.Code.OK.getCode()) {
                        // Adopt the new version so the next conditional write can succeed.
                        metadata.setVersion(version);
                        result = 0;
                    } else {
                        LOG.warn("Conditional update ledger metadata failed: ", (Throwable)MSException.create(MSException.Code.get(rc), "Failed to put key " + key));
                        result = -18;
                    }
                    cb.operationComplete(result, null);
                }
            };
            this.ledgerTable.put(key, serialized, metadata.getVersion(), onWritten, null);
        }

        /**
         * Runs {@code processor} over every ledger id found in the ledger table.
         *
         * <p>Opens a cursor over the table and, if it has entries, hands it to the
         * cursor-based overload. {@code finalCb} is completed with {@code failureRc} when
         * the cursor cannot be opened and with {@code successRc} when the table is empty.
         *
         * @param processor invoked for each ledger id
         * @param finalCb completed once when the scan ends or fails
         * @param context passed through to {@code finalCb}
         * @param successRc code that signals success
         * @param failureRc code that signals failure
         */
        @Override
        public void asyncProcessLedgers(final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback finalCb, final Object context, final int successRc, final int failureRc) {
            final MetastoreCallback<MetastoreCursor> onCursorOpened = new MetastoreCallback<MetastoreCursor>(){

                @Override
                public void complete(int rc, MetastoreCursor cursor, Object ctx) {
                    if (rc != MSException.Code.OK.getCode()) {
                        finalCb.processResult(failureRc, null, context);
                        return;
                    }
                    if (cursor.hasMoreEntries()) {
                        MsLedgerManager.this.asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
                    } else {
                        finalCb.processResult(successRc, null, context);
                    }
                }
            };
            this.ledgerTable.openCursor(MetastoreTable.NON_FIELDS, onCursorOpened, null);
        }

        /**
         * Schedules the next scan step for {@code cursor} on this manager's executor.
         *
         * <p>The work itself is done by {@code doAsyncProcessLedgers}, which receives
         * the same arguments.
         */
        void asyncProcessLedgers(final MetastoreCursor cursor, final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback finalCb, final Object context, final int successRc, final int failureRc) {
            final Runnable scanStep = new Runnable(){

                @Override
                public void run() {
                    MsLedgerManager.this.doAsyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
                }
            };
            this.scheduler.submit(scanStep);
        }

        /**
         * Performs one scan step: reads the next batch from {@code cursor}, processes the
         * ledger ids in it, and schedules the following step.
         *
         * <p>{@code finalCb} is completed with {@code successRc} once the cursor has no
         * more entries, and with {@code failureRc} when a read fails or when processing a
         * batch fails. A batch that yields no valid ledger ids is skipped.
         *
         * @param cursor cursor over the ledger table
         * @param processor invoked for each ledger id in a batch
         * @param finalCb completed once when the scan ends or fails
         * @param context passed through to {@code finalCb}
         * @param successRc code that signals success
         * @param failureRc code that signals failure
         */
        void doAsyncProcessLedgers(final MetastoreCursor cursor, final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback finalCb, final Object context, final int successRc, final int failureRc) {
            if (!cursor.hasMoreEntries()) {
                finalCb.processResult(successRc, null, context);
                return;
            }
            MetastoreCursor.ReadEntriesCallback msCallback = new MetastoreCursor.ReadEntriesCallback(){

                @Override
                public void complete(int rc, Iterator<MetastoreTableItem> entries, Object ctx) {
                    if (MSException.Code.OK.getCode() != rc) {
                        finalCb.processResult(failureRc, null, context);
                        return;
                    }
                    // Shared helper: parses the keys and skips (with a warning) the invalid ones.
                    final SortedSet<Long> ledgers = MSLedgerManagerFactory.entries2Ledgers(entries);
                    if (ledgers.isEmpty()) {
                        // Nothing usable in this batch; continue with the next one.
                        MsLedgerManager.this.asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
                        return;
                    }
                    // Captured only for the error message below.
                    final long startLedger = ledgers.first();
                    final long endLedger = ledgers.last();
                    AsyncSetProcessor<Long> setProcessor = new AsyncSetProcessor<Long>(MsLedgerManager.this.scheduler);
                    setProcessor.process(ledgers, processor, new AsyncCallback.VoidCallback(){

                        @Override
                        public void processResult(int rc, String path, Object ctx) {
                            if (successRc != rc) {
                                LOG.error("Failed when processing range " + MSLedgerManagerFactory.rangeToString(startLedger, true, endLedger, true));
                                finalCb.processResult(failureRc, null, context);
                                return;
                            }
                            MsLedgerManager.this.asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
                        }
                    }, context, successRc, failureRc);
                }
            };
            cursor.asyncReadEntries(this.maxEntriesPerScan, msCallback, null);
        }

        /**
         * Creates an iterator over the ledger ids stored in the ledger table.
         *
         * @return a new {@code MSLedgerRangeIterator} bound to this manager
         */
        @Override
        public LedgerManager.LedgerRangeIterator getLedgerRanges() {
            final LedgerManager.LedgerRangeIterator ranges = this.new MSLedgerRangeIterator();
            return ranges;
        }

        /**
         * Iterates over the ledger table in batches of {@code maxEntriesPerScan} keys.
         *
         * <p>The cursor is opened asynchronously by the constructor. Both
         * {@code hasNext()} and {@code next()} wait on {@code openCursorLatch} before
         * touching {@code cursor}.
         */
        class MSLedgerRangeIterator
        implements LedgerManager.LedgerRangeIterator {
            /** Released once the open-cursor callback has run, whether it succeeded or not. */
            final CountDownLatch openCursorLatch = new CountDownLatch(1);
            /** Cursor over the ledger table; stays {@code null} when opening failed. */
            MetastoreCursor cursor = null;

            MSLedgerRangeIterator() {
                MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>(){

                    @Override
                    public void complete(int rc, MetastoreCursor newCursor, Object ctx) {
                        if (MSException.Code.OK.getCode() != rc) {
                            LOG.error("Error opening cursor for ledger range iterator {}", (Object)rc);
                        } else {
                            MSLedgerRangeIterator.this.cursor = newCursor;
                        }
                        MSLedgerRangeIterator.this.openCursorLatch.countDown();
                    }
                };
                MsLedgerManager.this.ledgerTable.openCursor(MetastoreTable.NON_FIELDS, openCursorCb, null);
            }

            /**
             * Waits for the open-cursor callback to finish.
             *
             * @return {@code true} once the callback has run; {@code false} if the wait was
             *         interrupted (the interrupt flag is restored)
             */
            private boolean awaitCursorOpened() {
                try {
                    this.openCursorLatch.await();
                    return true;
                }
                catch (InterruptedException ie) {
                    LOG.error("Interrupted waiting for cursor to open", (Throwable)ie);
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            /**
             * @return {@code true} if the cursor was opened and has more entries;
             *         {@code false} if opening failed or the wait was interrupted
             */
            @Override
            public boolean hasNext() {
                if (!this.awaitCursorOpened() || this.cursor == null) {
                    return false;
                }
                return this.cursor.hasMoreEntries();
            }

            /**
             * Reads the next batch of ledger ids.
             *
             * <p>Keys that cannot be parsed as ledger ids are skipped with a warning, the
             * same way the asynchronous scan treats them.
             *
             * @return the ledger ids of the next batch
             * @throws IOException if the cursor is not available (opening failed or the
             *         wait was interrupted) or the metastore read fails
             */
            @Override
            public LedgerManager.LedgerRange next() throws IOException {
                if (!this.awaitCursorOpened()) {
                    throw new IOException("Interrupted waiting for cursor to open");
                }
                if (this.cursor == null) {
                    throw new IOException("Couldn't open cursor on metastore ledger table");
                }
                try {
                    Iterator<MetastoreTableItem> iter = this.cursor.readEntries(MsLedgerManager.this.maxEntriesPerScan);
                    SortedSet<Long> ledgerIds = MSLedgerManagerFactory.entries2Ledgers(iter);
                    return new LedgerManager.LedgerRange(ledgerIds);
                }
                catch (MSException mse) {
                    LOG.error("Exception occurred reading from metastore", (Throwable)mse);
                    throw new IOException("Couldn't read from metastore", mse);
                }
            }
        }
    }

    /**
     * Holder that lets one thread wait for a result code and value produced by another.
     *
     * @param <T> type of the result value
     */
    static class SyncResult<T> {
        /** Value supplied to {@code complete}; {@code null} until then. */
        T value;
        /** Result code supplied to {@code complete}; 0 until then. */
        int rc;
        /** Set once {@code complete} has been called. */
        boolean finished = false;

        SyncResult() {
        }

        /**
         * Publishes the result and wakes every thread waiting in {@code block()}.
         *
         * @param rc result code
         * @param value result value (may be {@code null})
         */
        public synchronized void complete(int rc, T value) {
            this.rc = rc;
            this.value = value;
            this.finished = true;
            // notifyAll rather than notify: more than one thread may be blocked here.
            this.notifyAll();
        }

        /**
         * Waits until {@code complete} has been called.
         *
         * <p>If the waiting thread is interrupted, this method returns early and leaves
         * the thread's interrupt flag set so the caller can observe the interruption.
         */
        public synchronized void block() {
            try {
                while (!this.finished) {
                    this.wait();
                }
            }
            catch (InterruptedException interruptedException) {
                // wait() cleared the flag when it threw; restore it instead of hiding it.
                Thread.currentThread().interrupt();
            }
        }

        /** @return the result code given to {@code complete}, or 0 if not completed yet */
        public synchronized int getRetCode() {
            return this.rc;
        }

        /** @return the value given to {@code complete}, or {@code null} if not completed yet */
        public synchronized T getResult() {
            return this.value;
        }
    }
}

