package org.apache.pulsar.shade.org.apache.bookkeeper.meta;

import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pulsar.shade.com.google.common.base.Joiner;
import org.apache.pulsar.shade.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.pulsar.shade.com.google.protobuf.ProtocolStringList;
import org.apache.pulsar.shade.com.google.protobuf.TextFormat;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.DNS;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.DataFormats;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.ReplicationException;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.SubTreeCache;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.shade.org.apache.zookeeper.Watcher;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.data.ACL;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/shade/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.class */
public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationManager {
    static final String LAYOUT = "BASIC";
    static final int LAYOUT_VERSION = 1;
    private final Map<Long, Lock> heldLocks = new ConcurrentHashMap();
    private final Pattern idExtractionPattern = Pattern.compile("urL(\\d+)$");
    private final String basePath;
    private final String urLedgerPath;
    private final String urLockPath;
    private final String layoutZNode;
    private final AbstractConfiguration conf;
    private final String lostBookieRecoveryDelayZnode;
    private final String checkAllLedgersCtimeZnode;
    private final String placementPolicyCheckCtimeZnode;
    private final String replicasCheckCtimeZnode;
    private final ZooKeeper zkc;
    private final SubTreeCache subTreeCache;
    static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
    private static final byte[] LOCK_DATA = getLockData();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/shade/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager$Lock.class */
    public static class Lock {
        private final String lockZNode;
        private final int ledgerZNodeVersion;

        Lock(String str, int i) {
            this.lockZNode = str;
            this.ledgerZNodeVersion = i;
        }

        String getLockZNode() {
            return this.lockZNode;
        }

        int getLedgerZNodeVersion() {
            return this.ledgerZNodeVersion;
        }
    }

    public ZkLedgerUnderreplicationManager(AbstractConfiguration abstractConfiguration, final ZooKeeper zooKeeper) throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        this.conf = abstractConfiguration;
        this.basePath = getBasePath(ZKMetadataDriverBase.resolveZkLedgersRootPath(abstractConfiguration));
        this.layoutZNode = this.basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE;
        this.urLedgerPath = this.basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
        this.urLockPath = this.basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
        this.lostBookieRecoveryDelayZnode = this.basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
        this.checkAllLedgersCtimeZnode = this.basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
        this.placementPolicyCheckCtimeZnode = this.basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
        this.replicasCheckCtimeZnode = this.basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
        this.zkc = zooKeeper;
        this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() { // from class: org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager.1
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.util.SubTreeCache.TreeProvider
            public List<String> getChildren(String str, Watcher watcher) throws InterruptedException, KeeperException {
                return zooKeeper.getChildren(str, watcher);
            }
        });
        checkLayout();
    }

    public static String getBasePath(String str) {
        return String.format("%s/%s", str, BookKeeperConstants.UNDER_REPLICATION_NODE);
    }

    public static String getUrLockPath(String str) {
        return String.format("%s/%s", getBasePath(str), BookKeeperConstants.UNDER_REPLICATION_LOCK);
    }

    public static byte[] getLockData() {
        DataFormats.LockDataFormat.Builder newBuilder = DataFormats.LockDataFormat.newBuilder();
        try {
            newBuilder.setBookieId(DNS.getDefaultHost("default"));
        } catch (UnknownHostException e) {
        }
        return newBuilder.build().toString().getBytes(StandardCharsets.UTF_8);
    }

    private void checkLayout() throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        List<ACL> aCLs = ZkUtils.getACLs(this.conf);
        if (this.zkc.exists(this.basePath, false) == null) {
            try {
                this.zkc.create(this.basePath, new byte[0], aCLs, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
            }
        }
        while (this.zkc.exists(this.layoutZNode, false) == null) {
            DataFormats.LedgerRereplicationLayoutFormat.Builder newBuilder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
            newBuilder.setType("BASIC").setVersion(1);
            try {
                this.zkc.create(this.layoutZNode, newBuilder.build().toString().getBytes(StandardCharsets.UTF_8), aCLs, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e2) {
            }
        }
        byte[] data = this.zkc.getData(this.layoutZNode, false, (Stat) null);
        DataFormats.LedgerRereplicationLayoutFormat.Builder newBuilder2 = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
        try {
            TextFormat.merge(new String(data, StandardCharsets.UTF_8), newBuilder2);
            DataFormats.LedgerRereplicationLayoutFormat build = newBuilder2.build();
            if (!build.getType().equals("BASIC") || build.getVersion() != 1) {
                throw new ReplicationException.CompatibilityException("Incompatible layout found (BASIC:1)");
            }
            if (this.zkc.exists(this.urLedgerPath, false) == null) {
                try {
                    this.zkc.create(this.urLedgerPath, new byte[0], aCLs, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException e3) {
                }
            }
            if (this.zkc.exists(this.urLockPath, false) == null) {
                try {
                    this.zkc.create(this.urLockPath, new byte[0], aCLs, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException e4) {
                }
            }
        } catch (TextFormat.ParseException e5) {
            throw new ReplicationException.CompatibilityException("Invalid data found", e5);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getLedgerId(String str) throws NumberFormatException {
        Matcher matcher = this.idExtractionPattern.matcher(str);
        if (matcher.find()) {
            return Long.parseLong(matcher.group(1));
        }
        throw new NumberFormatException("Couldn't find ledgerid in path");
    }

    public static String getParentZnodePath(String str, long j) {
        return String.format("%s/%s/%s/%s/%s", str, String.format("%04x", Long.valueOf((j >> 48) & 65535)), String.format("%04x", Long.valueOf((j >> 32) & 65535)), String.format("%04x", Long.valueOf((j >> 16) & 65535)), String.format("%04x", Long.valueOf(j & 65535)));
    }

    public static String getUrLedgerZnode(String str, long j) {
        return String.format("%s/urL%010d", getParentZnodePath(str, j), Long.valueOf(j));
    }

    public static String getUrLedgerLockZnode(String str, long j) {
        return String.format("%s/urL%010d", str, Long.valueOf(j));
    }

    private String getUrLedgerZnode(long j) {
        return getUrLedgerZnode(this.urLedgerPath, j);
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public UnderreplicatedLedger getLedgerUnreplicationInfo(long j) throws ReplicationException.UnavailableException {
        try {
            String urLedgerZnode = getUrLedgerZnode(j);
            DataFormats.UnderreplicatedLedgerFormat.Builder newBuilder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
            try {
                TextFormat.merge(new String(this.zkc.getData(urLedgerZnode, false, (Stat) null), StandardCharsets.UTF_8), newBuilder);
                DataFormats.UnderreplicatedLedgerFormat build = newBuilder.build();
                UnderreplicatedLedger underreplicatedLedger = new UnderreplicatedLedger(j);
                ProtocolStringList replicaList = build.getReplicaList();
                underreplicatedLedger.setCtime(build.hasCtime() ? build.getCtime() : -1L);
                underreplicatedLedger.setReplicaList(replicaList);
                return underreplicatedLedger;
            } catch (KeeperException.NoNodeException e) {
                if (!LOG.isDebugEnabled()) {
                    return null;
                }
                LOG.debug("Ledger: {} is not marked underreplicated", Long.valueOf(j));
                return null;
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", e2);
        } catch (TextFormat.ParseException e3) {
            throw new ReplicationException.UnavailableException("Error parsing proto message", e3);
        } catch (KeeperException e4) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e4);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long j, Collection<String> collection) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", Long.valueOf(j), collection);
        }
        List<ACL> aCLs = ZkUtils.getACLs(this.conf);
        String urLedgerZnode = getUrLedgerZnode(j);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        tryMarkLedgerUnderreplicatedAsync(urLedgerZnode, collection, aCLs, completableFuture);
        return completableFuture;
    }

    private void tryMarkLedgerUnderreplicatedAsync(String str, Collection<String> collection, List<ACL> list, CompletableFuture<Void> completableFuture) {
        DataFormats.UnderreplicatedLedgerFormat.Builder newBuilder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
        if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
            newBuilder.setCtime(System.currentTimeMillis());
        }
        newBuilder.getClass();
        collection.forEach(newBuilder::addReplica);
        ZkUtils.asyncCreateFullPathOptimistic(this.zkc, str, newBuilder.build().toString().getBytes(StandardCharsets.UTF_8), list, CreateMode.PERSISTENT, (i, str2, obj, str3) -> {
            if (KeeperException.Code.OK.intValue() == i) {
                FutureUtils.complete(completableFuture, null);
            } else if (KeeperException.Code.NODEEXISTS.intValue() == i) {
                handleLedgerUnderreplicatedAlreadyMarked(str, collection, list, completableFuture);
            } else {
                FutureUtils.completeExceptionally(completableFuture, KeeperException.create(KeeperException.Code.get(i)));
            }
        }, null);
    }

    private void handleLedgerUnderreplicatedAlreadyMarked(String str, Collection<String> collection, List<ACL> list, CompletableFuture<Void> completableFuture) {
        this.zkc.getData(str, false, (i, str2, obj, bArr, stat) -> {
            if (KeeperException.Code.OK.intValue() != i) {
                if (KeeperException.Code.NONODE.intValue() == i) {
                    tryMarkLedgerUnderreplicatedAsync(str, collection, list, completableFuture);
                    return;
                } else {
                    FutureUtils.completeExceptionally(completableFuture, KeeperException.create(KeeperException.Code.get(i)));
                    return;
                }
            }
            DataFormats.UnderreplicatedLedgerFormat.Builder newBuilder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
            try {
                TextFormat.merge(new String(bArr, StandardCharsets.UTF_8), newBuilder);
                DataFormats.UnderreplicatedLedgerFormat build = newBuilder.build();
                boolean z = false;
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    String str2 = (String) it.next();
                    if (!build.getReplicaList().contains(str2)) {
                        newBuilder.addReplica(str2);
                        z = true;
                    }
                }
                if (!z) {
                    FutureUtils.complete(completableFuture, null);
                    return;
                }
                if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
                    newBuilder.setCtime(System.currentTimeMillis());
                }
                this.zkc.setData(str, newBuilder.build().toString().getBytes(StandardCharsets.UTF_8), stat.getVersion(), (i, str3, obj, stat) -> {
                    if (KeeperException.Code.OK.intValue() == i) {
                        FutureUtils.complete(completableFuture, null);
                        return;
                    }
                    if (KeeperException.Code.NONODE.intValue() == i) {
                        tryMarkLedgerUnderreplicatedAsync(str, collection, list, completableFuture);
                    } else if (KeeperException.Code.BADVERSION.intValue() == i) {
                        handleLedgerUnderreplicatedAlreadyMarked(str, collection, list, completableFuture);
                    } else {
                        FutureUtils.completeExceptionally(completableFuture, KeeperException.create(KeeperException.Code.get(i)));
                    }
                }, null);
            } catch (TextFormat.ParseException e) {
                FutureUtils.completeExceptionally(completableFuture, new ReplicationException.UnavailableException("Invalid underreplicated ledger data for ledger " + str, e));
            }
        }, (Object) null);
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void markLedgerReplicated(long j) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("markLedgerReplicated(ledgerId={})", Long.valueOf(j));
        }
        try {
            try {
                try {
                    Lock lock = this.heldLocks.get(Long.valueOf(j));
                    if (lock != null) {
                        this.zkc.delete(getUrLedgerZnode(j), lock.getLedgerZNodeVersion());
                        try {
                            String[] split = getUrLedgerZnode(j).split("/");
                            for (int i = 1; i <= 4; i++) {
                                String join = Joiner.on("/").join((String[]) Arrays.copyOf(split, split.length - i));
                                Stat exists = this.zkc.exists(join, (Watcher) null);
                                if (exists != null) {
                                    this.zkc.delete(join, exists.getVersion());
                                }
                            }
                        } catch (KeeperException.NotEmptyException e) {
                        }
                    }
                    releaseUnderreplicatedLedger(j);
                } catch (KeeperException e2) {
                    LOG.error("Error deleting underreplicated ledger znode", e2);
                    throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
                }
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e3);
            } catch (KeeperException.BadVersionException e4) {
                releaseUnderreplicatedLedger(j);
            } catch (KeeperException.NoNodeException e5) {
                releaseUnderreplicatedLedger(j);
            }
        } catch (Throwable th) {
            releaseUnderreplicatedLedger(j);
            throw th;
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public Iterator<UnderreplicatedLedger> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
        final LinkedList linkedList = new LinkedList();
        linkedList.add(this.urLedgerPath);
        return new Iterator<UnderreplicatedLedger>() { // from class: org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager.2
            final Queue<UnderreplicatedLedger> curBatch = new LinkedList();
            static final /* synthetic */ boolean $assertionsDisabled;

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.curBatch.size() > 0) {
                    return true;
                }
                while (linkedList.size() > 0 && this.curBatch.size() == 0) {
                    String str = (String) linkedList.remove();
                    try {
                        for (String str2 : ZkLedgerUnderreplicationManager.this.zkc.getChildren(str, false)) {
                            String str3 = str + "/" + str2;
                            if (str2.startsWith("urL")) {
                                UnderreplicatedLedger ledgerUnreplicationInfo = ZkLedgerUnderreplicationManager.this.getLedgerUnreplicationInfo(ZkLedgerUnderreplicationManager.this.getLedgerId(str3));
                                if (ledgerUnreplicationInfo != null) {
                                    List<String> replicaList = ledgerUnreplicationInfo.getReplicaList();
                                    if (predicate == null || predicate.test(replicaList)) {
                                        this.curBatch.add(ledgerUnreplicationInfo);
                                    }
                                }
                            } else {
                                linkedList.add(str3);
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    } catch (KeeperException.NoNodeException e2) {
                    } catch (Exception e3) {
                        throw new RuntimeException("Error reading list", e3);
                    }
                }
                return this.curBatch.size() > 0;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public UnderreplicatedLedger next() {
                if ($assertionsDisabled || this.curBatch.size() > 0) {
                    return this.curBatch.remove();
                }
                throw new AssertionError();
            }

            static {
                $assertionsDisabled = !ZkLedgerUnderreplicationManager.class.desiredAssertionStatus();
            }
        };
    }

    private long getLedgerToRereplicateFromHierarchy(String str, long j) throws KeeperException, InterruptedException {
        if (j != 4) {
            try {
                List<String> children = this.subTreeCache.getChildren(str);
                Collections.shuffle(children);
                while (children.size() > 0) {
                    String str2 = children.get(0);
                    long ledgerToRereplicateFromHierarchy = getLedgerToRereplicateFromHierarchy(str + "/" + str2, j + 1);
                    if (ledgerToRereplicateFromHierarchy != -1) {
                        return ledgerToRereplicateFromHierarchy;
                    }
                    children.remove(str2);
                }
                return -1L;
            } catch (KeeperException.NoNodeException e) {
                return -1L;
            }
        }
        try {
            List<String> children2 = this.subTreeCache.getChildren(str);
            Collections.shuffle(children2);
            List<ACL> aCLs = ZkUtils.getACLs(this.conf);
            while (children2.size() > 0) {
                String str3 = children2.get(0);
                try {
                    if (this.subTreeCache.getChildren(this.urLockPath).contains(str3)) {
                        children2.remove(str3);
                    } else {
                        Stat exists = this.zkc.exists(str + "/" + str3, false);
                        if (exists != null) {
                            String str4 = this.urLockPath + "/" + str3;
                            long ledgerId = getLedgerId(str3);
                            this.zkc.create(str4, LOCK_DATA, aCLs, CreateMode.EPHEMERAL);
                            this.heldLocks.put(Long.valueOf(ledgerId), new Lock(str4, exists.getVersion()));
                            return ledgerId;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{}/{} doesn't exist", str, str3);
                        }
                        children2.remove(str3);
                    }
                } catch (NumberFormatException e2) {
                    children2.remove(str3);
                } catch (KeeperException.NodeExistsException e3) {
                    children2.remove(str3);
                }
            }
            return -1L;
        } catch (KeeperException.NoNodeException e4) {
            return -1L;
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long pollLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("pollLedgerToRereplicate()");
        }
        try {
            return getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", e);
        } catch (KeeperException e2) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
        SubTreeCache.WatchGuard registerWatcherWithGuard;
        Throwable th;
        long ledgerToRereplicateFromHierarchy;
        if (LOG.isDebugEnabled()) {
            LOG.debug("getLedgerToRereplicate()");
        }
        while (true) {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            try {
                registerWatcherWithGuard = this.subTreeCache.registerWatcherWithGuard(new Watcher() { // from class: org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager.3
                    @Override // org.apache.pulsar.shade.org.apache.zookeeper.Watcher
                    public void process(WatchedEvent watchedEvent) {
                        ZkLedgerUnderreplicationManager.LOG.info("Latch countdown due to ZK event: " + watchedEvent);
                        countDownLatch.countDown();
                    }
                });
                th = null;
                try {
                    try {
                        waitIfLedgerReplicationDisabled();
                        ledgerToRereplicateFromHierarchy = getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
                        if (ledgerToRereplicateFromHierarchy != -1) {
                            break;
                        }
                        countDownLatch.await();
                        if (registerWatcherWithGuard != null) {
                            if (0 != 0) {
                                try {
                                    registerWatcherWithGuard.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                registerWatcherWithGuard.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", e);
            } catch (KeeperException e2) {
                throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
            }
        }
        if (registerWatcherWithGuard != null) {
            if (0 != 0) {
                try {
                    registerWatcherWithGuard.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            } else {
                registerWatcherWithGuard.close();
            }
        }
        return ledgerToRereplicateFromHierarchy;
    }

    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        ReplicationEnableCb replicationEnableCb = new ReplicationEnableCb();
        if (isLedgerReplicationEnabled()) {
            return;
        }
        notifyLedgerReplicationEnabled(replicationEnableCb);
        replicationEnableCb.await();
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void releaseUnderreplicatedLedger(long j) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("releaseLedger(ledgerId={})", Long.valueOf(j));
        }
        try {
            Lock lock = this.heldLocks.get(Long.valueOf(j));
            if (lock != null) {
                this.zkc.delete(lock.getLockZNode(), -1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", e);
        } catch (KeeperException.NoNodeException e2) {
        } catch (KeeperException e3) {
            LOG.error("Error deleting underreplicated ledger lock", e3);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e3);
        }
        this.heldLocks.remove(Long.valueOf(j));
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager, java.lang.AutoCloseable
    public void close() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("close()");
        }
        try {
            Iterator<Map.Entry<Long, Lock>> it = this.heldLocks.entrySet().iterator();
            while (it.hasNext()) {
                this.zkc.delete(it.next().getValue().getLockZNode(), -1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", e);
        } catch (KeeperException.NoNodeException e2) {
        } catch (KeeperException e3) {
            LOG.error("Error deleting underreplicated ledger lock", e3);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e3);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void disableLedgerReplication() throws ReplicationException.UnavailableException {
        List<ACL> aCLs = ZkUtils.getACLs(this.conf);
        if (LOG.isDebugEnabled()) {
            LOG.debug("disableLedegerReplication()");
        }
        try {
            this.zkc.create(this.basePath + '/' + BookKeeperConstants.DISABLE_NODE, "".getBytes(StandardCharsets.UTF_8), aCLs, CreateMode.PERSISTENT);
            LOG.info("Auto ledger re-replication is disabled!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while stopping auto ledger re-replication", e);
        } catch (KeeperException.NodeExistsException e2) {
            LOG.warn("AutoRecovery is already disabled!", e2);
            throw new ReplicationException.UnavailableException("AutoRecovery is already disabled!", e2);
        } catch (KeeperException e3) {
            LOG.error("Exception while stopping auto ledger re-replication", e3);
            throw new ReplicationException.UnavailableException("Exception while stopping auto ledger re-replication", e3);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void enableLedgerReplication() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("enableLedegerReplication()");
        }
        try {
            this.zkc.delete(this.basePath + '/' + BookKeeperConstants.DISABLE_NODE, -1);
            LOG.info("Resuming automatic ledger re-replication");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while resuming auto ledger re-replication", e);
        } catch (KeeperException.NoNodeException e2) {
            LOG.warn("AutoRecovery is already enabled!", e2);
            throw new ReplicationException.UnavailableException("AutoRecovery is already enabled!", e2);
        } catch (KeeperException e3) {
            LOG.error("Exception while resuming ledger replication", e3);
            throw new ReplicationException.UnavailableException("Exception while resuming auto ledger re-replication", e3);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public boolean isLedgerReplicationEnabled() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("isLedgerReplicationEnabled()");
        }
        try {
            return null == this.zkc.exists(new StringBuilder().append(this.basePath).append('/').append(BookKeeperConstants.DISABLE_NODE).toString(), false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            LOG.error("Error while checking the state of ledger re-replication", e2);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void notifyLedgerReplicationEnabled(final BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("notifyLedgerReplicationEnabled()");
        }
        try {
            if (null == this.zkc.exists(this.basePath + '/' + BookKeeperConstants.DISABLE_NODE, new Watcher() { // from class: org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager.4
                @Override // org.apache.pulsar.shade.org.apache.zookeeper.Watcher
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                        ZkLedgerUnderreplicationManager.LOG.info("LedgerReplication is enabled externally through Zookeeper, since DISABLE_NODE ZNode is deleted");
                        genericCallback.operationComplete(0, null);
                    }
                }
            })) {
                LOG.info("LedgerReplication is enabled externally through Zookeeper, since DISABLE_NODE ZNode is deleted");
                genericCallback.operationComplete(0, null);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            LOG.error("Error while checking the state of ledger re-replication", e2);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    public static boolean isLedgerBeingReplicated(ZooKeeper zooKeeper, String str, long j) throws KeeperException, InterruptedException {
        return zooKeeper.exists(getUrLedgerLockZnode(getUrLockPath(str), j), false) != null;
    }

    public static void acquireUnderreplicatedLedgerLock(ZooKeeper zooKeeper, String str, long j, List<ACL> list) throws KeeperException, InterruptedException {
        ZkUtils.createFullPathOptimistic(zooKeeper, getUrLedgerLockZnode(getUrLockPath(str), j), LOCK_DATA, list, CreateMode.EPHEMERAL);
    }

    public static void releaseUnderreplicatedLedgerLock(ZooKeeper zooKeeper, String str, long j) throws InterruptedException, KeeperException {
        if (isLedgerBeingReplicated(zooKeeper, str, j)) {
            zooKeeper.delete(getUrLedgerLockZnode(getUrLockPath(str), j), -1);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public boolean initializeLostBookieRecoveryDelay(int i) throws ReplicationException.UnavailableException {
        LOG.debug("initializeLostBookieRecoveryDelay()");
        try {
            this.zkc.create(this.lostBookieRecoveryDelayZnode, Integer.toString(i).getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException.NoNodeException e2) {
            LOG.error("lostBookieRecoveryDelay Znode not found. Please verify if Auditor has been initialized.", e2);
            return false;
        } catch (KeeperException.NodeExistsException e3) {
            LOG.info("lostBookieRecoveryDelay Znode is already present, so using existing lostBookieRecoveryDelay Znode value");
            return false;
        } catch (KeeperException e4) {
            LOG.error("Error while initializing LostBookieRecoveryDelay", e4);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e4);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setLostBookieRecoveryDelay(int i) throws ReplicationException.UnavailableException {
        LOG.debug("setLostBookieRecoveryDelay()");
        try {
            if (this.zkc.exists(this.lostBookieRecoveryDelayZnode, false) != null) {
                this.zkc.setData(this.lostBookieRecoveryDelayZnode, Integer.toString(i).getBytes(StandardCharsets.UTF_8), -1);
            } else {
                this.zkc.create(this.lostBookieRecoveryDelayZnode, Integer.toString(i).getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            LOG.error("Error while setting LostBookieRecoveryDelay ", e2);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException {
        LOG.debug("getLostBookieRecoveryDelay()");
        try {
            return Integer.parseInt(new String(this.zkc.getData(this.lostBookieRecoveryDelayZnode, false, (Stat) null), StandardCharsets.UTF_8));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            LOG.error("Error while getting LostBookieRecoveryDelay ", e2);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void notifyLostBookieRecoveryDelayChanged(final BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback) throws ReplicationException.UnavailableException {
        LOG.debug("notifyLostBookieRecoveryDelayChanged()");
        try {
            if (null == this.zkc.exists(this.lostBookieRecoveryDelayZnode, new Watcher() { // from class: org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager.5
                @Override // org.apache.pulsar.shade.org.apache.zookeeper.Watcher
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                        genericCallback.operationComplete(0, null);
                    }
                }
            })) {
                genericCallback.operationComplete(0, null);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            LOG.error("Error while checking the state of lostBookieRecoveryDelay", e2);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public String getReplicationWorkerIdRereplicatingLedger(long j) throws ReplicationException.UnavailableException {
        String str = null;
        try {
            byte[] data = this.zkc.getData(getUrLedgerLockZnode(this.urLockPath, j), false, (Stat) null);
            DataFormats.LockDataFormat.Builder newBuilder = DataFormats.LockDataFormat.newBuilder();
            TextFormat.merge(new String(data, StandardCharsets.UTF_8), newBuilder);
            str = newBuilder.build().getBookieId();
        } catch (InterruptedException e) {
            LOG.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", e);
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (TextFormat.ParseException e2) {
            LOG.error("Error while parsing ZK data of lock", e2);
            throw new ReplicationException.UnavailableException("Error while parsing ZK data of lock", e2);
        } catch (KeeperException.NoNodeException e3) {
        } catch (KeeperException e4) {
            LOG.error("Error while getting ReplicationWorkerId rereplicating Ledger", e4);
            throw new ReplicationException.UnavailableException("Error while getting ReplicationWorkerId rereplicating Ledger", e4);
        }
        return str;
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setCheckAllLedgersCTime(long j) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("setCheckAllLedgersCTime");
        }
        try {
            List<ACL> aCLs = ZkUtils.getACLs(this.conf);
            DataFormats.CheckAllLedgersFormat.Builder newBuilder = DataFormats.CheckAllLedgersFormat.newBuilder();
            newBuilder.setCheckAllLedgersCTime(j);
            byte[] byteArray = newBuilder.build().toByteArray();
            if (this.zkc.exists(this.checkAllLedgersCtimeZnode, false) != null) {
                this.zkc.setData(this.checkAllLedgersCtimeZnode, byteArray, -1);
            } else {
                this.zkc.create(this.checkAllLedgersCtimeZnode, byteArray, aCLs, CreateMode.PERSISTENT);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getCheckAllLedgersCTime() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("setCheckAllLedgersCTime");
        }
        try {
            DataFormats.CheckAllLedgersFormat parseFrom = DataFormats.CheckAllLedgersFormat.parseFrom(this.zkc.getData(this.checkAllLedgersCtimeZnode, false, (Stat) null));
            if (parseFrom.hasCheckAllLedgersCTime()) {
                return parseFrom.getCheckAllLedgersCTime();
            }
            return -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (InvalidProtocolBufferException e2) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", e2);
        } catch (KeeperException.NoNodeException e3) {
            LOG.warn("checkAllLedgersCtimeZnode is not yet available");
            return -1L;
        } catch (KeeperException e4) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e4);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setPlacementPolicyCheckCTime(long j) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("setPlacementPolicyCheckCTime");
        }
        try {
            List<ACL> aCLs = ZkUtils.getACLs(this.conf);
            DataFormats.PlacementPolicyCheckFormat.Builder newBuilder = DataFormats.PlacementPolicyCheckFormat.newBuilder();
            newBuilder.setPlacementPolicyCheckCTime(j);
            byte[] byteArray = newBuilder.build().toByteArray();
            if (this.zkc.exists(this.placementPolicyCheckCtimeZnode, false) != null) {
                this.zkc.setData(this.placementPolicyCheckCtimeZnode, byteArray, -1);
            } else {
                this.zkc.create(this.placementPolicyCheckCtimeZnode, byteArray, aCLs, CreateMode.PERSISTENT);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getPlacementPolicyCheckCTime() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getPlacementPolicyCheckCTime");
        }
        try {
            DataFormats.PlacementPolicyCheckFormat parseFrom = DataFormats.PlacementPolicyCheckFormat.parseFrom(this.zkc.getData(this.placementPolicyCheckCtimeZnode, false, (Stat) null));
            if (parseFrom.hasPlacementPolicyCheckCTime()) {
                return parseFrom.getPlacementPolicyCheckCTime();
            }
            return -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (InvalidProtocolBufferException e2) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", e2);
        } catch (KeeperException.NoNodeException e3) {
            LOG.warn("placementPolicyCheckCtimeZnode is not yet available");
            return -1L;
        } catch (KeeperException e4) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e4);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setReplicasCheckCTime(long j) throws ReplicationException.UnavailableException {
        try {
            List<ACL> aCLs = ZkUtils.getACLs(this.conf);
            DataFormats.ReplicasCheckFormat.Builder newBuilder = DataFormats.ReplicasCheckFormat.newBuilder();
            newBuilder.setReplicasCheckCTime(j);
            byte[] byteArray = newBuilder.build().toByteArray();
            if (this.zkc.exists(this.replicasCheckCtimeZnode, false) != null) {
                this.zkc.setData(this.replicasCheckCtimeZnode, byteArray, -1);
            } else {
                this.zkc.create(this.replicasCheckCtimeZnode, byteArray, aCLs, CreateMode.PERSISTENT);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("setReplicasCheckCTime completed successfully");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (KeeperException e2) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getReplicasCheckCTime() throws ReplicationException.UnavailableException {
        try {
            DataFormats.ReplicasCheckFormat parseFrom = DataFormats.ReplicasCheckFormat.parseFrom(this.zkc.getData(this.replicasCheckCtimeZnode, false, (Stat) null));
            if (LOG.isDebugEnabled()) {
                LOG.debug("getReplicasCheckCTime completed successfully");
            }
            if (parseFrom.hasReplicasCheckCTime()) {
                return parseFrom.getReplicasCheckCTime();
            }
            return -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (InvalidProtocolBufferException e2) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", e2);
        } catch (KeeperException.NoNodeException e3) {
            LOG.warn("replicasCheckCtimeZnode is not yet available");
            return -1L;
        } catch (KeeperException e4) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e4);
        }
    }
}
