/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bk_v4_2_0.bookkeeper.meta;

import bk-shade.com.google.proto_bk_v4_2_0.Message;
import bk-shade.com.google.proto_bk_v4_2_0.TextFormat;
import com.google.bk_v4_2_0.common.base.Charsets;
import com.google.bk_v4_2_0.common.base.Joiner;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.bk_v4_2_0.bookkeeper.CreateMode;
import org.apache.bk_v4_2_0.bookkeeper.KeeperException;
import org.apache.bk_v4_2_0.bookkeeper.WatchedEvent;
import org.apache.bk_v4_2_0.bookkeeper.Watcher;
import org.apache.bk_v4_2_0.bookkeeper.ZooDefs;
import org.apache.bk_v4_2_0.bookkeeper.ZooKeeper;
import org.apache.bk_v4_2_0.bookkeeper.conf.AbstractConfiguration;
import org.apache.bk_v4_2_0.bookkeeper.data.Stat;
import org.apache.bk_v4_2_0.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bk_v4_2_0.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bk_v4_2_0.bookkeeper.proto.DataFormats;
import org.apache.bk_v4_2_0.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bk_v4_2_0.bookkeeper.replication.ReplicationException;
import org.apache.bk_v4_2_0.bookkeeper.util.ZkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper-backed implementation of {@link LedgerUnderreplicationManager}.
 *
 * <p>All state lives below {@code <zkLedgersRootPath>/underreplication}: a
 * {@code LAYOUT} znode describing the format, a {@code ledgers} subtree holding one
 * znode per underreplicated ledger, a {@code locks} subtree holding ephemeral lock
 * znodes, and an optional {@code disable} znode whose presence means ledger
 * replication is disabled.
 */
public class ZkLedgerUnderreplicationManager
implements LedgerUnderreplicationManager {
    static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
    /** Layout type stored in, and required from, the layout znode. */
    static final String LAYOUT = "BASIC";
    /** Layout version; the layout check in this class writes and expects the value 1. */
    static final int LAYOUT_VERSION = 1;
    /** Locks acquired through this instance, keyed by ledger id. */
    private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<Long, Lock>();
    /** Extracts the numeric ledger id from a znode name ending in {@code urL<digits>}. */
    private final Pattern idExtractionPattern;
    /** Root znode of the underreplication data ({@code <ledgersRoot>/underreplication}). */
    private final String basePath;
    /** Root of the underreplicated-ledger hierarchy ({@code basePath + "/ledgers"}). */
    private final String urLedgerPath;
    /** Parent znode of the per-ledger lock znodes ({@code basePath + "/locks"}). */
    private final String urLockPath;
    /** Znode holding the serialized layout description ({@code basePath + "/LAYOUT"}). */
    private final String layoutZNode;
    /** Payload written into every lock znode created by this instance. */
    private final DataFormats.LockDataFormat lockData;
    /** ZooKeeper client used for every metadata operation. */
    private final ZooKeeper zkc;

    /**
     * Creates a manager rooted at {@code <zkLedgersRootPath>/underreplication} and
     * validates the layout stored in ZooKeeper, creating it when absent.
     *
     * <p>The local host address is recorded as the bookie id in the lock payload on a
     * best-effort basis: if it cannot be resolved the payload is built without it.
     *
     * @param conf configuration supplying the ledgers root path
     * @param zkc  ZooKeeper client used for all subsequent operations
     * @throws KeeperException if a ZooKeeper operation fails during the layout check
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     * @throws ReplicationException.CompatibilityException if the stored layout is
     *         unparsable or not the one this class understands
     */
    public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc) throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        this.basePath = conf.getZkLedgersRootPath() + '/' + "underreplication";
        this.layoutZNode = this.basePath + '/' + "LAYOUT";
        this.urLedgerPath = this.basePath + "/ledgers";
        this.urLockPath = this.basePath + "/locks";
        this.idExtractionPattern = Pattern.compile("urL(\\d+)$");
        this.zkc = zkc;
        DataFormats.LockDataFormat.Builder lockDataBuilder = DataFormats.LockDataFormat.newBuilder();
        try {
            lockDataBuilder.setBookieId(InetAddress.getLocalHost().getHostAddress());
        }
        catch (UnknownHostException uhe) {
            // Deliberately non-fatal: the lock payload is simply built without a bookie id.
            LOG.warn("Unable to resolve local host address; lock data will not carry a bookie id", uhe);
        }
        this.lockData = lockDataBuilder.build();
        this.checkLayout();
    }

    /**
     * Ensures the underreplication znode structure exists and that the stored layout
     * is the one this class understands.
     *
     * <p>Creates the base znode, the layout znode (type {@link #LAYOUT}, version
     * {@link #LAYOUT_VERSION}), and the ledger and lock parent znodes when they are
     * missing. Creation races with other processes are tolerated.
     *
     * @throws KeeperException if a ZooKeeper operation fails
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     * @throws ReplicationException.CompatibilityException if the layout znode cannot
     *         be parsed or names a different type or version
     */
    private void checkLayout() throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        this.createPersistentIfAbsent(this.basePath);
        // Loop until the layout znode is visible: either we create it or somebody else does.
        while (this.zkc.exists(this.layoutZNode, false) == null) {
            DataFormats.LedgerRereplicationLayoutFormat.Builder layoutBuilder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
            layoutBuilder.setType(LAYOUT).setVersion(LAYOUT_VERSION);
            try {
                this.zkc.create(this.layoutZNode, TextFormat.printToString(layoutBuilder.build()).getBytes(Charsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nne) {
                // Lost the creation race; the loop condition re-checks existence.
            }
        }
        byte[] layoutData = this.zkc.getData(this.layoutZNode, false, null);
        DataFormats.LedgerRereplicationLayoutFormat.Builder storedBuilder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
        try {
            TextFormat.merge(new String(layoutData, Charsets.UTF_8), (Message.Builder)storedBuilder);
            DataFormats.LedgerRereplicationLayoutFormat layout = storedBuilder.build();
            if (!layout.getType().equals(LAYOUT) || layout.getVersion() != LAYOUT_VERSION) {
                throw new ReplicationException.CompatibilityException("Incompatible layout found (" + LAYOUT + ":" + LAYOUT_VERSION + ")");
            }
        }
        catch (TextFormat.ParseException pe) {
            throw new ReplicationException.CompatibilityException("Invalid data found", pe);
        }
        this.createPersistentIfAbsent(this.urLedgerPath);
        this.createPersistentIfAbsent(this.urLockPath);
    }

    /**
     * Creates an empty persistent znode at {@code path} unless one already exists.
     * A concurrent creation by another client is treated as success.
     */
    private void createPersistentIfAbsent(String path) throws KeeperException, InterruptedException {
        if (this.zkc.exists(path, false) != null) {
            return;
        }
        try {
            this.zkc.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        catch (KeeperException.NodeExistsException nee) {
            // Somebody else created it between the check and the create; nothing to do.
        }
    }

    /**
     * Extracts the ledger id from a znode name or path ending in {@code urL<digits>}.
     *
     * @param path znode name or path to inspect
     * @return the decimal ledger id found at the end of {@code path}
     * @throws NumberFormatException if {@code path} does not end in {@code urL<digits>}
     *         or the digits do not fit in a {@code long}
     */
    private long getLedgerId(String path) throws NumberFormatException {
        Matcher m = this.idExtractionPattern.matcher(path);
        if (!m.find()) {
            throw new NumberFormatException("Couldn't find ledgerid in path " + path);
        }
        // parseLong yields the primitive directly instead of boxing and unboxing.
        return Long.parseLong(m.group(1));
    }

    /**
     * Builds the parent znode path for a ledger by splitting its 64-bit id into four
     * 16-bit groups, most significant first, each rendered as four lowercase hex digits.
     *
     * @param base     path prefix the four hex components are appended to
     * @param ledgerId ledger id to encode
     * @return {@code base/hhhh/hhhh/hhhh/hhhh}
     */
    public static String getParentZnodePath(String base, long ledgerId) {
        long bits63to48 = (ledgerId >> 48) & 0xFFFFL;
        long bits47to32 = (ledgerId >> 32) & 0xFFFFL;
        long bits31to16 = (ledgerId >> 16) & 0xFFFFL;
        long bits15to0 = ledgerId & 0xFFFFL;
        return String.format("%s/%04x/%04x/%04x/%04x", base, bits63to48, bits47to32, bits31to16, bits15to0);
    }

    /**
     * Builds the full znode path of an underreplicated ledger: the hierarchical parent
     * path followed by {@code urL} and the ledger id zero-padded to ten decimal digits.
     *
     * @param base     root path of the underreplicated-ledger hierarchy
     * @param ledgerId ledger id to encode
     * @return the znode path for {@code ledgerId} below {@code base}
     */
    public static String getUrLedgerZnode(String base, long ledgerId) {
        String parentZnode = ZkLedgerUnderreplicationManager.getParentZnodePath(base, ledgerId);
        return String.format("%s/urL%010d", parentZnode, ledgerId);
    }

    /**
     * Returns the underreplicated-ledger znode path for {@code ledgerId} below this
     * manager's ledger hierarchy root.
     */
    private String getUrLedgerZnode(long ledgerId) {
        String hierarchyRoot = this.urLedgerPath;
        return getUrLedgerZnode(hierarchyRoot, ledgerId);
    }

    /**
     * Records {@code missingReplica} as a missing replica of {@code ledgerId}.
     *
     * <p>Creates the ledger's underreplication znode if absent. If it already exists,
     * the replica is appended to the stored list with a versioned write, unless it is
     * already listed. Concurrent deletions or updates cause the whole attempt to be
     * retried.
     *
     * @param ledgerId       ledger to mark
     * @param missingReplica replica identifier to record
     * @throws ReplicationException.UnavailableException on ZooKeeper failure, on
     *         interruption, or when the stored znode data cannot be parsed
     */
    @Override
    public void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException.UnavailableException {
        LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplica);
        try {
            final String znode = this.getUrLedgerZnode(ledgerId);
            boolean recorded = false;
            while (!recorded) {
                DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
                try {
                    builder.addReplica(missingReplica);
                    ZkUtils.createFullPathOptimistic(this.zkc, znode, TextFormat.printToString(builder.build()).getBytes(Charsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    recorded = true;
                }
                catch (KeeperException.NodeExistsException nee) {
                    Stat stat = this.zkc.exists(znode, false);
                    if (stat != null) {
                        try {
                            byte[] stored = this.zkc.getData(znode, false, stat);
                            builder.clear();
                            TextFormat.merge(new String(stored, Charsets.UTF_8), (Message.Builder)builder);
                            DataFormats.UnderreplicatedLedgerFormat current = builder.build();
                            if (current.getReplicaList().contains(missingReplica)) {
                                return;
                            }
                            builder.addReplica(missingReplica);
                            this.zkc.setData(znode, TextFormat.printToString(builder.build()).getBytes(Charsets.UTF_8), stat.getVersion());
                            recorded = true;
                        }
                        catch (KeeperException.NoNodeException nne) {
                            // znode vanished after the existence check; start over
                        }
                        catch (KeeperException.BadVersionException bve) {
                            // znode was modified concurrently; start over
                        }
                        catch (TextFormat.ParseException pe) {
                            throw new ReplicationException.UnavailableException("Invalid data found", pe);
                        }
                    }
                    // stat == null: znode vanished right after the failed create; start over
                }
            }
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    /**
     * Marks {@code ledgerId} as fully replicated.
     *
     * <p>If this instance holds the lock for the ledger, its underreplication znode is
     * deleted at the version recorded when the lock was taken. Up to four ancestor
     * znodes are then deleted in turn, stopping at the first one that is not empty.
     * A missing znode or a version mismatch on deletion is ignored. The lock is
     * released in every case, including when no lock is held.
     *
     * @param ledgerId ledger that has been rereplicated
     * @throws ReplicationException.UnavailableException on other ZooKeeper failures or
     *         on interruption
     */
    @Override
    public void markLedgerReplicated(long ledgerId) throws ReplicationException.UnavailableException {
        LOG.debug("markLedgerReplicated(ledgerId={})", ledgerId);
        try {
            Lock held = this.heldLocks.get(ledgerId);
            if (held != null) {
                String urZnode = this.getUrLedgerZnode(ledgerId);
                this.zkc.delete(urZnode, held.getLedgerZNodeVersion());
                try {
                    String[] segments = urZnode.split("/");
                    // Walk upwards: drop 1, 2, 3, then 4 trailing path segments.
                    for (int kept = segments.length - 1; kept >= segments.length - 4; --kept) {
                        Object[] ancestorSegments = Arrays.copyOf(segments, kept);
                        String ancestor = Joiner.on("/").join(ancestorSegments);
                        Stat ancestorStat = this.zkc.exists(ancestor, null);
                        if (ancestorStat != null) {
                            this.zkc.delete(ancestor, ancestorStat.getVersion());
                        }
                    }
                }
                catch (KeeperException.NotEmptyException nee) {
                    // an ancestor still has other children; stop pruning
                }
            }
        }
        catch (KeeperException.NoNodeException nne) {
            // znode already gone; nothing to delete
        }
        catch (KeeperException.BadVersionException bve) {
            // znode changed since the lock was taken; leave it in place
        }
        catch (KeeperException ke) {
            LOG.error("Error deleting underreplicated ledger znode", ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        finally {
            this.releaseUnderreplicatedLedger(ledgerId);
        }
    }

    /**
     * Searches the underreplicated-ledger hierarchy below {@code parent}, in random
     * order, for a ledger that can be locked by this instance.
     *
     * <p>At depths 0 to 3 the method recurses into each child. At depth 4 the children
     * are ledger znodes: the first one that has no lock znode, still exists, and has a
     * parsable ledger id is locked by creating an ephemeral lock znode, and the lock
     * is recorded in {@code heldLocks}.
     *
     * @param parent znode whose children are examined
     * @param depth  number of hierarchy levels already descended; 4 means the children
     *               of {@code parent} are ledger znodes
     * @param w      watcher registered on the child listings and lock-existence checks
     * @return the id of the ledger that was locked, or {@code -1} if none was found
     * @throws KeeperException if a ZooKeeper operation fails, other than the handled
     *         missing-parent and lock-already-exists cases
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     */
    private long getLedgerToRereplicateFromHierarchy(String parent, long depth, Watcher w) throws KeeperException, InterruptedException {
        List<String> candidates;
        try {
            candidates = this.zkc.getChildren(parent, w);
        }
        catch (KeeperException.NoNodeException nne) {
            // parent disappeared while walking the hierarchy
            return -1L;
        }
        Collections.shuffle(candidates);
        if (depth != 4L) {
            for (String child : candidates) {
                long ledger = this.getLedgerToRereplicateFromHierarchy(parent + "/" + child, depth + 1L, w);
                if (ledger != -1L) {
                    return ledger;
                }
            }
            return -1L;
        }
        for (String child : candidates) {
            try {
                String lockPath = this.urLockPath + "/" + child;
                if (this.zkc.exists(lockPath, w) != null) {
                    // a lock znode already exists for this ledger
                    continue;
                }
                Stat stat = this.zkc.exists(parent + "/" + child, false);
                if (stat == null) {
                    LOG.debug("{}/{} doesn't exist", parent, child);
                    continue;
                }
                long ledgerId = this.getLedgerId(child);
                this.zkc.create(lockPath, TextFormat.printToString(this.lockData).getBytes(Charsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                this.heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
                return ledgerId;
            }
            catch (KeeperException.NodeExistsException nee) {
                // lost the race for this lock; try the next candidate
            }
            catch (NumberFormatException nfe) {
                // child name is not a ledger znode; try the next candidate
            }
        }
        return -1L;
    }

    /**
     * Makes a single, non-blocking attempt to lock an underreplicated ledger.
     *
     * @return the id of the locked ledger, or {@code -1} if none is currently available
     * @throws ReplicationException.UnavailableException on ZooKeeper failure or
     *         interruption
     */
    @Override
    public long pollLedgerToRereplicate() throws ReplicationException.UnavailableException {
        LOG.debug("pollLedgerToRereplicate()");
        // Polling never waits for changes, so the watcher deliberately ignores events.
        final Watcher ignoringWatcher = new Watcher(){

            @Override
            public void process(WatchedEvent e) {
            }
        };
        try {
            long ledgerId = this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L, ignoringWatcher);
            return ledgerId;
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    /**
     * Blocks until an underreplicated ledger has been locked by this instance.
     *
     * <p>Each round first waits for replication to be enabled, then searches the
     * hierarchy. If nothing is found, it waits until a watched znode changes or the
     * session is reported disconnected or expired, and then searches again.
     *
     * @return the id of the locked ledger
     * @throws ReplicationException.UnavailableException on ZooKeeper failure or
     *         interruption
     */
    @Override
    public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
        LOG.debug("getLedgerToRereplicate()");
        try {
            for (;;) {
                this.waitIfLedgerReplicationDisabled();
                final CountDownLatch hierarchyChanged = new CountDownLatch(1);
                Watcher changeWatcher = new Watcher(){

                    @Override
                    public void process(WatchedEvent e) {
                        Watcher.Event.EventType type = e.getType();
                        boolean nodeChange = type == Watcher.Event.EventType.NodeChildrenChanged
                            || type == Watcher.Event.EventType.NodeDeleted
                            || type == Watcher.Event.EventType.NodeCreated;
                        if (nodeChange
                            || e.getState() == Watcher.Event.KeeperState.Expired
                            || e.getState() == Watcher.Event.KeeperState.Disconnected) {
                            hierarchyChanged.countDown();
                        }
                    }
                };
                long ledgerId = this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L, changeWatcher);
                if (ledgerId == -1L) {
                    // nothing lockable right now; wait for the watcher to fire, then rescan
                    hierarchyChanged.await();
                    continue;
                }
                return ledgerId;
            }
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    /**
     * Returns immediately when ledger replication is enabled; otherwise registers a
     * callback for the enable notification and waits on it.
     *
     * @throws ReplicationException.UnavailableException if the replication state
     *         cannot be read from ZooKeeper
     * @throws InterruptedException if interrupted while waiting
     */
    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        ReplicationEnableCb enabledSignal = new ReplicationEnableCb();
        if (this.isLedgerReplicationEnabled()) {
            return;
        }
        this.notifyLedgerReplicationEnabled(enabledSignal);
        enabledSignal.await();
    }

    /**
     * Releases the lock this instance holds on {@code ledgerId}, if any, by forgetting
     * it locally and deleting its lock znode. A lock znode that no longer exists is
     * ignored.
     *
     * @param ledgerId ledger whose lock should be released
     * @throws ReplicationException.UnavailableException on other ZooKeeper failures or
     *         on interruption
     */
    @Override
    public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
        LOG.debug("releaseLedger(ledgerId={})", ledgerId);
        try {
            Lock released = this.heldLocks.remove(ledgerId);
            if (released == null) {
                return;
            }
            // version -1 deletes the lock znode regardless of its current version
            this.zkc.delete(released.getLockZNode(), -1);
        }
        catch (KeeperException.NoNodeException nne) {
            // lock znode already gone
        }
        catch (KeeperException ke) {
            LOG.error("Error deleting underreplicated ledger lock", ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    /**
     * Deletes the lock znode of every lock currently held by this instance.
     *
     * <p>A lock znode that has already disappeared is skipped, and the remaining locks
     * are still released.
     *
     * @throws ReplicationException.UnavailableException on other ZooKeeper failures or
     *         on interruption; locks not yet processed at that point are left in place
     */
    @Override
    public void close() throws ReplicationException.UnavailableException {
        LOG.debug("close()");
        try {
            for (Lock held : this.heldLocks.values()) {
                try {
                    this.zkc.delete(held.getLockZNode(), -1);
                }
                catch (KeeperException.NoNodeException nne) {
                    // This lock znode is already gone; keep going so the others are
                    // released too instead of aborting the whole cleanup.
                }
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error deleting underreplicated ledger lock", ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    /**
     * Disables ledger replication by creating the persistent {@code disable} znode
     * under the underreplication base path.
     *
     * @throws ReplicationException.UnavailableException if the znode already exists
     *         (replication is already disabled), on other ZooKeeper failures, or on
     *         interruption
     */
    @Override
    public void disableLedgerReplication() throws ReplicationException.UnavailableException {
        LOG.debug("disableLedegerReplication()");
        final String disableZnode = this.basePath + '/' + "disable";
        try {
            ZkUtils.createFullPathOptimistic(this.zkc, disableZnode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info("Auto ledger re-replication is disabled!");
        }
        catch (KeeperException.NodeExistsException alreadyDisabled) {
            LOG.warn("AutoRecovery is already disabled!", alreadyDisabled);
            throw new ReplicationException.UnavailableException("AutoRecovery is already disabled!", alreadyDisabled);
        }
        catch (KeeperException ke) {
            LOG.error("Exception while stopping auto ledger re-replication", ke);
            throw new ReplicationException.UnavailableException("Exception while stopping auto ledger re-replication", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while stopping auto ledger re-replication", ie);
        }
    }

    /**
     * Re-enables ledger replication by deleting the {@code disable} znode under the
     * underreplication base path.
     *
     * @throws ReplicationException.UnavailableException if the znode does not exist
     *         (replication is already enabled), on other ZooKeeper failures, or on
     *         interruption
     */
    @Override
    public void enableLedgerReplication() throws ReplicationException.UnavailableException {
        LOG.debug("enableLedegerReplication()");
        final String disableZnode = this.basePath + '/' + "disable";
        try {
            // version -1 deletes the znode regardless of its current version
            this.zkc.delete(disableZnode, -1);
            LOG.info("Resuming automatic ledger re-replication");
        }
        catch (KeeperException.NoNodeException alreadyEnabled) {
            LOG.warn("AutoRecovery is already enabled!", alreadyEnabled);
            throw new ReplicationException.UnavailableException("AutoRecovery is already enabled!", alreadyEnabled);
        }
        catch (KeeperException ke) {
            LOG.error("Exception while resuming ledger replication", ke);
            throw new ReplicationException.UnavailableException("Exception while resuming auto ledger re-replication", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while resuming auto ledger re-replication", ie);
        }
    }

    /**
     * Reports whether ledger replication is enabled, i.e. whether the {@code disable}
     * znode under the underreplication base path is absent.
     *
     * @return {@code true} when the {@code disable} znode does not exist
     * @throws ReplicationException.UnavailableException on ZooKeeper failure or
     *         interruption
     */
    @Override
    public boolean isLedgerReplicationEnabled() throws ReplicationException.UnavailableException {
        LOG.debug("isLedgerReplicationEnabled()");
        final String disableZnode = this.basePath + '/' + "disable";
        try {
            Stat disableStat = this.zkc.exists(disableZnode, false);
            return disableStat == null;
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of ledger re-replication", ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    /**
     * Arranges for {@code cb} to be completed with return code 0 once ledger
     * replication is enabled.
     *
     * <p>If the {@code disable} znode is absent, the callback is completed right away.
     * Otherwise a watcher set on that znode completes it when a {@code NodeDeleted}
     * event is delivered.
     *
     * @param cb callback to complete when replication is enabled
     * @throws ReplicationException.UnavailableException on ZooKeeper failure or
     *         interruption
     */
    @Override
    public void notifyLedgerReplicationEnabled(final BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        LOG.debug("notifyLedgerReplicationEnabled()");
        final String disableZnode = this.basePath + '/' + "disable";
        Watcher enabledWatcher = new Watcher(){

            @Override
            public void process(WatchedEvent e) {
                if (e.getType() != Watcher.Event.EventType.NodeDeleted) {
                    return;
                }
                cb.operationComplete(0, null);
            }
        };
        try {
            Stat disableStat = this.zkc.exists(disableZnode, enabledWatcher);
            if (disableStat == null) {
                // replication is already enabled; report it immediately
                cb.operationComplete(0, null);
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of ledger re-replication", ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    /**
     * Immutable record of a lock held on an underreplicated ledger: the path of the
     * lock znode, plus the version of the ledger's underreplication znode observed
     * when the lock was taken.
     */
    private static class Lock {
        private final String lockZNode;
        private final int ledgerZNodeVersion;

        /**
         * @param lockZNode          path of the lock znode
         * @param ledgerZNodeVersion version of the ledger znode when the lock was taken
         */
        Lock(String lockZNode, int ledgerZNodeVersion) {
            this.ledgerZNodeVersion = ledgerZNodeVersion;
            this.lockZNode = lockZNode;
        }

        /** Returns the path of the lock znode. */
        String getLockZNode() {
            return lockZNode;
        }

        /** Returns the ledger znode version recorded with this lock. */
        int getLedgerZNodeVersion() {
            return ledgerZNodeVersion;
        }
    }
}

