/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.Message;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.ProtocolStringList;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.TextFormat;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.DNS;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.DataFormats;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.ReplicationException;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicationManager {
    private static final Logger log = LoggerFactory.getLogger(PulsarLedgerUnderreplicationManager.class);
    static final String LAYOUT = "BASIC";
    static final int LAYOUT_VERSION = 1;
    private static final byte[] LOCK_DATA = PulsarLedgerUnderreplicationManager.getLockData();
    private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<Long, Lock>();
    private static final Pattern ID_EXTRACTION_PATTERN = Pattern.compile("urL(\\d+)$");
    private final AbstractConfiguration conf;
    private final String basePath;
    private final String urLedgerPath;
    private final String urLockPath;
    private final String layoutPath;
    private final String lostBookieRecoveryDelayPath;
    private final String checkAllLedgersCtimePath;
    private final String placementPolicyCheckCtimePath;
    private final String replicasCheckCtimePath;
    private final MetadataStoreExtended store;
    private BookkeeperInternalCallbacks.GenericCallback<Void> replicationEnabledListener;
    private BookkeeperInternalCallbacks.GenericCallback<Void> lostBookieRecoveryDelayListener;

    public PulsarLedgerUnderreplicationManager(AbstractConfiguration<?> conf, MetadataStoreExtended store, String ledgerRootPath) throws ReplicationException.CompatibilityException {
        this.conf = conf;
        this.basePath = PulsarLedgerUnderreplicationManager.getBasePath(ledgerRootPath);
        this.layoutPath = this.basePath + "/LAYOUT";
        this.urLedgerPath = this.basePath + "/ledgers";
        this.urLockPath = this.basePath + "/locks";
        this.lostBookieRecoveryDelayPath = this.basePath + "/lostBookieRecoveryDelay";
        this.checkAllLedgersCtimePath = this.basePath + "/checkallledgersctime";
        this.placementPolicyCheckCtimePath = this.basePath + "/placementpolicycheckctime";
        this.replicasCheckCtimePath = this.basePath + "/replicascheckctime";
        this.store = store;
        store.registerListener(this::handleNotification);
        this.checkLayout();
    }

    static String getBasePath(String rootPath) {
        return String.format("%s/%s", rootPath, "underreplication");
    }

    static String getUrLockPath(String rootPath) {
        return String.format("%s/%s", PulsarLedgerUnderreplicationManager.getBasePath(rootPath), "locks");
    }

    public static byte[] getLockData() {
        DataFormats.LockDataFormat.Builder lockDataBuilder = DataFormats.LockDataFormat.newBuilder();
        try {
            lockDataBuilder.setBookieId(DNS.getDefaultHost("default"));
        }
        catch (UnknownHostException unknownHostException) {
            // empty catch block
        }
        return lockDataBuilder.build().toString().getBytes(StandardCharsets.UTF_8);
    }

    private void checkLayout() throws ReplicationException.CompatibilityException {
        while (!this.store.exists(this.layoutPath).join().booleanValue()) {
            DataFormats.LedgerRereplicationLayoutFormat.Builder builder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
            builder.setType(LAYOUT).setVersion(1);
            this.store.put(this.layoutPath, builder.build().toString().getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join();
        }
        byte[] layoutData = this.store.get(this.layoutPath).join().get().getValue();
        DataFormats.LedgerRereplicationLayoutFormat.Builder builder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
        try {
            TextFormat.merge(new String(layoutData, StandardCharsets.UTF_8), (Message.Builder)builder);
            DataFormats.LedgerRereplicationLayoutFormat layout = builder.build();
            if (!layout.getType().equals(LAYOUT) || layout.getVersion() != 1) {
                throw new ReplicationException.CompatibilityException("Incompatible layout found (BASIC:1)");
            }
        }
        catch (TextFormat.ParseException pe) {
            throw new ReplicationException.CompatibilityException("Invalid data found", pe);
        }
    }

    private long getLedgerId(String path) throws NumberFormatException {
        Matcher m = ID_EXTRACTION_PATTERN.matcher(path);
        if (m.find()) {
            return Long.parseLong(m.group(1));
        }
        throw new NumberFormatException("Couldn't find ledgerid in path");
    }

    private static String getParentPath(String base, long ledgerId) {
        String subdir1 = String.format("%04x", ledgerId >> 48 & 0xFFFFL);
        String subdir2 = String.format("%04x", ledgerId >> 32 & 0xFFFFL);
        String subdir3 = String.format("%04x", ledgerId >> 16 & 0xFFFFL);
        String subdir4 = String.format("%04x", ledgerId & 0xFFFFL);
        return String.format("%s/%s/%s/%s/%s", base, subdir1, subdir2, subdir3, subdir4);
    }

    public static String getUrLedgerPath(String base, long ledgerId) {
        return String.format("%s/urL%010d", PulsarLedgerUnderreplicationManager.getParentPath(base, ledgerId), ledgerId);
    }

    public static String getUrLedgerLockPath(String base, long ledgerId) {
        return String.format("%s/urL%010d", base, ledgerId);
    }

    private String getUrLedgerPath(long ledgerId) {
        return PulsarLedgerUnderreplicationManager.getUrLedgerPath(this.urLedgerPath, ledgerId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleNotification(Notification n) {
        if (n.getPath().startsWith(this.basePath)) {
            PulsarLedgerUnderreplicationManager pulsarLedgerUnderreplicationManager = this;
            synchronized (pulsarLedgerUnderreplicationManager) {
                this.notifyAll();
                if (n.getType() == NotificationType.Deleted) {
                    if (n.getPath().equals(this.basePath + "/disable")) {
                        log.info("LedgerReplication is enabled externally through MetadataStore, since DISABLE_NODE ZNode is deleted");
                        if (this.replicationEnabledListener != null) {
                            this.replicationEnabledListener.operationComplete(0, null);
                        }
                    } else if (n.getPath().equals(this.lostBookieRecoveryDelayPath) && this.lostBookieRecoveryDelayListener != null) {
                        this.lostBookieRecoveryDelayListener.operationComplete(0, null);
                    }
                }
            }
        }
    }

    @Override
    public UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId) throws ReplicationException.UnavailableException {
        try {
            String path = this.getUrLedgerPath(ledgerId);
            Optional<GetResult> optRes = this.store.get(path).get();
            if (!optRes.isPresent()) {
                if (log.isDebugEnabled()) {
                    log.debug("Ledger: {} is not marked underreplicated", (Object)ledgerId);
                }
                return null;
            }
            byte[] data = optRes.get().getValue();
            DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
            TextFormat.merge(new String(data, StandardCharsets.UTF_8), (Message.Builder)builder);
            DataFormats.UnderreplicatedLedgerFormat underreplicatedLedgerFormat = builder.build();
            PulsarUnderreplicatedLedger underreplicatedLedger = new PulsarUnderreplicatedLedger(ledgerId);
            ProtocolStringList replicaList = underreplicatedLedgerFormat.getReplicaList();
            long ctime = underreplicatedLedgerFormat.hasCtime() ? underreplicatedLedgerFormat.getCtime() : -1L;
            underreplicatedLedger.setCtime(ctime);
            underreplicatedLedger.setReplicaList(replicaList);
            return underreplicatedLedger;
        }
        catch (ExecutionException ee) {
            throw new ReplicationException.UnavailableException("Error contacting with metadata store", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie);
        }
        catch (TextFormat.ParseException pe) {
            throw new ReplicationException.UnavailableException("Error parsing proto message", pe);
        }
    }

    @Override
    public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
        if (log.isDebugEnabled()) {
            log.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", (Object)ledgerId, missingReplicas);
        }
        String path = this.getUrLedgerPath(ledgerId);
        CompletableFuture<Void> createFuture = new CompletableFuture<Void>();
        this.tryMarkLedgerUnderreplicatedAsync(path, missingReplicas, createFuture);
        return createFuture;
    }

    private void tryMarkLedgerUnderreplicatedAsync(String path, Collection<String> missingReplicas, CompletableFuture<Void> finalFuture) {
        DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
        if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
            builder.setCtime(System.currentTimeMillis());
        }
        missingReplicas.forEach(builder::addReplica);
        byte[] urLedgerData = builder.build().toString().getBytes(StandardCharsets.UTF_8);
        ((CompletableFuture)this.store.put(path, urLedgerData, Optional.of(-1L)).thenRun(() -> FutureUtils.complete(finalFuture, null))).exceptionally(ex -> {
            if (ex.getCause() instanceof MetadataStoreException.BadVersionException) {
                this.handleLedgerUnderreplicatedAlreadyMarked(path, missingReplicas, finalFuture);
            } else {
                FutureUtils.completeExceptionally(finalFuture, ex);
            }
            return null;
        });
    }

    private void handleLedgerUnderreplicatedAlreadyMarked(String path, Collection<String> missingReplicas, CompletableFuture<Void> finalFuture) {
        ((CompletableFuture)this.store.get(path).thenAccept(optRes -> {
            if (!optRes.isPresent()) {
                this.tryMarkLedgerUnderreplicatedAsync(path, missingReplicas, finalFuture);
                return;
            }
            byte[] existingUrLedgerData = ((GetResult)optRes.get()).getValue();
            DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
            try {
                TextFormat.merge(new String(existingUrLedgerData, StandardCharsets.UTF_8), (Message.Builder)builder);
            }
            catch (TextFormat.ParseException e) {
                FutureUtils.completeExceptionally(finalFuture, new ReplicationException.UnavailableException("Invalid underreplicated ledger data for ledger " + path, e));
                return;
            }
            DataFormats.UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
            boolean replicaAdded = false;
            for (String missingReplica : missingReplicas) {
                if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) continue;
                builder.addReplica(missingReplica);
                replicaAdded = true;
            }
            if (!replicaAdded) {
                FutureUtils.complete(finalFuture, null);
                return;
            }
            if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
                builder.setCtime(System.currentTimeMillis());
            }
            byte[] newUrLedgerData = builder.build().toString().getBytes(StandardCharsets.UTF_8);
            ((CompletableFuture)this.store.put(path, newUrLedgerData, Optional.of(((GetResult)optRes.get()).getStat().getVersion())).thenRun(() -> FutureUtils.complete(finalFuture, null))).exceptionally(ex -> {
                FutureUtils.completeExceptionally(finalFuture, ex);
                return null;
            });
        })).exceptionally(ex -> {
            FutureUtils.completeExceptionally(finalFuture, ex);
            return null;
        });
    }

    @Override
    public void acquireUnderreplicatedLedger(long ledgerId) throws ReplicationException {
        try {
            this.internalAcquireUnderreplicatedLedger(ledgerId);
        }
        catch (InterruptedException | ExecutionException e) {
            throw new ReplicationException.UnavailableException("Failed to acuire under-replicated ledger", e);
        }
    }

    private void internalAcquireUnderreplicatedLedger(long ledgerId) throws ExecutionException, InterruptedException {
        String lockPath = PulsarLedgerUnderreplicationManager.getUrLedgerLockPath(this.urLockPath, ledgerId);
        this.store.put(lockPath, LOCK_DATA, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).get();
    }

    @Override
    public void markLedgerReplicated(long ledgerId) throws ReplicationException.UnavailableException {
        block11: {
            if (log.isDebugEnabled()) {
                log.debug("markLedgerReplicated(ledgerId={})", (Object)ledgerId);
            }
            try {
                Lock l = this.heldLocks.get(ledgerId);
                if (l != null) {
                    this.store.delete(this.getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())).get();
                }
            }
            catch (ExecutionException ee) {
                if (ee.getCause() instanceof MetadataStoreException.NotFoundException) {
                    break block11;
                }
                if (ee.getCause() instanceof MetadataStoreException.BadVersionException) {
                    break block11;
                }
                log.error("Error deleting underreplicated ledger node", (Throwable)ee);
                throw new ReplicationException.UnavailableException("Error contacting metadata store", ee);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while contacting metadata store", ie);
            }
            finally {
                this.releaseUnderreplicatedLedger(ledgerId);
            }
        }
    }

    @Override
    public Iterator<UnderreplicatedLedger> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
        final LinkedList<String> queue = new LinkedList<String>();
        queue.add(this.urLedgerPath);
        return new Iterator<UnderreplicatedLedger>(){
            final Queue<UnderreplicatedLedger> curBatch = new LinkedList<UnderreplicatedLedger>();

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

            @Override
            public boolean hasNext() {
                if (this.curBatch.size() > 0) {
                    return true;
                }
                while (queue.size() > 0 && this.curBatch.size() == 0) {
                    String parent = (String)queue.remove();
                    try {
                        for (String c : PulsarLedgerUnderreplicationManager.this.store.getChildren(parent).get()) {
                            String child = parent + "/" + c;
                            if (c.startsWith("urL")) {
                                long ledgerId = PulsarLedgerUnderreplicationManager.this.getLedgerId(child);
                                UnderreplicatedLedger underreplicatedLedger = PulsarLedgerUnderreplicationManager.this.getLedgerUnreplicationInfo(ledgerId);
                                if (underreplicatedLedger == null) continue;
                                List<String> replicaList = underreplicatedLedger.getReplicaList();
                                if (predicate != null && !predicate.test(replicaList)) continue;
                                this.curBatch.add(underreplicatedLedger);
                                continue;
                            }
                            queue.add(child);
                        }
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading list", e);
                    }
                }
                return this.curBatch.size() > 0;
            }

            @Override
            public UnderreplicatedLedger next() {
                assert (this.curBatch.size() > 0);
                return this.curBatch.remove();
            }
        };
    }

    private long getLedgerToRereplicateFromHierarchy(String parent, long depth) throws ExecutionException, InterruptedException {
        if (depth == 4L) {
            ArrayList children = new ArrayList(this.store.getChildren(parent).get());
            Collections.shuffle(children);
            while (!children.isEmpty()) {
                String tryChild = (String)children.get(0);
                try {
                    List<String> locks = this.store.getChildren(this.urLockPath).get();
                    if (locks.contains(tryChild)) {
                        children.remove(tryChild);
                        continue;
                    }
                    Optional<GetResult> optRes = this.store.get(parent + "/" + tryChild).get();
                    if (!optRes.isPresent()) {
                        if (log.isDebugEnabled()) {
                            log.debug("{}/{} doesn't exist", (Object)parent, (Object)tryChild);
                        }
                        children.remove(tryChild);
                        continue;
                    }
                    long ledgerId = this.getLedgerId(tryChild);
                    this.internalAcquireUnderreplicatedLedger(ledgerId);
                    String lockPath = PulsarLedgerUnderreplicationManager.getUrLedgerLockPath(this.urLockPath, ledgerId);
                    this.heldLocks.put(ledgerId, new Lock(lockPath, optRes.get().getStat().getVersion()));
                    return ledgerId;
                }
                catch (ExecutionException ee) {
                    if (ee.getCause() instanceof MetadataStoreException.BadVersionException) {
                        children.remove(tryChild);
                        continue;
                    }
                    throw ee;
                }
                catch (NumberFormatException nfe) {
                    children.remove(tryChild);
                }
            }
            return -1L;
        }
        ArrayList children = new ArrayList(this.store.getChildren(parent).join());
        Collections.shuffle(children);
        while (children.size() > 0) {
            String tryChild = (String)children.get(0);
            String tryPath = parent + "/" + tryChild;
            long ledger = this.getLedgerToRereplicateFromHierarchy(tryPath, depth + 1L);
            if (ledger != -1L) {
                return ledger;
            }
            children.remove(tryChild);
        }
        return -1L;
    }

    @Override
    public long pollLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("pollLedgerToRereplicate()");
        }
        try {
            return this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
        }
        catch (ExecutionException ee) {
            throw new ReplicationException.UnavailableException("Error contacting metadata store", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("getLedgerToRereplicate()");
        }
        try {
            while (true) {
                this.waitIfLedgerReplicationDisabled();
                long ledger = this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
                if (ledger != -1L) {
                    return ledger;
                }
                PulsarLedgerUnderreplicationManager pulsarLedgerUnderreplicationManager = this;
                synchronized (pulsarLedgerUnderreplicationManager) {
                    this.wait(1000L);
                }
            }
        }
        catch (ExecutionException ee) {
            throw new ReplicationException.UnavailableException("Error contacting metadata store", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie);
        }
    }

    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        ReplicationEnableCb cb = new ReplicationEnableCb();
        if (!this.isLedgerReplicationEnabled()) {
            this.notifyLedgerReplicationEnabled(cb);
            cb.await();
        }
    }

    @Override
    public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("releaseLedger(ledgerId={})", (Object)ledgerId);
        }
        try {
            Lock l = this.heldLocks.get(ledgerId);
            if (l != null) {
                this.store.delete(l.getLockPath(), Optional.empty()).get();
            }
        }
        catch (ExecutionException ee) {
            if (!(ee.getCause() instanceof MetadataStoreException.NotFoundException)) {
                log.error("Error deleting underreplicated ledger lock", (Throwable)ee);
                throw new ReplicationException.UnavailableException("Error contacting metadata store", ee);
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie);
        }
        this.heldLocks.remove(ledgerId);
    }

    @Override
    public void close() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("close()");
        }
        try {
            for (Map.Entry<Long, Lock> e : this.heldLocks.entrySet()) {
                this.store.delete(e.getValue().getLockPath(), Optional.empty()).get();
            }
        }
        catch (ExecutionException ee) {
            if (!(ee.getCause() instanceof MetadataStoreException.NotFoundException)) {
                log.error("Error deleting underreplicated ledger lock", (Throwable)ee);
                throw new ReplicationException.UnavailableException("Error contacting metadata store", ee);
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie);
        }
    }

    @Override
    public void disableLedgerReplication() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("disableLedegerReplication()");
        }
        try {
            String path = this.basePath + "/disable";
            this.store.put(path, "".getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).get();
            log.info("Auto ledger re-replication is disabled!");
        }
        catch (ExecutionException ee) {
            log.error("Exception while stopping auto ledger re-replication", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Exception while stopping auto ledger re-replication", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while stopping auto ledger re-replication", ie);
        }
    }

    @Override
    public void enableLedgerReplication() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("enableLedegerReplication()");
        }
        try {
            this.store.delete(this.basePath + "/disable", Optional.empty()).get();
            log.info("Resuming automatic ledger re-replication");
        }
        catch (ExecutionException ee) {
            log.error("Exception while resuming ledger replication", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Exception while resuming auto ledger re-replication", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while resuming auto ledger re-replication", ie);
        }
    }

    @Override
    public boolean isLedgerReplicationEnabled() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("isLedgerReplicationEnabled()");
        }
        try {
            return this.store.exists(this.basePath + "/disable").get() == false;
        }
        catch (ExecutionException ee) {
            log.error("Error while checking the state of ledger re-replication", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyLedgerReplicationEnabled(BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("notifyLedgerReplicationEnabled()");
        }
        PulsarLedgerUnderreplicationManager pulsarLedgerUnderreplicationManager = this;
        synchronized (pulsarLedgerUnderreplicationManager) {
            this.replicationEnabledListener = cb;
        }
        try {
            if (!this.store.exists(this.basePath + "/disable").get().booleanValue()) {
                log.info("LedgerReplication is enabled externally through metadata store, since DISABLE_NODE node is deleted");
                cb.operationComplete(0, null);
                return;
            }
        }
        catch (ExecutionException ee) {
            log.error("Error while checking the state of ledger re-replication", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException {
        try {
            return this.store.exists(PulsarLedgerUnderreplicationManager.getUrLedgerLockPath(this.urLockPath, ledgerId)).get();
        }
        catch (Exception e) {
            throw new ReplicationException.UnavailableException("Failed to check if ledger is beinge replicated", e);
        }
    }

    @Override
    public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException {
        log.debug("initializeLostBookieRecoveryDelay()");
        try {
            this.store.put(this.lostBookieRecoveryDelayPath, Integer.toString(lostBookieRecoveryDelay).getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).get();
        }
        catch (ExecutionException ee) {
            if (ee.getCause() instanceof MetadataStoreException.BadVersionException) {
                log.info("lostBookieRecoveryDelay node is already present, so using existing lostBookieRecoveryDelay node value");
                return false;
            }
            log.error("Error while initializing LostBookieRecoveryDelay", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        return true;
    }

    @Override
    public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException {
        log.debug("setLostBookieRecoveryDelay()");
        try {
            this.store.put(this.lostBookieRecoveryDelayPath, Integer.toString(lostBookieRecoveryDelay).getBytes(StandardCharsets.UTF_8), Optional.empty()).get();
        }
        catch (ExecutionException ee) {
            log.error("Error while setting LostBookieRecoveryDelay ", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException {
        log.debug("getLostBookieRecoveryDelay()");
        try {
            byte[] data = this.store.get(this.lostBookieRecoveryDelayPath).get().get().getValue();
            return Integer.parseInt(new String(data, StandardCharsets.UTF_8));
        }
        catch (ExecutionException ee) {
            log.error("Error while getting LostBookieRecoveryDelay ", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyLostBookieRecoveryDelayChanged(BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        log.debug("notifyLostBookieRecoveryDelayChanged()");
        PulsarLedgerUnderreplicationManager pulsarLedgerUnderreplicationManager = this;
        synchronized (pulsarLedgerUnderreplicationManager) {
            this.lostBookieRecoveryDelayListener = cb;
        }
        try {
            if (!this.store.exists(this.lostBookieRecoveryDelayPath).get().booleanValue()) {
                cb.operationComplete(0, null);
                return;
            }
        }
        catch (ExecutionException ee) {
            log.error("Error while checking the state of lostBookieRecoveryDelay", (Throwable)ee);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public String getReplicationWorkerIdRereplicatingLedger(long ledgerId) throws ReplicationException.UnavailableException {
        try {
            Optional<GetResult> optRes = this.store.get(PulsarLedgerUnderreplicationManager.getUrLedgerLockPath(this.urLockPath, ledgerId)).get();
            if (!optRes.isPresent()) {
                return null;
            }
            byte[] lockData = optRes.get().getValue();
            DataFormats.LockDataFormat.Builder lockDataBuilder = DataFormats.LockDataFormat.newBuilder();
            TextFormat.merge(new String(lockData, StandardCharsets.UTF_8), (Message.Builder)lockDataBuilder);
            DataFormats.LockDataFormat lock = lockDataBuilder.build();
            return lock.getBookieId();
        }
        catch (ExecutionException e) {
            log.error("Error while getting ReplicationWorkerId rereplicating Ledger", (Throwable)e);
            throw new ReplicationException.UnavailableException("Error while getting ReplicationWorkerId rereplicating Ledger", e);
        }
        catch (InterruptedException e) {
            log.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", (Throwable)e);
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        }
        catch (TextFormat.ParseException e) {
            log.error("Error while parsing ZK data of lock", (Throwable)e);
            throw new ReplicationException.UnavailableException("Error while parsing ZK data of lock", e);
        }
    }

    @Override
    public void setCheckAllLedgersCTime(long checkAllLedgersCTime) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("setCheckAllLedgersCTime");
        }
        try {
            DataFormats.CheckAllLedgersFormat.Builder builder = DataFormats.CheckAllLedgersFormat.newBuilder();
            builder.setCheckAllLedgersCTime(checkAllLedgersCTime);
            byte[] checkAllLedgersFormatByteArray = builder.build().toByteArray();
            this.store.put(this.checkAllLedgersCtimePath, checkAllLedgersFormatByteArray, Optional.empty()).get();
        }
        catch (ExecutionException ee) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public long getCheckAllLedgersCTime() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("setCheckAllLedgersCTime");
        }
        try {
            Optional<GetResult> optRes = this.store.get(this.checkAllLedgersCtimePath).get();
            if (!optRes.isPresent()) {
                log.warn("checkAllLedgersCtimeZnode is not yet available");
                return -1L;
            }
            byte[] data = optRes.get().getValue();
            DataFormats.CheckAllLedgersFormat checkAllLedgersFormat = DataFormats.CheckAllLedgersFormat.parseFrom(data);
            return checkAllLedgersFormat.hasCheckAllLedgersCTime() ? checkAllLedgersFormat.getCheckAllLedgersCTime() : -1L;
        }
        catch (ExecutionException ee) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        catch (InvalidProtocolBufferException ipbe) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
        }
    }

    @Override
    public void setPlacementPolicyCheckCTime(long placementPolicyCheckCTime) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("setPlacementPolicyCheckCTime");
        }
        try {
            DataFormats.PlacementPolicyCheckFormat.Builder builder = DataFormats.PlacementPolicyCheckFormat.newBuilder();
            builder.setPlacementPolicyCheckCTime(placementPolicyCheckCTime);
            byte[] placementPolicyCheckFormatByteArray = builder.build().toByteArray();
            this.store.put(this.placementPolicyCheckCtimePath, placementPolicyCheckFormatByteArray, Optional.empty()).get();
        }
        catch (ExecutionException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public long getPlacementPolicyCheckCTime() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("getPlacementPolicyCheckCTime");
        }
        try {
            Optional<GetResult> optRes = this.store.get(this.placementPolicyCheckCtimePath).get();
            if (!optRes.isPresent()) {
                log.warn("placementPolicyCheckCtimeZnode is not yet available");
                return -1L;
            }
            byte[] data = optRes.get().getValue();
            DataFormats.PlacementPolicyCheckFormat placementPolicyCheckFormat = DataFormats.PlacementPolicyCheckFormat.parseFrom(data);
            return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime() ? placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1L;
        }
        catch (ExecutionException ee) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        catch (InvalidProtocolBufferException ipbe) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
        }
    }

    @Override
    public void setReplicasCheckCTime(long replicasCheckCTime) throws ReplicationException.UnavailableException {
        try {
            DataFormats.ReplicasCheckFormat.Builder builder = DataFormats.ReplicasCheckFormat.newBuilder();
            builder.setReplicasCheckCTime(replicasCheckCTime);
            byte[] replicasCheckFormatByteArray = builder.build().toByteArray();
            this.store.put(this.replicasCheckCtimePath, replicasCheckFormatByteArray, Optional.empty()).get();
            if (log.isDebugEnabled()) {
                log.debug("setReplicasCheckCTime completed successfully");
            }
        }
        catch (ExecutionException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public long getReplicasCheckCTime() throws ReplicationException.UnavailableException {
        try {
            Optional<GetResult> optRes = this.store.get(this.replicasCheckCtimePath).get();
            if (!optRes.isPresent()) {
                log.warn("placementPolicyCheckCtimeZnode is not yet available");
                return -1L;
            }
            byte[] data = optRes.get().getValue();
            DataFormats.ReplicasCheckFormat replicasCheckFormat = DataFormats.ReplicasCheckFormat.parseFrom(data);
            if (log.isDebugEnabled()) {
                log.debug("getReplicasCheckCTime completed successfully");
            }
            return replicasCheckFormat.hasReplicasCheckCTime() ? replicasCheckFormat.getReplicasCheckCTime() : -1L;
        }
        catch (ExecutionException ee) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        catch (InvalidProtocolBufferException ipbe) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
        }
    }

    @Override
    public void notifyUnderReplicationLedgerChanged(BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        log.debug("notifyUnderReplicationLedgerChanged()");
        this.store.registerListener(e -> {
            if (e.getType() == NotificationType.Deleted && ID_EXTRACTION_PATTERN.matcher(e.getPath()).find()) {
                cb.operationComplete(0, null);
            }
        });
    }

    private static class PulsarUnderreplicatedLedger
    extends UnderreplicatedLedger {
        PulsarUnderreplicatedLedger(long ledgerId) {
            super(ledgerId);
        }

        @Override
        protected void setCtime(long ctime) {
            super.setCtime(ctime);
        }

        @Override
        protected void setReplicaList(List<String> replicaList) {
            super.setReplicaList(replicaList);
        }
    }

    private static class Lock {
        private final String lockPath;
        private final long ledgerNodeVersion;

        Lock(String lockPath, long ledgerNodeVersion) {
            this.lockPath = lockPath;
            this.ledgerNodeVersion = ledgerNodeVersion;
        }

        String getLockPath() {
            return this.lockPath;
        }

        long getLedgerNodeVersion() {
            return this.ledgerNodeVersion;
        }
    }
}

