/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Stopwatch;
import org.apache.pulsar.shade.com.google.common.collect.HashMultiset;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Multiset;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.com.google.common.util.concurrent.SettableFuture;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerChecker;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerFragment;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.ReplicationException;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Gauge;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.shade.org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.shade.org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@StatsDoc(name="auditor", help="Auditor related stats")
public class Auditor
implements AutoCloseable {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
    // Replicas-check limits; the names suggest a cap on in-flight ledger requests and a
    // timeout in seconds. NOTE(review): usage is not next to these declarations - confirm.
    private static final int MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS = 100;
    private static final int REPLICAS_CHECK_TIMEOUT_IN_SECS = 120;
    // Shared empty BitSet instance. NOTE(review): BitSet is mutable, so this must never be modified.
    private static final BitSet EMPTY_BITSET = new BitSet();
    // Server configuration handed to the constructor.
    private final ServerConfiguration conf;
    // BookKeeper client and admin handle, plus flags recording whether this auditor owns them.
    // NOTE(review): presumably the flags decide whether close() also closes bkc/admin - confirm.
    private final BookKeeper bkc;
    private final boolean ownBkc;
    private final BookKeeperAdmin admin;
    private final boolean ownAdmin;
    // Metadata helpers; all three are created in initialize() from the client's LedgerManagerFactory.
    private BookieLedgerIndexer bookieLedgerIndexer;
    private LedgerManager ledgerManager;
    private LedgerUnderreplicationManager ledgerUnderreplicationManager;
    // Single-threaded scheduler on which audit tasks and periodic checks are submitted.
    private final ScheduledExecutorService executor;
    // Separate single-threaded executor whose thread is named "AuditorBookie-LedgerChecker-<id>".
    private final ExecutorService ledgerCheckerExecutor;
    // Bookies last seen as available; seeded in start() and maintained by the audit task.
    private List<String> knownBookies = new ArrayList<String>();
    // Identifier used in log messages and executor thread names.
    private final String bookieIdentifier;
    // Handle to a delayed bookie audit; null when no delayed audit is pending.
    private volatile Future<?> auditTask;
    // Bookies detected as lost that are waiting for an audit to run.
    private Set<String> bookiesToBeAudited = Sets.newHashSet();
    // Last lostBookieRecoveryDelay value read from the underreplication manager; compared against
    // a newly read value to detect "reset to the same value" in the delay-changed handler.
    private volatile int lostBookieRecoveryDelayBeforeChange;
    // Counter pairs for the periodic checks. The *GuageValue field of each pair is what the
    // corresponding registered gauge samples. NOTE(review): the other field of each pair is
    // presumably the running tally of the check in progress - confirm against the check code.
    private final AtomicInteger ledgersNotAdheringToPlacementPolicyGuageValue;
    private final AtomicInteger numOfLedgersFoundNotAdheringInPlacementPolicyCheck;
    private final AtomicInteger ledgersSoftlyAdheringToPlacementPolicyGuageValue;
    private final AtomicInteger numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck;
    private final AtomicInteger numOfClosedLedgersAuditedInPlacementPolicyCheck;
    private final AtomicInteger numOfURLedgersElapsedRecoveryGracePeriodGuageValue;
    private final AtomicInteger numOfURLedgersElapsedRecoveryGracePeriod;
    private final AtomicInteger numLedgersHavingNoReplicaOfAnEntryGuageValue;
    private final AtomicInteger numLedgersFoundHavingNoReplicaOfAnEntry;
    private final AtomicInteger numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue;
    private final AtomicInteger numLedgersFoundHavingLessThanAQReplicasOfAnEntry;
    private final AtomicInteger numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue;
    private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
    // Copied from conf.getUnderreplicatedLedgerRecoveryGracePeriod() in the constructor.
    private final long underreplicatedLedgerRecoveryGracePeriod;
    // Twice the configured ZooKeeper timeout (conf.getZkTimeout() * 2).
    private final int zkOpTimeoutMs;
    // Semaphore sized by auditorMaxNumberOfConcurrentOpenLedgerOperations (validated > 0).
    private final Semaphore openLedgerNoRecoverySemaphore;
    // Value of auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec (validated >= 0).
    private final int openLedgerNoRecoverySemaphoreWaitTimeoutMSec;
    // Stats sink from which every metric of this class is obtained.
    private final StatsLogger statsLogger;
    // Metric handles. Each one is obtained from (or registered with) statsLogger in the
    // constructor under the same name that appears in its @StatsDoc annotation.
    @StatsDoc(name="NUM_UNDER_REPLICATED_LEDGERS", help="the distribution of num under_replicated ledgers on each auditor run")
    private final OpStatsLogger numUnderReplicatedLedger;
    @StatsDoc(name="UNDER_REPLICATED_LEDGERS_TOTAL_SIZE", help="the distribution of under_replicated ledgers total size on each auditor run")
    private final OpStatsLogger underReplicatedLedgerTotalSize;
    @StatsDoc(name="URL_PUBLISH_TIME_FOR_LOST_BOOKIE", help="the latency distribution of publishing under replicated ledgers for lost bookies")
    private final OpStatsLogger uRLPublishTimeForLostBookies;
    @StatsDoc(name="BOOKIE_TO_LEDGERS_MAP_CREATION_TIME", help="the latency distribution of creating bookies-to-ledgers map")
    private final OpStatsLogger bookieToLedgersMapCreationTime;
    @StatsDoc(name="CHECK_ALL_LEDGERS_TIME", help="the latency distribution of checking all ledgers")
    private final OpStatsLogger checkAllLedgersTime;
    @StatsDoc(name="PLACEMENT_POLICY_CHECK_TIME", help="the latency distribution of placementPolicy check")
    private final OpStatsLogger placementPolicyCheckTime;
    @StatsDoc(name="REPLICAS_CHECK_TIME", help="the latency distribution of replicas check")
    private final OpStatsLogger replicasCheckTime;
    @StatsDoc(name="AUDIT_BOOKIES_TIME", help="the latency distribution of auditing all the bookies")
    private final OpStatsLogger auditBookiesTime;
    @StatsDoc(name="NUM_LEDGERS_CHECKED", help="the number of ledgers checked by the auditor")
    private final Counter numLedgersChecked;
    @StatsDoc(name="NUM_FRAGMENTS_PER_LEDGER", help="the distribution of number of fragments per ledger")
    private final OpStatsLogger numFragmentsPerLedger;
    @StatsDoc(name="NUM_BOOKIES_PER_LEDGER", help="the distribution of number of bookies per ledger")
    private final OpStatsLogger numBookiesPerLedger;
    // Incremented when a bookie audit is scheduled with a delay / when such a delayed audit is cancelled.
    @StatsDoc(name="NUM_BOOKIE_AUDITS_DELAYED", help="the number of bookie-audits delayed")
    private final Counter numBookieAuditsDelayed;
    @StatsDoc(name="NUM_DELAYED_BOOKIE_AUDITS_CANCELLED", help="the number of delayed-bookie-audits cancelled")
    private final Counter numDelayedBookieAuditsCancelled;
    // Gauges: each samples the matching *GuageValue AtomicInteger and reports 0 as its default.
    @StatsDoc(name="NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY", help="Gauge for number of ledgers not adhering to placement policy found in placement policy check")
    private final Gauge<Integer> numLedgersNotAdheringToPlacementPolicy;
    @StatsDoc(name="NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY", help="Gauge for number of ledgers softly adhering to placement policy found in placement policy check")
    private final Gauge<Integer> numLedgersSoftlyAdheringToPlacementPolicy;
    @StatsDoc(name="NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD", help="Gauge for number of underreplicated ledgers elapsed recovery grace period")
    private final Gauge<Integer> numUnderreplicatedLedgersElapsedRecoveryGracePeriod;
    @StatsDoc(name="NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY", help="Gauge for number of ledgers having an entry with all the replicas missing")
    private final Gauge<Integer> numLedgersHavingNoReplicaOfAnEntry;
    @StatsDoc(name="NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY", help="Gauge for number of ledgers having an entry with less than AQ number of replicas, this doesn't include ledgers counted towards numLedgersHavingNoReplicaOfAnEntry")
    private final Gauge<Integer> numLedgersHavingLessThanAQReplicasOfAnEntry;
    @StatsDoc(name="NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY", help="Gauge for number of ledgers having an entry with less than WQ number of replicas, this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry")
    private final Gauge<Integer> numLedgersHavingLessThanWQReplicasOfAnEntry;
    /**
     * Periodic bookie check. Starts an audit (with {@code true} passed to
     * {@code startAudit}) unless a delayed audit is already pending, in which
     * case the run is skipped and only logged.
     */
    private final Runnable bookieCheck = new Runnable(){

        @Override
        public void run() {
            // A non-null auditTask means a delayed audit is already queued.
            if (Auditor.this.auditTask != null) {
                LOG.info("Audit already scheduled; skipping periodic bookie check");
                return;
            }
            Auditor.this.startAudit(true);
        }
    };

    /**
     * Creates a BookKeeper client for the auditor without stats collection.
     *
     * @param conf server configuration the client configuration is derived from
     * @return a newly built client
     * @throws InterruptedException if client construction is interrupted
     * @throws IOException if the client cannot be created
     */
    static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
        StatsLogger noStats = NullStatsLogger.INSTANCE;
        return Auditor.createBookKeeperClient(conf, noStats);
    }

    /**
     * Creates a BookKeeper client for the auditor, tagged with the "system" client role.
     *
     * @param conf server configuration the client configuration is derived from
     * @param statsLogger stats sink handed to the client builder
     * @return a newly built client
     * @throws InterruptedException if client construction is interrupted
     * @throws IOException if the builder fails with a {@link BKException} (kept as the cause)
     */
    static BookKeeper createBookKeeperClient(ServerConfiguration conf, StatsLogger statsLogger) throws InterruptedException, IOException {
        ClientConfiguration systemClientConf = new ClientConfiguration(conf);
        systemClientConf.setClientRole("system");
        try {
            return BookKeeper.forConfig(systemClientConf)
                    .statsLogger(statsLogger)
                    .build();
        }
        catch (BKException cause) {
            // Callers only deal with IOException; keep the BookKeeper error as the cause.
            throw new IOException("Failed to create bookkeeper client", cause);
        }
    }

    /**
     * Same as {@link #createBookKeeperClient(ServerConfiguration)} but reports every
     * failure as an {@link ReplicationException.UnavailableException}.
     *
     * @param conf server configuration the client configuration is derived from
     * @return a newly built client
     * @throws ReplicationException.UnavailableException if creation fails or is interrupted;
     *         on interruption the thread's interrupt flag is restored first
     */
    static BookKeeper createBookKeeperClientThrowUnavailableException(ServerConfiguration conf) throws ReplicationException.UnavailableException {
        try {
            return Auditor.createBookKeeperClient(conf);
        }
        catch (InterruptedException | IOException failure) {
            if (failure instanceof InterruptedException) {
                // Preserve the interrupt for code further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new ReplicationException.UnavailableException("Failed to create bookkeeper client", failure);
        }
    }

    /**
     * Creates an auditor that builds its own BookKeeper client from {@code conf}.
     * The client is created through
     * {@link #createBookKeeperClientThrowUnavailableException(ServerConfiguration)}
     * (which uses the no-op stats logger for the client itself) and is passed on
     * with {@code ownBkc = true}.
     *
     * NOTE(review): if the delegated constructor throws, the client created here
     * is not closed by this constructor - confirm whether callers can tolerate that.
     *
     * @param bookieIdentifier identifier used in logs and thread names
     * @param conf server configuration
     * @param statsLogger stats sink for the auditor's own metrics
     * @throws ReplicationException.UnavailableException if the client or the auditor cannot be set up
     */
    public Auditor(String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger) throws ReplicationException.UnavailableException {
        this(bookieIdentifier, conf, Auditor.createBookKeeperClientThrowUnavailableException(conf), true, statsLogger);
    }

    /**
     * Creates an auditor around an existing BookKeeper client. A new
     * {@link BookKeeperAdmin} is built on top of {@code bkc} and passed on with
     * {@code ownAdmin = true}.
     *
     * @param bookieIdentifier identifier used in logs and thread names
     * @param conf server configuration
     * @param bkc BookKeeper client to use
     * @param ownBkc ownership flag for {@code bkc}, stored as given
     * @param statsLogger stats sink for the auditor's metrics and the admin handle
     * @throws ReplicationException.UnavailableException if the auditor cannot be set up
     */
    public Auditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, StatsLogger statsLogger) throws ReplicationException.UnavailableException {
        this(bookieIdentifier, conf, bkc, ownBkc, new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf)), true, statsLogger);
    }

    public Auditor(final String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, BookKeeperAdmin admin, boolean ownAdmin, StatsLogger statsLogger) throws ReplicationException.UnavailableException {
        this.conf = conf;
        this.underreplicatedLedgerRecoveryGracePeriod = conf.getUnderreplicatedLedgerRecoveryGracePeriod();
        this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
        this.bookieIdentifier = bookieIdentifier;
        this.statsLogger = statsLogger;
        this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new AtomicInteger(0);
        this.ledgersNotAdheringToPlacementPolicyGuageValue = new AtomicInteger(0);
        this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new AtomicInteger(0);
        this.ledgersSoftlyAdheringToPlacementPolicyGuageValue = new AtomicInteger(0);
        this.numOfClosedLedgersAuditedInPlacementPolicyCheck = new AtomicInteger(0);
        this.numOfURLedgersElapsedRecoveryGracePeriod = new AtomicInteger(0);
        this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue = new AtomicInteger(0);
        this.numLedgersHavingNoReplicaOfAnEntryGuageValue = new AtomicInteger(0);
        this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
        this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue = new AtomicInteger(0);
        this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new AtomicInteger(0);
        this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new AtomicInteger(0);
        this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0);
        if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
            LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0");
            throw new ReplicationException.UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0");
        }
        this.openLedgerNoRecoverySemaphore = new Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
        if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() < 0) {
            LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec should be greater than or equal to 0");
            throw new ReplicationException.UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec should be greater than or equal to 0");
        }
        this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec = conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
        this.numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger("NUM_UNDER_REPLICATED_LEDGERS");
        this.underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger("UNDER_REPLICATED_LEDGERS_TOTAL_SIZE");
        this.uRLPublishTimeForLostBookies = this.statsLogger.getOpStatsLogger("URL_PUBLISH_TIME_FOR_LOST_BOOKIE");
        this.bookieToLedgersMapCreationTime = this.statsLogger.getOpStatsLogger("BOOKIE_TO_LEDGERS_MAP_CREATION_TIME");
        this.checkAllLedgersTime = this.statsLogger.getOpStatsLogger("CHECK_ALL_LEDGERS_TIME");
        this.placementPolicyCheckTime = this.statsLogger.getOpStatsLogger("PLACEMENT_POLICY_CHECK_TIME");
        this.replicasCheckTime = this.statsLogger.getOpStatsLogger("REPLICAS_CHECK_TIME");
        this.auditBookiesTime = this.statsLogger.getOpStatsLogger("AUDIT_BOOKIES_TIME");
        this.numLedgersChecked = this.statsLogger.getCounter("NUM_LEDGERS_CHECKED");
        this.numFragmentsPerLedger = statsLogger.getOpStatsLogger("NUM_FRAGMENTS_PER_LEDGER");
        this.numBookiesPerLedger = statsLogger.getOpStatsLogger("NUM_BOOKIES_PER_LEDGER");
        this.numBookieAuditsDelayed = this.statsLogger.getCounter("NUM_BOOKIE_AUDITS_DELAYED");
        this.numDelayedBookieAuditsCancelled = this.statsLogger.getCounter("NUM_DELAYED_BOOKIE_AUDITS_CANCELLED");
        this.numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>(){

            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                return Auditor.this.ledgersNotAdheringToPlacementPolicyGuageValue.get();
            }
        };
        this.statsLogger.registerGauge("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY", this.numLedgersNotAdheringToPlacementPolicy);
        this.numLedgersSoftlyAdheringToPlacementPolicy = new Gauge<Integer>(){

            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                return Auditor.this.ledgersSoftlyAdheringToPlacementPolicyGuageValue.get();
            }
        };
        this.statsLogger.registerGauge("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY", this.numLedgersSoftlyAdheringToPlacementPolicy);
        this.numUnderreplicatedLedgersElapsedRecoveryGracePeriod = new Gauge<Integer>(){

            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                return Auditor.this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue.get();
            }
        };
        this.statsLogger.registerGauge("NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD", this.numUnderreplicatedLedgersElapsedRecoveryGracePeriod);
        this.numLedgersHavingNoReplicaOfAnEntry = new Gauge<Integer>(){

            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                return Auditor.this.numLedgersHavingNoReplicaOfAnEntryGuageValue.get();
            }
        };
        this.statsLogger.registerGauge("NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY", this.numLedgersHavingNoReplicaOfAnEntry);
        this.numLedgersHavingLessThanAQReplicasOfAnEntry = new Gauge<Integer>(){

            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                return Auditor.this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.get();
            }
        };
        this.statsLogger.registerGauge("NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY", this.numLedgersHavingLessThanAQReplicasOfAnEntry);
        this.numLedgersHavingLessThanWQReplicasOfAnEntry = new Gauge<Integer>(){

            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                return Auditor.this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.get();
            }
        };
        this.statsLogger.registerGauge("NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY", this.numLedgersHavingLessThanWQReplicasOfAnEntry);
        this.bkc = bkc;
        this.ownBkc = ownBkc;
        this.admin = admin;
        this.ownAdmin = ownAdmin;
        this.initialize(conf, bkc);
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
                t.setDaemon(true);
                return t;
            }
        });
        this.ledgerCheckerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AuditorBookie-LedgerChecker-" + bookieIdentifier);
                t.setDaemon(true);
                return t;
            }
        });
    }

    /**
     * Creates the ledger manager, the bookie-to-ledger indexer and the
     * under-replication manager from the client's ledger manager factory, makes
     * sure the lostBookieRecoveryDelay value is initialized, and records its
     * current value.
     *
     * @param conf server configuration supplying the default lostBookieRecoveryDelay
     * @param bkc client whose ledger manager factory is used
     * @throws ReplicationException.UnavailableException on a compatibility problem,
     *         on interruption (interrupt flag is restored), or when the
     *         under-replication manager itself reports unavailability
     */
    private void initialize(ServerConfiguration conf, BookKeeper bkc) throws ReplicationException.UnavailableException {
        try {
            LedgerManagerFactory factory = bkc.getLedgerManagerFactory();
            this.ledgerManager = factory.newLedgerManager();
            this.bookieLedgerIndexer = new BookieLedgerIndexer(this.ledgerManager);
            this.ledgerUnderreplicationManager = factory.newLedgerUnderreplicationManager();
            LOG.info("AuthProvider used by the Auditor is {}", (Object)this.admin.getConf().getClientAuthProviderFactoryClass());
            boolean delayNodeCreated = this.ledgerUnderreplicationManager.initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay());
            if (!delayNodeCreated) {
                LOG.info("Valid lostBookieRecoveryDelay zNode is available, so not creating lostBookieRecoveryDelay zNode as part of Auditor initialization ");
            } else {
                LOG.info("Initializing lostBookieRecoveryDelay zNode to the conif value: {}", (Object)conf.getLostBookieRecoveryDelay());
            }
            // Baseline for detecting later changes of the delay.
            this.lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
        }
        catch (ReplicationException.CompatibilityException incompatible) {
            throw new ReplicationException.UnavailableException("CompatibilityException while initializing Auditor", incompatible);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while initializing Auditor", interrupted);
        }
    }

    /**
     * Queues a task on the auditor's executor that shuts that executor down.
     * Does nothing (beyond logging) when the executor is already shut down.
     * Both this method and the queued task hold the auditor's monitor.
     */
    private synchronized void submitShutdownTask() {
        LOG.info("Executing submitShutdownTask");
        if (this.executor.isShutdown()) {
            LOG.info("executor is already shutdown");
            return;
        }
        Runnable shutdownExecutor = new Runnable(){

            @Override
            public void run() {
                synchronized (Auditor.this) {
                    LOG.info("Shutting down Auditor's Executor");
                    Auditor.this.executor.shutdown();
                }
            }
        };
        this.executor.submit(SafeRunnable.safeRun(shutdownExecutor));
    }

    /**
     * Submits a task that reconciles the known-bookie list with the currently
     * available bookies and audits the ones that went missing.
     *
     * <p>The task waits while ledger replication is disabled, then:
     * <ul>
     *   <li>adds newly seen bookies to {@code knownBookies};</li>
     *   <li>cancels a pending delayed audit when every tracked lost bookie is back;</li>
     *   <li>audits immediately when lostBookieRecoveryDelay is 0 or when more than
     *       one bookie is lost;</li>
     *   <li>otherwise schedules a single delayed audit if none is pending.</li>
     * </ul>
     *
     * @return the future of the submitted task, or an already-failed future
     *         carrying a {@code BKAuditException} when the executor is shut down
     */
    @VisibleForTesting
    synchronized Future<?> submitAuditTask() {
        if (this.executor.isShutdown()) {
            SettableFuture<Void> f = SettableFuture.create();
            f.setException(new ReplicationException.BKAuditException("Auditor shutting down"));
            return f;
        }
        return this.executor.submit(SafeRunnable.safeRun(new Runnable(){

            @Override
            public void run() {
                try {
                    Auditor.this.waitIfLedgerReplicationDisabled();
                    int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
                    List<String> availableBookies = Auditor.this.getAvailableBookies();

                    // Learn about bookies that appeared since the last run.
                    Collection<String> newBookies = CollectionUtils.subtract(availableBookies, Auditor.this.knownBookies);
                    Auditor.this.knownBookies.addAll(newBookies);

                    // Every bookie we were waiting to audit is back: drop the pending audit.
                    if (!Auditor.this.bookiesToBeAudited.isEmpty() && Auditor.this.knownBookies.containsAll(Auditor.this.bookiesToBeAudited)) {
                        if (Auditor.this.auditTask != null && Auditor.this.auditTask.cancel(false)) {
                            Auditor.this.auditTask = null;
                            Auditor.this.numDelayedBookieAuditsCancelled.inc();
                        }
                        Auditor.this.bookiesToBeAudited.clear();
                    }

                    // Bookies lost in this run. Captured before knownBookies is pruned below,
                    // because the same subtraction taken afterwards is always empty.
                    Collection<String> bookiesLostNow = CollectionUtils.subtract(Auditor.this.knownBookies, availableBookies);
                    Auditor.this.bookiesToBeAudited.addAll(bookiesLostNow);
                    if (Auditor.this.bookiesToBeAudited.size() == 0) {
                        return;
                    }
                    Auditor.this.knownBookies.removeAll(Auditor.this.bookiesToBeAudited);

                    if (lostBookieRecoveryDelay == 0) {
                        // No grace period configured: audit right away.
                        Auditor.this.startAudit(false);
                        Auditor.this.bookiesToBeAudited.clear();
                        return;
                    }
                    if (Auditor.this.bookiesToBeAudited.size() > 1) {
                        // More than one bookie is down: do not wait for the grace period.
                        LOG.info("Multiple bookie failure; not delaying bookie audit. Bookies lost now: {}; All lost bookies: {}", bookiesLostNow, (Object)Auditor.this.bookiesToBeAudited);
                        if (Auditor.this.auditTask != null && Auditor.this.auditTask.cancel(false)) {
                            Auditor.this.auditTask = null;
                            Auditor.this.numDelayedBookieAuditsCancelled.inc();
                        }
                        Auditor.this.startAudit(false);
                        Auditor.this.bookiesToBeAudited.clear();
                        return;
                    }
                    if (Auditor.this.auditTask == null) {
                        // Single lost bookie: give it lostBookieRecoveryDelay seconds to return.
                        Auditor.this.auditTask = Auditor.this.executor.schedule(SafeRunnable.safeRun(new Runnable(){

                            @Override
                            public void run() {
                                Auditor.this.startAudit(false);
                                Auditor.this.auditTask = null;
                                Auditor.this.bookiesToBeAudited.clear();
                            }
                        }), (long)lostBookieRecoveryDelay, TimeUnit.SECONDS);
                        Auditor.this.numBookieAuditsDelayed.inc();
                        LOG.info("Delaying bookie audit by {} secs for {}", (Object)lostBookieRecoveryDelay, (Object)Auditor.this.bookiesToBeAudited);
                    }
                }
                catch (BKException bke) {
                    LOG.error("Exception getting bookie list", (Throwable)bke);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
                }
                catch (ReplicationException.UnavailableException ue) {
                    LOG.error("Exception while watching available bookies", (Throwable)ue);
                }
            }
        }));
    }

    /**
     * Submits a task that reacts to a change of lostBookieRecoveryDelay.
     *
     * <p>The task waits while ledger replication is disabled, re-reads the delay
     * and cancels any pending delayed audit. If the new delay is 0 or equal to
     * the previously recorded value, an audit is started immediately; otherwise
     * a pending audit is rescheduled with the new delay. Whenever the delay was
     * read successfully it becomes the new recorded value.
     *
     * @return the future of the submitted task, or an already-failed future
     *         carrying a {@code BKAuditException} when the executor is shut down
     */
    synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
        if (this.executor.isShutdown()) {
            SettableFuture f = SettableFuture.create();
            f.setException(new ReplicationException.BKAuditException("Auditor shutting down"));
            return f;
        }
        Runnable onDelayChanged = new Runnable(){

            @Override
            public void run() {
                // -1 marks "delay not read yet" for the finally block.
                int updatedDelay = -1;
                try {
                    Auditor.this.waitIfLedgerReplicationDisabled();
                    updatedDelay = Auditor.this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
                    if (Auditor.this.auditTask != null) {
                        LOG.info("lostBookieRecoveryDelay period has been changed so canceling the pending AuditTask");
                        Auditor.this.auditTask.cancel(false);
                        Auditor.this.numDelayedBookieAuditsCancelled.inc();
                    }
                    boolean auditImmediately = updatedDelay == 0 || updatedDelay == Auditor.this.lostBookieRecoveryDelayBeforeChange;
                    if (auditImmediately) {
                        LOG.info("lostBookieRecoveryDelay has been set to 0 or reset to its previous value, so starting AuditTask. Current lostBookieRecoveryDelay: {}, previous lostBookieRecoveryDelay: {}", (Object)updatedDelay, (Object)Auditor.this.lostBookieRecoveryDelayBeforeChange);
                        Auditor.this.startAudit(false);
                        Auditor.this.auditTask = null;
                        Auditor.this.bookiesToBeAudited.clear();
                    } else if (Auditor.this.auditTask != null) {
                        // An audit was pending: re-arm it with the new delay.
                        LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly", (Object)updatedDelay);
                        Runnable delayedAudit = new Runnable(){

                            @Override
                            public void run() {
                                Auditor.this.startAudit(false);
                                Auditor.this.auditTask = null;
                                Auditor.this.bookiesToBeAudited.clear();
                            }
                        };
                        Auditor.this.auditTask = Auditor.this.executor.schedule(SafeRunnable.safeRun(delayedAudit), (long)updatedDelay, TimeUnit.SECONDS);
                        Auditor.this.numBookieAuditsDelayed.inc();
                    }
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while for LedgersReplication to be enabled ", (Throwable)interrupted);
                }
                catch (ReplicationException.NonRecoverableReplicationException fatal) {
                    LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)fatal);
                    Auditor.this.submitShutdownTask();
                }
                catch (ReplicationException.UnavailableException unavailable) {
                    LOG.error("Exception while reading from ZK", (Throwable)unavailable);
                }
                finally {
                    // Remember the value only if it was actually read.
                    if (updatedDelay != -1) {
                        Auditor.this.lostBookieRecoveryDelayBeforeChange = updatedDelay;
                    }
                }
            }
        };
        return this.executor.submit(SafeRunnable.safeRun(onDelayChanged));
    }

    /**
     * Starts the auditor: registers the bookie-change watcher, seeds the list
     * of known bookies, subscribes to lostBookieRecoveryDelay changes and
     * schedules the periodic check tasks.
     *
     * <p>Does nothing when the executor is already shut down. If the bookie list
     * cannot be read or the change subscription fails, a shutdown task is
     * submitted and the method returns without scheduling any periodic task,
     * so no work is queued on an executor that is about to stop.
     */
    public void start() {
        LOG.info("I'm starting as Auditor Bookie. ID: {}", (Object)this.bookieIdentifier);
        synchronized (this) {
            if (this.executor.isShutdown()) {
                return;
            }
            try {
                this.watchBookieChanges();
                this.knownBookies = this.getAvailableBookies();
            }
            catch (BKException bke) {
                LOG.error("Couldn't get bookie list, so exiting", (Throwable)bke);
                this.submitShutdownTask();
                // Shutdown is pending: do not register callbacks or schedule checks.
                return;
            }
            try {
                this.ledgerUnderreplicationManager.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Exception while registering for LostBookieRecoveryDelay change notification, so exiting", (Throwable)ue);
                this.submitShutdownTask();
                // Shutdown is pending: do not schedule checks.
                return;
            }
            this.scheduleBookieCheckTask();
            this.scheduleCheckAllLedgersTask();
            this.schedulePlacementPolicyCheckTask();
            this.scheduleReplicasCheckTask();
        }
    }

    /**
     * Puts the bookie check on the executor. With an
     * {@code auditorPeriodicBookieCheckInterval} of 0 the check is submitted
     * once; with any other value it is scheduled at a fixed rate of that many
     * seconds, starting immediately.
     */
    private void scheduleBookieCheckTask() {
        long intervalSecs = this.conf.getAuditorPeriodicBookieCheckInterval();
        Runnable guardedCheck = SafeRunnable.safeRun(this.bookieCheck);
        if (intervalSecs != 0L) {
            LOG.info("Auditor periodic bookie checking enabled 'auditorPeriodicBookieCheckInterval' {} seconds", (Object)intervalSecs);
            this.executor.scheduleAtFixedRate(guardedCheck, 0L, intervalSecs, TimeUnit.SECONDS);
            return;
        }
        LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
        this.executor.submit(guardedCheck);
    }

    /**
     * Schedules the periodic checkAllLedgers task on the auditor executor.
     *
     * <p>Nothing is scheduled when the configured interval is not positive.
     * Otherwise the first run is delayed by whatever remains of one interval
     * since the last recorded execution time. It starts immediately when no
     * execution time is known ({@code -1}, or the underreplication manager is
     * unavailable) or when more than one interval has already elapsed. A
     * non-recoverable error while reading the last execution time submits a
     * shutdown task and schedules nothing.
     */
    private void scheduleCheckAllLedgersTask() {
        final long periodSecs = this.conf.getAuditorPeriodicCheckInterval();
        if (periodSecs <= 0L) {
            LOG.info("Periodic checking disabled");
            return;
        }
        LOG.info("Auditor periodic ledger checking enabled 'auditorPeriodicCheckInterval' {} seconds", (Object)periodSecs);

        long lastRunCTime;
        try {
            lastRunCTime = this.ledgerUnderreplicationManager.getCheckAllLedgersCTime();
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
            return;
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", (Throwable)ue);
            lastRunCTime = -1L;
        }

        // Defaults cover the "never executed / unknown" case: run right away.
        long secsSinceLastRun = -1L;
        long firstRunDelaySecs = 0L;
        if (lastRunCTime != -1L) {
            // A last-run time in the future is treated as "just ran".
            secsSinceLastRun = Math.max(0L, (System.currentTimeMillis() - lastRunCTime) / 1000L);
            firstRunDelaySecs = Math.max(0L, periodSecs - secsSinceLastRun);
        }
        LOG.info("checkAllLedgers scheduling info.  checkAllLedgersLastExecutedCTime: {} durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", new Object[]{lastRunCTime, secsSinceLastRun, firstRunDelaySecs, periodSecs});

        final Runnable checkAllLedgersTask = () -> {
            final Stopwatch timer = Stopwatch.createStarted();
            boolean succeeded = false;
            try {
                if (this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                    LOG.info("Starting checkAllLedgers");
                    this.checkAllLedgers();
                    final long elapsedMs = timer.stop().elapsed(TimeUnit.MILLISECONDS);
                    LOG.info("Completed checkAllLedgers in {} milliSeconds", (Object)elapsedMs);
                    this.checkAllLedgersTime.registerSuccessfulEvent(elapsedMs, TimeUnit.MILLISECONDS);
                    succeeded = true;
                } else {
                    LOG.info("Ledger replication disabled, skipping checkAllLedgers");
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while running periodic check", (Throwable)ie);
            }
            catch (BKException bke) {
                LOG.error("Exception running periodic check", (Throwable)bke);
            }
            catch (IOException ioe) {
                LOG.error("I/O exception running periodic check", (Throwable)ioe);
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                this.submitShutdownTask();
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
            }
            finally {
                // Every run that did not finish the check (including a skipped one) is recorded as failed.
                if (!succeeded) {
                    final long elapsedMs = timer.stop().elapsed(TimeUnit.MILLISECONDS);
                    this.checkAllLedgersTime.registerFailedEvent(elapsedMs, TimeUnit.MILLISECONDS);
                }
            }
        };
        this.executor.scheduleAtFixedRate(SafeRunnable.safeRun(checkAllLedgersTask), firstRunDelaySecs, periodSecs, TimeUnit.SECONDS);
    }

    /**
     * Schedules the periodic placement-policy check on the auditor executor.
     *
     * <p>Nothing is scheduled when the configured interval is not positive.
     * The first run is delayed by whatever remains of one interval since the
     * last recorded execution time, or starts immediately when that time is
     * unknown. A non-recoverable error while reading the last execution time
     * submits a shutdown task and schedules nothing.
     *
     * <p>Each run publishes the counters collected by
     * {@code placementPolicyCheck()} to their gauges. When the check fails
     * with a {@code BKAuditException}, only counters greater than zero are
     * published.
     */
    private void schedulePlacementPolicyCheckTask() {
        final long periodSecs = this.conf.getAuditorPeriodicPlacementPolicyCheckInterval();
        if (periodSecs <= 0L) {
            LOG.info("Periodic placementPolicy check disabled");
            return;
        }
        LOG.info("Auditor periodic placement policy check enabled 'auditorPeriodicPlacementPolicyCheckInterval' {} seconds", (Object)periodSecs);

        long lastRunCTime;
        try {
            lastRunCTime = this.ledgerUnderreplicationManager.getPlacementPolicyCheckCTime();
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
            return;
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", (Throwable)ue);
            lastRunCTime = -1L;
        }

        // Defaults cover the "never executed / unknown" case: run right away.
        long secsSinceLastRun = -1L;
        long firstRunDelaySecs = 0L;
        if (lastRunCTime != -1L) {
            // A last-run time in the future is treated as "just ran".
            secsSinceLastRun = Math.max(0L, (System.currentTimeMillis() - lastRunCTime) / 1000L);
            firstRunDelaySecs = Math.max(0L, periodSecs - secsSinceLastRun);
        }
        LOG.info("placementPolicyCheck scheduling info.  placementPolicyCheckLastExecutedCTime: {} durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", new Object[]{lastRunCTime, secsSinceLastRun, firstRunDelaySecs, periodSecs});

        final Runnable placementPolicyCheckTask = () -> {
            try {
                if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                    LOG.info("Ledger replication disabled, skipping placementPolicyCheck");
                    return;
                }
                final Stopwatch timer = Stopwatch.createStarted();
                LOG.info("Starting PlacementPolicyCheck");
                this.placementPolicyCheck();
                final long elapsedMs = timer.stop().elapsed(TimeUnit.MILLISECONDS);
                final int notAdhering = this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck.get();
                final int softlyAdhering = this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.get();
                final int closedAudited = this.numOfClosedLedgersAuditedInPlacementPolicyCheck.get();
                final int pastGracePeriod = this.numOfURLedgersElapsedRecoveryGracePeriod.get();
                LOG.info("Completed placementPolicyCheck in {} milliSeconds. numOfClosedLedgersAuditedInPlacementPolicyCheck {} numOfLedgersNotAdheringToPlacementPolicy {} numOfLedgersSoftlyAdheringToPlacementPolicy {} numOfURLedgersElapsedRecoveryGracePeriod {}", new Object[]{elapsedMs, closedAudited, notAdhering, softlyAdhering, pastGracePeriod});
                this.ledgersNotAdheringToPlacementPolicyGuageValue.set(notAdhering);
                this.ledgersSoftlyAdheringToPlacementPolicyGuageValue.set(softlyAdhering);
                this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue.set(pastGracePeriod);
                this.placementPolicyCheckTime.registerSuccessfulEvent(elapsedMs, TimeUnit.MILLISECONDS);
            }
            catch (ReplicationException.BKAuditException e) {
                // The check aborted part-way: publish only the counters that found something.
                final int notAdhering = this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck.get();
                if (notAdhering > 0) {
                    this.ledgersNotAdheringToPlacementPolicyGuageValue.set(notAdhering);
                }
                final int softlyAdhering = this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.get();
                if (softlyAdhering > 0) {
                    this.ledgersSoftlyAdheringToPlacementPolicyGuageValue.set(softlyAdhering);
                }
                final int pastGracePeriod = this.numOfURLedgersElapsedRecoveryGracePeriod.get();
                if (pastGracePeriod > 0) {
                    this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue.set(pastGracePeriod);
                }
                LOG.error("BKAuditException running periodic placementPolicy check.numOfLedgersNotAdheringToPlacementPolicy {}, numOfLedgersSoftlyAdheringToPlacementPolicy {},numOfURLedgersElapsedRecoveryGracePeriod {}", new Object[]{notAdhering, softlyAdhering, pastGracePeriod, e});
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
            }
        };
        this.executor.scheduleAtFixedRate(SafeRunnable.safeRun(placementPolicyCheckTask), firstRunDelaySecs, periodSecs, TimeUnit.SECONDS);
    }

    /**
     * Schedules the periodic replicas check on the auditor executor.
     *
     * <p>Nothing is scheduled when the configured interval is not positive.
     * The first run is delayed by whatever remains of one interval since the
     * last recorded execution time, or starts immediately when that time is
     * unknown. A non-recoverable error while reading the last execution time
     * submits a shutdown task and schedules nothing.
     *
     * <p>Each run publishes the counters collected by {@code replicasCheck()}
     * to their gauges. When the check fails with a {@code BKAuditException},
     * only counters greater than zero are published.
     */
    private void scheduleReplicasCheckTask() {
        final long periodSecs = this.conf.getAuditorPeriodicReplicasCheckInterval();
        if (periodSecs <= 0L) {
            LOG.info("Periodic replicas check disabled");
            return;
        }
        LOG.info("Auditor periodic replicas check enabled 'auditorReplicasCheckInterval' {} seconds", (Object)periodSecs);

        long lastRunCTime;
        try {
            lastRunCTime = this.ledgerUnderreplicationManager.getReplicasCheckCTime();
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
            return;
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Got UnavailableException while trying to get replicasCheckCTime", (Throwable)ue);
            lastRunCTime = -1L;
        }

        // Defaults cover the "never executed / unknown" case: run right away.
        long secsSinceLastRun = -1L;
        long firstRunDelaySecs = 0L;
        if (lastRunCTime != -1L) {
            // A last-run time in the future is treated as "just ran".
            secsSinceLastRun = Math.max(0L, (System.currentTimeMillis() - lastRunCTime) / 1000L);
            firstRunDelaySecs = Math.max(0L, periodSecs - secsSinceLastRun);
        }
        LOG.info("replicasCheck scheduling info. replicasCheckLastExecutedCTime: {} durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", new Object[]{lastRunCTime, secsSinceLastRun, firstRunDelaySecs, periodSecs});

        final Runnable replicasCheckTask = () -> {
            try {
                if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                    LOG.info("Ledger replication disabled, skipping replicasCheck task.");
                    return;
                }
                final Stopwatch timer = Stopwatch.createStarted();
                LOG.info("Starting ReplicasCheck");
                this.replicasCheck();
                final long elapsedMs = timer.stop().elapsed(TimeUnit.MILLISECONDS);
                final int noReplica = this.numLedgersFoundHavingNoReplicaOfAnEntry.get();
                final int belowAckQuorum = this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry.get();
                final int belowWriteQuorum = this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry.get();
                LOG.info("Completed ReplicasCheck in {} milliSeconds numLedgersFoundHavingNoReplicaOfAnEntry {} numLedgersFoundHavingLessThanAQReplicasOfAnEntry {} numLedgersFoundHavingLessThanWQReplicasOfAnEntry {}.", new Object[]{elapsedMs, noReplica, belowAckQuorum, belowWriteQuorum});
                this.numLedgersHavingNoReplicaOfAnEntryGuageValue.set(noReplica);
                this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.set(belowAckQuorum);
                this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.set(belowWriteQuorum);
                this.replicasCheckTime.registerSuccessfulEvent(elapsedMs, TimeUnit.MILLISECONDS);
            }
            catch (ReplicationException.BKAuditException e) {
                LOG.error("BKAuditException running periodic replicas check.", (Throwable)e);
                // The check aborted part-way: publish only the counters that found something.
                final int noReplica = this.numLedgersFoundHavingNoReplicaOfAnEntry.get();
                if (noReplica > 0) {
                    this.numLedgersHavingNoReplicaOfAnEntryGuageValue.set(noReplica);
                }
                final int belowAckQuorum = this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry.get();
                if (belowAckQuorum > 0) {
                    this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.set(belowAckQuorum);
                }
                final int belowWriteQuorum = this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry.get();
                if (belowWriteQuorum > 0) {
                    this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.set(belowWriteQuorum);
                }
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
            }
        };
        this.executor.scheduleAtFixedRate(SafeRunnable.safeRun(replicasCheckTask), firstRunDelaySecs, periodSecs, TimeUnit.SECONDS);
    }

    /**
     * Blocks the caller while ledger replication is disabled.
     *
     * <p>Returns immediately when replication is enabled. Otherwise registers
     * a {@code ReplicationEnableCb} with the underreplication manager and
     * waits on it.
     *
     * @throws ReplicationException.UnavailableException if the underreplication manager cannot be queried
     * @throws InterruptedException if interrupted while waiting
     */
    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        final ReplicationEnableCb enabledSignal = new ReplicationEnableCb();
        if (this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
            return;
        }
        LOG.info("LedgerReplication is disabled externally through Zookeeper, since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
        this.ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(enabledSignal);
        enabledSignal.await();
    }

    /**
     * Returns the string form of every bookie reported by the admin client as
     * either available or read-only.
     *
     * <p>The two collections are merged into a private set, so the collections
     * handed out by the admin client are never modified and a bookie reported
     * in both appears once.
     *
     * @return a new list of bookie identifiers, in no particular order
     * @throws BKException if the admin client fails to list the bookies
     */
    private List<String> getAvailableBookies() throws BKException {
        // Copy first: the collection returned by the admin client is not ours to mutate
        // (and addAll would throw if it were unmodifiable).
        Set<BookieId> liveBookies = new HashSet<BookieId>(this.admin.getAvailableBookies());
        liveBookies.addAll(this.admin.getReadOnlyBookies());
        List<String> availableBookies = new ArrayList<String>(liveBookies.size());
        for (BookieId addr : liveBookies) {
            availableBookies.add(addr.toString());
        }
        return availableBookies;
    }

    /**
     * Registers listeners with the admin client so that any change to the
     * writable or the read-only bookie set submits an audit task. The changed
     * bookie set passed to the listener is not used.
     *
     * @throws BKException if a watcher cannot be registered
     */
    private void watchBookieChanges() throws BKException {
        this.admin.watchWritableBookiesChanged(writableBookies -> this.submitAuditTask());
        this.admin.watchReadOnlyBookiesChanged(readOnlyBookies -> this.submitAuditTask());
    }

    /**
     * Runs one bookie audit, logging (rather than propagating) any failure.
     *
     * @param shutDownTask when {@code true}, a failed audit additionally
     *                     submits the shutdown task
     */
    private void startAudit(boolean shutDownTask) {
        boolean auditFailed = true;
        try {
            this.auditBookies();
            auditFailed = false;
        }
        catch (BKException bke) {
            LOG.error("Exception getting bookie list", (Throwable)bke);
        }
        catch (InterruptedException ie) {
            // Restore the interrupt flag before logging.
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
        }
        catch (ReplicationException.BKAuditException bke) {
            LOG.error("Exception while watching available bookies", (Throwable)bke);
        }
        if (auditFailed && shutDownTask) {
            this.submitShutdownTask();
        }
    }

    /**
     * Audits the cluster for lost bookies and marks their ledgers as
     * underreplicated.
     *
     * <p>Steps: wait until ledger replication is enabled, build the
     * bookie-to-ledgers index, and re-check that replication is still enabled.
     * If it was disabled in the meantime, the bookie check is resubmitted and
     * this run ends. Otherwise the bookies present in the index but not
     * currently available are treated as lost and their ledgers are published
     * for re-replication.
     *
     * <p>If the underreplication manager is unavailable the audit is skipped;
     * a non-recoverable error additionally submits the shutdown task.
     *
     * @throws ReplicationException.BKAuditException if building the index or publishing lost-bookie ledgers fails
     * @throws InterruptedException if interrupted while waiting
     * @throws BKException if the available bookies cannot be listed
     */
    private void auditBookies() throws ReplicationException.BKAuditException, InterruptedException, BKException {
        try {
            this.waitIfLedgerReplicationDisabled();
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
            return;
        }
        catch (ReplicationException.UnavailableException ue) {
            // Log the exception too, so the reason for skipping is not lost.
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period", (Throwable)ue);
            return;
        }
        LOG.info("Starting auditBookies");
        Stopwatch stopwatch = Stopwatch.createStarted();
        Map<String, Set<Long>> ledgerDetails = this.generateBookie2LedgersIndex();
        try {
            if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                // Replication was disabled while the index was being built: start over.
                this.executor.submit(SafeRunnable.safeRun(this.bookieCheck));
                return;
            }
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period", (Throwable)ue);
            return;
        }
        List<String> availableBookies = this.getAvailableBookies();
        // Bookies that own ledgers according to the index (deliberately not the knownBookies field).
        Set<String> indexedBookies = ledgerDetails.keySet();
        Collection<String> lostBookies = CollectionUtils.subtract(indexedBookies, availableBookies);
        this.bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        if (!lostBookies.isEmpty()) {
            try {
                FutureUtils.result(this.handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
            }
            catch (ReplicationException e) {
                // Chain the exception itself: its cause may be null, which would drop the stack trace.
                throw new ReplicationException.BKAuditException(e.getMessage(), e);
            }
            this.uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }
        LOG.info("Completed auditBookies");
        this.auditBookiesTime.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
    }

    /**
     * Asks the bookie-ledger indexer for the current bookie-to-ledgers index.
     *
     * @return the map produced by the indexer, keyed by bookie
     * @throws ReplicationException.BKAuditException if the indexer fails
     */
    private Map<String, Set<Long>> generateBookie2LedgersIndex() throws ReplicationException.BKAuditException {
        final Map<String, Set<Long>> bookieToLedgers = this.bookieLedgerIndexer.getBookieToLedgerIndex();
        return bookieToLedgers;
    }

    /**
     * Publishes, bookie by bookie, the ledgers of every lost bookie as
     * suspected underreplicated.
     *
     * @param lostBookies   bookies considered lost
     * @param ledgerDetails bookie-to-ledgers index used to look up each lost bookie's ledgers
     * @return a future tracking the publication for all lost bookies
     */
    private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies, Map<String, Set<Long>> ledgerDetails) {
        LOG.info("Following are the failed bookies: {}, and searching its ledgers for re-replication", lostBookies);
        final List<String> failedBookies = Lists.newArrayList(lostBookies);
        return FutureUtils.processList(
                failedBookies,
                failedBookie -> this.publishSuspectedLedgersAsync(Lists.newArrayList(failedBookie), ledgerDetails.get(failedBookie)),
                null);
    }

    /**
     * Marks the given ledgers as underreplicated on behalf of the missing
     * bookies and records related statistics.
     *
     * <p>Independently of the returned future, the metadata of each ledger is
     * read to sum up the ledger lengths. Ledgers whose metadata cannot be read
     * are left out of the sum, which is registered once all reads have
     * finished.
     *
     * @param missingBookies bookies the ledgers are missing from
     * @param ledgers        ledger ids to mark; may be {@code null} or empty
     * @return a future tracking the mark-underreplicated calls, or an
     *         already-completed future when there is nothing to mark
     */
    private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
        if (ledgers == null || ledgers.isEmpty()) {
            LOG.info("There is no ledgers for the failed bookie: {}", missingBookies);
            return FutureUtils.Void();
        }
        LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
        this.numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());

        // Fire-and-forget: accumulate the total size of the affected ledgers for the stats.
        final LongAdder totalLength = new LongAdder();
        FutureUtils.processList(
                Lists.newArrayList(ledgers),
                ledgerId -> this.ledgerManager.readLedgerMetadata((long)ledgerId).whenComplete((metadata, failure) -> {
                    if (failure != null) {
                        return;
                    }
                    totalLength.add(((LedgerMetadata)metadata.getValue()).getLength());
                }),
                null)
            .whenComplete((ignoredResult, ignoredFailure) -> this.underReplicatedLedgerTotalSize.registerSuccessfulValue(totalLength.longValue()));

        return FutureUtils.processList(
                Lists.newArrayList(ledgers),
                ledgerId -> this.ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync((long)ledgerId, missingBookies),
                null);
    }

    /**
     * Creates a BookKeeper client for the given server configuration by
     * delegating to {@code Auditor.createBookKeeperClient}.
     *
     * @param conf server configuration to build the client from
     * @return the newly created client
     * @throws IOException if the client cannot be created
     * @throws InterruptedException if interrupted while creating the client
     */
    BookKeeper getBookKeeper(ServerConfiguration conf) throws IOException, InterruptedException {
        final BookKeeper client = Auditor.createBookKeeperClient(conf);
        return client;
    }

    /**
     * Wraps the given client in a new {@code BookKeeperAdmin} that uses this
     * auditor's stats logger and a client configuration derived from the
     * auditor's configuration.
     *
     * @param bookKeeper client the admin operates on
     * @return the new admin instance
     */
    BookKeeperAdmin getBookKeeperAdmin(BookKeeper bookKeeper) {
        final ClientConfiguration clientConf = new ClientConfiguration(this.conf);
        return new BookKeeperAdmin(bookKeeper, this.statsLogger, clientConf);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks every ledger known to the ledger manager for lost fragments.
     *
     * <p>A dedicated client and admin are created for the run and closed when
     * it ends. Each ledger is opened without recovery, with concurrency
     * bounded by {@code openLedgerNoRecoverySemaphore}, and handed to a
     * {@code LedgerChecker} on the ledger-checker executor. The calling thread
     * blocks until the iteration finishes or is aborted. After a run that did
     * not fail, the checkAllLedgers execution time is stored.
     *
     * <p>The run is aborted without error when ledger replication gets
     * disabled, when the underreplication manager is unavailable, or when the
     * open-ledger semaphore cannot be acquired in time. A non-recoverable
     * replication error submits the shutdown task and fails the run.
     *
     * @throws BKException if iterating the ledgers fails or the run is aborted by a non-recoverable error
     * @throws IOException if the dedicated client cannot be created
     * @throws InterruptedException if interrupted while creating or closing the client
     */
    void checkAllLedgers() throws BKException, IOException, InterruptedException {
        BookKeeper localClient = this.getBookKeeper(this.conf);
        BookKeeperAdmin localAdmin = this.getBookKeeperAdmin(localClient);
        try {
            LedgerChecker checker = new LedgerChecker(localClient, this.conf.getInFlightReadEntryNumInLedgerChecker());
            // Completed when the iteration ends or is aborted; the caller blocks on it below.
            CompletableFuture<Void> processFuture = new CompletableFuture<Void>();
            BookkeeperInternalCallbacks.Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
                try {
                    if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                        LOG.info("Ledger rereplication has been disabled, aborting periodic check");
                        FutureUtils.complete(processFuture, null);
                        return;
                    }
                }
                catch (ReplicationException.NonRecoverableReplicationException nre) {
                    LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                    this.submitShutdownTask();
                    // Unblock the waiting caller: the callback is never invoked on this path,
                    // so nothing else would complete the future.
                    FutureUtils.completeExceptionally(processFuture, nre);
                    return;
                }
                catch (ReplicationException.UnavailableException ue) {
                    LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
                    FutureUtils.complete(processFuture, null);
                    return;
                }
                try {
                    if (!this.openLedgerNoRecoverySemaphore.tryAcquire(this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec, TimeUnit.MILLISECONDS)) {
                        LOG.warn("Failed to acquire semaphore for {} ms, ledgerId: {}", (Object)this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec, ledgerId);
                        FutureUtils.complete(processFuture, null);
                        return;
                    }
                }
                catch (InterruptedException e) {
                    LOG.error("Unable to acquire open ledger operation semaphore ", (Throwable)e);
                    Thread.currentThread().interrupt();
                    FutureUtils.complete(processFuture, null);
                    return;
                }
                localAdmin.asyncOpenLedgerNoRecovery((long)ledgerId, (rc, lh, ctx) -> {
                    // The permit only guards the open call itself.
                    this.openLedgerNoRecoverySemaphore.release();
                    if (0 == rc) {
                        this.ledgerCheckerExecutor.execute(() -> {
                            checker.checkLedger(lh, new ProcessLostFragmentsCb(lh, callback), this.conf.getAuditorLedgerVerificationPercentage());
                            this.numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
                            this.numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
                            this.numLedgersChecked.inc();
                        });
                    } else if (-25 == rc) {
                        // -25: the ledger no longer exists; treated as success for the iteration.
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
                        }
                        callback.processResult(0, null, null);
                    } else {
                        LOG.error("Couldn't open ledger {} to check : {}", ledgerId, (Object)BKException.getMessage(rc));
                        callback.processResult(rc, null, null);
                    }
                }, null);
            };
            this.ledgerManager.asyncProcessLedgers(checkLedgersProcessor, (rc, path, ctx) -> {
                if (0 == rc) {
                    FutureUtils.complete(processFuture, null);
                } else {
                    FutureUtils.completeExceptionally(processFuture, BKException.create(rc));
                }
            }, null, 0, -1);
            FutureUtils.result(processFuture, BKException.HANDLER);
            try {
                this.ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis());
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                this.submitShutdownTask();
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Got exception while trying to set checkAllLedgersCTime", (Throwable)ue);
            }
        }
        finally {
            // Close the client even if closing the admin throws.
            try {
                localAdmin.close();
            }
            finally {
                localClient.close();
            }
        }
    }

    void placementPolicyCheck() throws ReplicationException.BKAuditException {
        final CountDownLatch placementPolicyCheckLatch = new CountDownLatch(1);
        this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck.set(0);
        this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.set(0);
        this.numOfClosedLedgersAuditedInPlacementPolicyCheck.set(0);
        this.numOfURLedgersElapsedRecoveryGracePeriod.set(0);
        if (this.underreplicatedLedgerRecoveryGracePeriod > 0L) {
            Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = this.ledgerUnderreplicationManager.listLedgersToRereplicate(null);
            ArrayList<Long> urLedgersElapsedRecoveryGracePeriod = new ArrayList<Long>();
            while (underreplicatedLedgersInfo.hasNext()) {
                long elapsedTimeInSecs;
                UnderreplicatedLedger underreplicatedLedger = underreplicatedLedgersInfo.next();
                long underreplicatedLedgerMarkTimeInMilSecs = underreplicatedLedger.getCtime();
                if (underreplicatedLedgerMarkTimeInMilSecs == -1L || (elapsedTimeInSecs = (System.currentTimeMillis() - underreplicatedLedgerMarkTimeInMilSecs) / 1000L) <= this.underreplicatedLedgerRecoveryGracePeriod) continue;
                urLedgersElapsedRecoveryGracePeriod.add(underreplicatedLedger.getLedgerId());
                this.numOfURLedgersElapsedRecoveryGracePeriod.incrementAndGet();
            }
            if (urLedgersElapsedRecoveryGracePeriod.isEmpty()) {
                LOG.info("No Underreplicated ledger has elapsed recovery graceperiod: {}", urLedgersElapsedRecoveryGracePeriod);
            } else {
                LOG.error("Following Underreplicated ledgers have elapsed recovery graceperiod: {}", urLedgersElapsedRecoveryGracePeriod);
            }
        }
        BookkeeperInternalCallbacks.Processor<Long> ledgerProcessor = new BookkeeperInternalCallbacks.Processor<Long>(){

            @Override
            public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
                Auditor.this.ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadataVer, exception) -> {
                    if (exception == null) {
                        LedgerMetadata metadata = (LedgerMetadata)metadataVer.getValue();
                        int writeQuorumSize = metadata.getWriteQuorumSize();
                        int ackQuorumSize = metadata.getAckQuorumSize();
                        if (metadata.isClosed()) {
                            boolean foundSegmentNotAdheringToPlacementPolicy = false;
                            boolean foundSegmentSoftlyAdheringToPlacementPolicy = false;
                            for (Map.Entry ensemble : metadata.getAllEnsembles().entrySet()) {
                                long startEntryIdOfSegment = (Long)ensemble.getKey();
                                List ensembleOfSegment = (List)ensemble.getValue();
                                EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = Auditor.this.admin.isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize, ackQuorumSize);
                                if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
                                    foundSegmentNotAdheringToPlacementPolicy = true;
                                    LOG.warn("For ledger: {}, Segment starting at entry: {}, with ensemble: {} having writeQuorumSize: {} and ackQuorumSize: {} is not adhering to EnsemblePlacementPolicy", new Object[]{ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, ackQuorumSize});
                                    continue;
                                }
                                if (segmentAdheringToPlacementPolicy != EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) continue;
                                foundSegmentSoftlyAdheringToPlacementPolicy = true;
                                if (!LOG.isDebugEnabled()) continue;
                                LOG.debug("For ledger: {}, Segment starting at entry: {}, with ensemble: {} having writeQuorumSize: {} and ackQuorumSize: {} is softly adhering to EnsemblePlacementPolicy", new Object[]{ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, ackQuorumSize});
                            }
                            if (foundSegmentNotAdheringToPlacementPolicy) {
                                Auditor.this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
                            } else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
                                Auditor.this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.incrementAndGet();
                            }
                            Auditor.this.numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet();
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicycheck analysis for now", (Object)ledgerId);
                        }
                        iterCallback.processResult(0, null, null);
                    } else if (BKException.getExceptionCode(exception) == -25) {
                        LOG.debug("Ignoring replication of already deleted ledger {}", (Object)ledgerId);
                        iterCallback.processResult(0, null, null);
                    } else {
                        LOG.warn("Unable to read the ledger: {} information", (Object)ledgerId);
                        iterCallback.processResult(BKException.getExceptionCode(exception), null, null);
                    }
                });
            }
        };
        final ArrayList resultCode = new ArrayList(1);
        this.ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback(){

            @Override
            public void processResult(int rc, String s, Object obj) {
                resultCode.add(rc);
                placementPolicyCheckLatch.countDown();
            }
        }, null, 0, -1);
        try {
            placementPolicyCheckLatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.BKAuditException("Exception while doing placementPolicy check", e);
        }
        if (!resultCode.contains(0)) {
            throw new ReplicationException.BKAuditException("Exception while doing placementPolicy check", BKException.create((Integer)resultCode.get(0)));
        }
        try {
            this.ledgerUnderreplicationManager.setPlacementPolicyCheckCTime(System.currentTimeMillis());
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Got exception while trying to set PlacementPolicyCheckCTime", (Throwable)ue);
        }
    }

    /**
     * Runs the replicas check over every ledger range handed out by the ledger manager.
     *
     * <p>For each range the per-range bookkeeping maps and the three "ledgers found having ..."
     * counters are reset, every ledger of the range is dispatched (unless
     * {@link #checkUnderReplicationForReplicasCheck} reports that it has been dealt with), and the
     * method then waits for the range's final callback before reporting the findings. Once the
     * iterator is exhausted the replicas-check completion time is recorded.
     *
     * @throws ReplicationException.BKAuditException if iterating the ledger ranges fails, if a
     *         semaphore permit or the range completion is not obtained within the timeout, if the
     *         thread is interrupted while waiting, or if a range finishes with a non-zero result code
     */
    void replicasCheck() throws ReplicationException.BKAuditException {
        final int maxConcurrentLedgerRequests = 100;
        final long timeoutSecs = 120L;
        ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries = new ConcurrentHashMap<Long, MissingEntriesInfoOfLedger>();
        ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies = new ConcurrentHashMap<Long, MissingEntriesInfoOfLedger>();
        LedgerManager.LedgerRangeIterator ledgerRangeIterator = this.ledgerManager.getLedgerRanges(this.zkOpTimeoutMs);
        // One permit is taken per dispatched ledger and handed back each time the per-range
        // callback below is invoked, which bounds the number of ledgers in flight.
        final Semaphore maxConcurrentSemaphore = new Semaphore(maxConcurrentLedgerRequests);
        while (true) {
            LedgerManager.LedgerRange ledgerRange;
            try {
                if (!ledgerRangeIterator.hasNext()) {
                    break;
                }
                ledgerRange = ledgerRangeIterator.next();
            }
            catch (IOException ioe) {
                LOG.error("Got IOException while iterating LedgerRangeIterator", ioe);
                throw new ReplicationException.BKAuditException("Got IOException while iterating LedgerRangeIterator", ioe);
            }
            ledgersWithMissingEntries.clear();
            ledgersWithUnavailableBookies.clear();
            this.numLedgersFoundHavingNoReplicaOfAnEntry.set(0);
            this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry.set(0);
            this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry.set(0);
            Set<Long> ledgersInRange = ledgerRange.getLedgers();
            int numOfLedgersInRange = ledgersInRange.size();
            final AtomicInteger resultCode = new AtomicInteger();
            CountDownLatch replicasCheckLatch = new CountDownLatch(1);
            ReplicasCheckFinalCallback finalCB = new ReplicasCheckFinalCallback(resultCode, replicasCheckLatch);
            BookkeeperInternalCallbacks.MultiCallback mcbForThisLedgerRange = new BookkeeperInternalCallbacks.MultiCallback(numOfLedgersInRange, finalCB, null, 0, -1){

                @Override
                public void processResult(int rc, String path, Object ctx) {
                    try {
                        super.processResult(rc, path, ctx);
                    }
                    finally {
                        // Return the permit even if the delegate throws.
                        maxConcurrentSemaphore.release();
                    }
                }
            };
            LOG.debug("Number of ledgers in the current LedgerRange : {}", numOfLedgersInRange);
            for (Long ledgerInRange : ledgersInRange) {
                try {
                    if (!maxConcurrentSemaphore.tryAcquire(timeoutSecs, TimeUnit.SECONDS)) {
                        LOG.error("Timedout ({} secs) while waiting for acquiring semaphore", timeoutSecs);
                        throw new ReplicationException.BKAuditException("Timedout while waiting for acquiring semaphore");
                    }
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.error("Got InterruptedException while acquiring semaphore for replicascheck", ie);
                    throw new ReplicationException.BKAuditException("Got InterruptedException while acquiring semaphore for replicascheck", ie);
                }
                if (this.checkUnderReplicationForReplicasCheck(ledgerInRange, mcbForThisLedgerRange)) {
                    continue;
                }
                this.ledgerManager.readLedgerMetadata(ledgerInRange).whenComplete(new ReadLedgerMetadataCallbackForReplicasCheck(ledgerInRange, mcbForThisLedgerRange, ledgersWithMissingEntries, ledgersWithUnavailableBookies));
            }
            try {
                if (!replicasCheckLatch.await(timeoutSecs, TimeUnit.SECONDS)) {
                    LOG.error("For LedgerRange with num of ledgers : {} it didn't complete replicascheck in {} secs, so giving up", numOfLedgersInRange, timeoutSecs);
                    // This is a timeout, not an interrupt; say so in the exception.
                    throw new ReplicationException.BKAuditException("Timedout while waiting for replicascheck of LedgerRange to complete");
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.error("Got InterruptedException while doing replicascheck", ie);
                throw new ReplicationException.BKAuditException("Got InterruptedException while doing replicascheck", ie);
            }
            this.reportLedgersWithMissingEntries(ledgersWithMissingEntries);
            this.reportLedgersWithUnavailableBookies(ledgersWithUnavailableBookies);
            int resultCodeIntValue = resultCode.get();
            if (resultCodeIntValue != 0) {
                throw new ReplicationException.BKAuditException("Exception while doing replicas check", BKException.create(resultCodeIntValue));
            }
        }
        try {
            this.ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis());
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", nre);
            this.submitShutdownTask();
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Got exception while trying to set ReplicasCheckCTime", ue);
        }
    }

    /**
     * Logs, for every ledger in the given map, which entries are missing on which bookie, then
     * classifies the ledger by the smallest number of replicas left for any of its entries and
     * increments the matching counter.
     *
     * @param ledgersWithMissingEntries missing-entry details keyed by ledger id
     */
    private void reportLedgersWithMissingEntries(ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries) {
        StringBuilder report = new StringBuilder();
        // Counts, per entry id, how many bookies reported that entry as missing.
        Multiset<Long> missingReplicaCounts = HashMultiset.create();
        for (Map.Entry<Long, MissingEntriesInfoOfLedger> perLedger : ledgersWithMissingEntries.entrySet()) {
            missingReplicaCounts.clear();
            report.setLength(0);
            long ledgerId = perLedger.getKey();
            MissingEntriesInfoOfLedger ledgerInfo = perLedger.getValue();
            List<MissingEntriesInfo> details = ledgerInfo.getMissingEntriesInfoList();
            int writeQuorumSize = ledgerInfo.getWriteQuorumSize();
            int ackQuorumSize = ledgerInfo.getAckQuorumSize();
            report.append("Ledger : ").append(ledgerId).append(" has following missing entries : ");
            for (int i = 0; i < details.size(); ++i) {
                if (i > 0) {
                    report.append(", ");
                }
                MissingEntriesInfo detail = details.get(i);
                List<Long> unavailableEntries = detail.getUnavailableEntriesList();
                Map.Entry<Long, ? extends List<BookieId>> segment = detail.getSegmentEnsemble();
                missingReplicaCounts.addAll(unavailableEntries);
                report.append("In segment starting at ").append(segment.getKey())
                        .append(" with ensemble ").append(segment.getValue())
                        .append(", following entries ").append(unavailableEntries)
                        .append(" are missing in bookie: ").append(detail.getBookieMissingEntries());
            }
            LOG.error(report.toString());
            // Find the entry that is missing from the largest number of bookies.
            int mostMissingReplicas = 0;
            long worstEntryId = -1L;
            for (Multiset.Entry<Long> counted : missingReplicaCounts.entrySet()) {
                if (counted.getCount() > mostMissingReplicas) {
                    mostMissingReplicas = counted.getCount();
                    worstEntryId = counted.getElement();
                }
            }
            int fewestReplicas = writeQuorumSize - mostMissingReplicas;
            if (fewestReplicas == 0) {
                this.numLedgersFoundHavingNoReplicaOfAnEntry.incrementAndGet();
                LOG.error("Ledger : {} entryId : {} is missing all replicas", ledgerId, worstEntryId);
            } else if (fewestReplicas < ackQuorumSize) {
                this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry.incrementAndGet();
                LOG.error("Ledger : {} entryId : {} is having: {} replicas, less than ackQuorum num of replicas : {}", ledgerId, worstEntryId, fewestReplicas, ackQuorumSize);
            } else if (fewestReplicas < writeQuorumSize) {
                this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry.incrementAndGet();
                LOG.error("Ledger : {} entryId : {} is having: {} replicas, less than writeQuorum num of replicas : {}", ledgerId, worstEntryId, fewestReplicas, writeQuorumSize);
            }
        }
    }

    /**
     * Logs one error line per ledger listing, segment by segment, the bookies that did not respond.
     *
     * @param ledgersWithUnavailableBookies unavailable-bookie details keyed by ledger id
     */
    private void reportLedgersWithUnavailableBookies(ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies) {
        StringBuilder report = new StringBuilder();
        for (Map.Entry<Long, MissingEntriesInfoOfLedger> perLedger : ledgersWithUnavailableBookies.entrySet()) {
            report.setLength(0);
            long ledgerId = perLedger.getKey();
            List<MissingEntriesInfo> details = perLedger.getValue().getMissingEntriesInfoList();
            report.append("Ledger : ").append(ledgerId).append(" has following unavailable bookies : ");
            for (int i = 0; i < details.size(); ++i) {
                if (i > 0) {
                    report.append(", ");
                }
                MissingEntriesInfo detail = details.get(i);
                Map.Entry<Long, ? extends List<BookieId>> segment = detail.getSegmentEnsemble();
                report.append("In segment starting at ").append(segment.getKey())
                        .append(" with ensemble ").append(segment.getValue())
                        .append(", following bookie has not responded ").append(detail.getBookieMissingEntries());
            }
            LOG.error(report.toString());
        }
    }

    /**
     * Decides whether the replicas check should skip a ledger.
     *
     * @param ledgerInRange ledger to look up in the under-replication manager
     * @param mcbForThisLedgerRange callback completed here when the ledger is skipped because it is
     *        marked under-replicated, or when the lookup fails with an UnavailableException
     * @return {@code false} when no under-replication info exists for the ledger (the caller should
     *         go on and check it); {@code true} in every other case
     */
    boolean checkUnderReplicationForReplicasCheck(long ledgerInRange, AsyncCallback.VoidCallback mcbForThisLedgerRange) {
        try {
            boolean markedUnderreplicated = this.ledgerUnderreplicationManager.getLedgerUnreplicationInfo(ledgerInRange) != null;
            if (markedUnderreplicated) {
                LOG.debug("Ledger: {} is marked underrreplicated, ignore this ledger for replicasCheck", ledgerInRange);
                mcbForThisLedgerRange.processResult(0, null, null);
            }
            return markedUnderreplicated;
        }
        catch (ReplicationException.NonRecoverableReplicationException fatal) {
            // NOTE(review): this branch returns without invoking mcbForThisLedgerRange, unlike the
            // other "skip" paths; presumably acceptable because a shutdown is being requested - confirm.
            LOG.error("Non Recoverable Exception while reading from ZK", fatal);
            this.submitShutdownTask();
            return true;
        }
        catch (ReplicationException.UnavailableException unavailable) {
            LOG.error("Got exception while trying to check if ledger: {} is underreplicated", ledgerInRange, unavailable);
            mcbForThisLedgerRange.processResult(BKException.getExceptionCode(unavailable), null, null);
            return true;
        }
    }

    /**
     * Stops both executors, waits for them to terminate, and then closes the admin and BookKeeper
     * clients when this auditor owns them.
     *
     * <p>NOTE(review): an InterruptedException raised while waiting skips the close calls that
     * follow inside the same try block - confirm that this is intended.
     */
    public void shutdown() {
        LOG.info("Shutting down auditor");
        this.executor.shutdown();
        this.ledgerCheckerExecutor.shutdown();
        try {
            awaitTerminationInterruptingOnTimeout(this.executor, "Executor not shutting down, interrupting");
            awaitTerminationInterruptingOnTimeout(this.ledgerCheckerExecutor, "Executor for ledger checker not shutting down, interrupting");
            if (this.ownAdmin) {
                this.admin.close();
            }
            if (this.ownBkc) {
                this.bkc.close();
            }
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while shutting down auditor bookie", interrupted);
        }
        catch (BKException closeFailure) {
            LOG.warn("Exception while shutting down auditor bookie", closeFailure);
        }
    }

    /**
     * Waits in 30-second slices for the pool to terminate; after each slice that elapses without
     * termination, logs the given warning and calls {@code shutdownNow()}.
     */
    private static void awaitTerminationInterruptingOnTimeout(ExecutorService pool, String notTerminatedWarning) throws InterruptedException {
        while (!pool.awaitTermination(30L, TimeUnit.SECONDS)) {
            LOG.warn(notTerminatedWarning);
            pool.shutdownNow();
        }
    }

    /**
     * Closes the auditor by delegating to {@link #shutdown()}.
     */
    @Override
    public void close() {
        shutdown();
    }

    /**
     * @return {@code true} as long as the auditor's executor has not been shut down
     */
    public boolean isRunning() {
        boolean executorStopped = this.executor.isShutdown();
        return !executorStopped;
    }

    /**
     * @return the current value of the {@code lostBookieRecoveryDelayBeforeChange} field
     */
    int getLostBookieRecoveryDelayBeforeChange() {
        return lostBookieRecoveryDelayBeforeChange;
    }

    /**
     * @return the future currently stored in the {@code auditTask} field
     */
    Future<?> getAuditTask() {
        return auditTask;
    }

    /**
     * Terminal callback of a replicas check round: publishes the result code it receives and
     * then counts the latch down.
     */
    private static class ReplicasCheckFinalCallback
    implements AsyncCallback.VoidCallback {
        /** Receives the result code passed to {@link #processResult}. */
        final AtomicInteger resultCode;
        /** Counted down once the result code has been stored. */
        final CountDownLatch replicasCheckLatch;

        private ReplicasCheckFinalCallback(AtomicInteger resultCode, CountDownLatch replicasCheckLatch) {
            this.resultCode = resultCode;
            this.replicasCheckLatch = replicasCheckLatch;
        }

        @Override
        public void processResult(int rc, String path, Object ctx) {
            // Store the code first so it is already set when the latch is counted down.
            resultCode.set(rc);
            replicasCheckLatch.countDown();
        }
    }

    private static class GetListOfEntriesOfLedgerCallbackForReplicasCheck
    implements BiConsumer<AvailabilityOfEntriesOfLedger, Throwable> {
        private final long ledgerInRange;
        private final int ensembleSize;
        private final int writeQuorumSize;
        private final int ackQuorumSize;
        private final BookieId bookieInEnsemble;
        private final List<BookieExpectedToContainSegmentInfo> bookieExpectedToContainSegmentInfoList;
        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries;
        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies;
        private final BookkeeperInternalCallbacks.MultiCallback mcbForThisLedger;

        private GetListOfEntriesOfLedgerCallbackForReplicasCheck(long ledgerInRange, int ensembleSize, int writeQuorumSize, int ackQuorumSize, BookieId bookieInEnsemble, List<BookieExpectedToContainSegmentInfo> bookieExpectedToContainSegmentInfoList, ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries, ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies, BookkeeperInternalCallbacks.MultiCallback mcbForThisLedger) {
            this.ledgerInRange = ledgerInRange;
            this.ensembleSize = ensembleSize;
            this.writeQuorumSize = writeQuorumSize;
            this.ackQuorumSize = ackQuorumSize;
            this.bookieInEnsemble = bookieInEnsemble;
            this.bookieExpectedToContainSegmentInfoList = bookieExpectedToContainSegmentInfoList;
            this.ledgersWithMissingEntries = ledgersWithMissingEntries;
            this.ledgersWithUnavailableBookies = ledgersWithUnavailableBookies;
            this.mcbForThisLedger = mcbForThisLedger;
        }

        @Override
        public void accept(AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger, Throwable listOfEntriesException) {
            if (listOfEntriesException != null) {
                if (BKException.getExceptionCode(listOfEntriesException) == -7) {
                    LOG.debug("Got NoSuchLedgerExistsException for ledger: {} from bookie: {}", (Object)this.ledgerInRange, (Object)this.bookieInEnsemble);
                    availabilityOfEntriesOfLedger = AvailabilityOfEntriesOfLedger.EMPTY_AVAILABILITYOFENTRIESOFLEDGER;
                } else {
                    LOG.warn("Unable to GetListOfEntriesOfLedger for ledger: {} from: {}", new Object[]{this.ledgerInRange, this.bookieInEnsemble, listOfEntriesException});
                    MissingEntriesInfoOfLedger unavailableBookiesInfoOfThisLedger = this.ledgersWithUnavailableBookies.get(this.ledgerInRange);
                    if (unavailableBookiesInfoOfThisLedger == null) {
                        this.ledgersWithUnavailableBookies.putIfAbsent(this.ledgerInRange, new MissingEntriesInfoOfLedger(this.ledgerInRange, this.ensembleSize, this.writeQuorumSize, this.ackQuorumSize, Collections.synchronizedList(new ArrayList())));
                        unavailableBookiesInfoOfThisLedger = this.ledgersWithUnavailableBookies.get(this.ledgerInRange);
                    }
                    List missingEntriesInfoList = unavailableBookiesInfoOfThisLedger.getMissingEntriesInfoList();
                    for (BookieExpectedToContainSegmentInfo bookieExpectedToContainSegmentInfo : this.bookieExpectedToContainSegmentInfoList) {
                        missingEntriesInfoList.add(new MissingEntriesInfo(this.ledgerInRange, bookieExpectedToContainSegmentInfo.getSegmentEnsemble(), this.bookieInEnsemble, null));
                        this.mcbForThisLedger.processResult(0, null, null);
                    }
                    return;
                }
            }
            for (BookieExpectedToContainSegmentInfo bookieExpectedToContainSegmentInfo : this.bookieExpectedToContainSegmentInfoList) {
                long startEntryIdOfSegment = bookieExpectedToContainSegmentInfo.getStartEntryIdOfSegment();
                long lastEntryIdOfSegment = bookieExpectedToContainSegmentInfo.getLastEntryIdOfSegment();
                BitSet entriesStripedToThisBookie = bookieExpectedToContainSegmentInfo.getEntriesOfSegmentStripedToThisBookie();
                Map.Entry<Long, ? extends List<BookieId>> segmentEnsemble = bookieExpectedToContainSegmentInfo.getSegmentEnsemble();
                List<Long> unavailableEntriesList = availabilityOfEntriesOfLedger.getUnavailableEntries(startEntryIdOfSegment, lastEntryIdOfSegment, entriesStripedToThisBookie);
                if (unavailableEntriesList != null && !unavailableEntriesList.isEmpty()) {
                    MissingEntriesInfoOfLedger missingEntriesInfoOfThisLedger = this.ledgersWithMissingEntries.get(this.ledgerInRange);
                    if (missingEntriesInfoOfThisLedger == null) {
                        this.ledgersWithMissingEntries.putIfAbsent(this.ledgerInRange, new MissingEntriesInfoOfLedger(this.ledgerInRange, this.ensembleSize, this.writeQuorumSize, this.ackQuorumSize, Collections.synchronizedList(new ArrayList())));
                        missingEntriesInfoOfThisLedger = this.ledgersWithMissingEntries.get(this.ledgerInRange);
                    }
                    missingEntriesInfoOfThisLedger.getMissingEntriesInfoList().add(new MissingEntriesInfo(this.ledgerInRange, segmentEnsemble, this.bookieInEnsemble, unavailableEntriesList));
                }
                this.mcbForThisLedger.processResult(0, null, null);
            }
        }
    }

    /**
     * Immutable holder describing one segment from the point of view of a single bookie: the
     * segment's first and last entry ids, its ensemble entry, and a bit set of the segment's
     * entries striped to that bookie.
     */
    private static class BookieExpectedToContainSegmentInfo {
        private final long startEntryIdOfSegment;
        private final long lastEntryIdOfSegment;
        private final Map.Entry<Long, ? extends List<BookieId>> segmentEnsemble;
        private final BitSet entriesOfSegmentStripedToThisBookie;

        private BookieExpectedToContainSegmentInfo(long startEntryIdOfSegment, long lastEntryIdOfSegment, Map.Entry<Long, ? extends List<BookieId>> segmentEnsemble, BitSet entriesOfSegmentStripedToThisBookie) {
            this.startEntryIdOfSegment = startEntryIdOfSegment;
            this.lastEntryIdOfSegment = lastEntryIdOfSegment;
            this.segmentEnsemble = segmentEnsemble;
            this.entriesOfSegmentStripedToThisBookie = entriesOfSegmentStripedToThisBookie;
        }

        /** @return the first entry id of the segment */
        public long getStartEntryIdOfSegment() {
            return startEntryIdOfSegment;
        }

        /** @return the last entry id of the segment */
        public long getLastEntryIdOfSegment() {
            return lastEntryIdOfSegment;
        }

        /** @return the segment's ensemble map entry, exactly as supplied to the constructor */
        public Map.Entry<Long, ? extends List<BookieId>> getSegmentEnsemble() {
            return segmentEnsemble;
        }

        /** @return the bit set supplied to the constructor (not copied) */
        public BitSet getEntriesOfSegmentStripedToThisBookie() {
            return entriesOfSegmentStripedToThisBookie;
        }
    }

    /**
     * Completion handler for the metadata read of one ledger during the replicas check.
     *
     * <p>For a closed, non-empty ledger it works out, per bookie, which entries of which segments
     * that bookie is expected to hold, and then asks each such bookie for its list of entries.
     * In all other cases the per-range callback is completed immediately.
     */
    private class ReadLedgerMetadataCallbackForReplicasCheck
    implements BiConsumer<Versioned<LedgerMetadata>, Throwable> {
        private final long ledgerInRange;
        private final BookkeeperInternalCallbacks.MultiCallback mcbForThisLedgerRange;
        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries;
        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies;

        ReadLedgerMetadataCallbackForReplicasCheck(long ledgerInRange, BookkeeperInternalCallbacks.MultiCallback mcbForThisLedgerRange, ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries, ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies) {
            this.ledgerInRange = ledgerInRange;
            this.mcbForThisLedgerRange = mcbForThisLedgerRange;
            this.ledgersWithMissingEntries = ledgersWithMissingEntries;
            this.ledgersWithUnavailableBookies = ledgersWithUnavailableBookies;
        }

        @Override
        public void accept(Versioned<LedgerMetadata> metadataVer, Throwable exception) {
            if (exception != null) {
                // -25: going by the log message this means the ledger no longer exists in the
                // metadata store - TODO confirm against BKException.Code.
                if (BKException.getExceptionCode(exception) == -25) {
                    LOG.debug("Ignoring replicas check of already deleted ledger {}", this.ledgerInRange);
                    this.mcbForThisLedgerRange.processResult(0, null, null);
                    return;
                }
                LOG.warn("Unable to read the ledger: {} information", this.ledgerInRange, exception);
                this.mcbForThisLedgerRange.processResult(BKException.getExceptionCode(exception), null, null);
                return;
            }
            LedgerMetadata metadata = metadataVer.getValue();
            if (!metadata.isClosed()) {
                LOG.debug("Ledger: {} is not yet closed, so skipping the replicas check analysis for now", this.ledgerInRange);
                this.mcbForThisLedgerRange.processResult(0, null, null);
                return;
            }
            long lastEntryId = metadata.getLastEntryId();
            if (lastEntryId == -1L) {
                LOG.debug("Ledger: {} is closed but it doesn't has any entries, so skipping the replicas check", this.ledgerInRange);
                this.mcbForThisLedgerRange.processResult(0, null, null);
                return;
            }
            int writeQuorumSize = metadata.getWriteQuorumSize();
            int ackQuorumSize = metadata.getAckQuorumSize();
            int ensembleSize = metadata.getEnsembleSize();
            RoundRobinDistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(writeQuorumSize, ackQuorumSize, ensembleSize);
            // Segments are read by index below, so keep them in an array-backed list:
            // index lookups on a linked list would make the loop quadratic in the segment count.
            List<Map.Entry<Long, ? extends List<BookieId>>> segments = new ArrayList<Map.Entry<Long, ? extends List<BookieId>>>(metadata.getAllEnsembles().entrySet());
            int numOfSegments = segments.size();
            // One tick is expected per (segment, bookie slot) pair.
            BookkeeperInternalCallbacks.MultiCallback mcbForThisLedger = new BookkeeperInternalCallbacks.MultiCallback(ensembleSize * numOfSegments, this.mcbForThisLedgerRange, null, 0, -1);
            Map<BookieId, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoMap = new HashMap<BookieId, List<BookieExpectedToContainSegmentInfo>>();
            for (int segmentNum = 0; segmentNum < numOfSegments; ++segmentNum) {
                Map.Entry<Long, ? extends List<BookieId>> segmentEnsemble = segments.get(segmentNum);
                List<BookieId> ensembleOfSegment = segmentEnsemble.getValue();
                long startEntryIdOfSegment = segmentEnsemble.getKey();
                long lastEntryIdOfSegment;
                boolean emptySegment;
                if (segmentNum == numOfSegments - 1) {
                    // The last segment runs up to the ledger's last entry.
                    lastEntryIdOfSegment = lastEntryId;
                    emptySegment = startEntryIdOfSegment > lastEntryId;
                } else {
                    // Any other segment ends right before the next one starts.
                    long startEntryIdOfNextSegment = segments.get(segmentNum + 1).getKey();
                    lastEntryIdOfSegment = startEntryIdOfNextSegment - 1L;
                    emptySegment = startEntryIdOfSegment == startEntryIdOfNextSegment;
                }
                for (int bookieIndex = 0; bookieIndex < ensembleOfSegment.size(); ++bookieIndex) {
                    BookieId bookieInEnsemble = ensembleOfSegment.get(bookieIndex);
                    BitSet entriesStripedToThisBookie = emptySegment ? EMPTY_BITSET : distributionSchedule.getEntriesStripedToTheBookie(bookieIndex, startEntryIdOfSegment, lastEntryIdOfSegment);
                    if (entriesStripedToThisBookie.cardinality() == 0) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("For ledger: {}, in Segment: {}, no entry is expected to contain in this bookie: {}. So skipping getListOfEntriesOfLedger call", this.ledgerInRange, segmentEnsemble, bookieInEnsemble);
                        }
                        // Nothing to verify for this slot; account for it right away.
                        mcbForThisLedger.processResult(0, null, null);
                        continue;
                    }
                    bookiesSegmentInfoMap
                            .computeIfAbsent(bookieInEnsemble, bookie -> new ArrayList<BookieExpectedToContainSegmentInfo>())
                            .add(new BookieExpectedToContainSegmentInfo(startEntryIdOfSegment, lastEntryIdOfSegment, segmentEnsemble, entriesStripedToThisBookie));
                }
            }
            // One request per bookie, covering every segment that bookie is expected to serve.
            for (Map.Entry<BookieId, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoTuple : bookiesSegmentInfoMap.entrySet()) {
                BookieId bookieInEnsemble = bookiesSegmentInfoTuple.getKey();
                List<BookieExpectedToContainSegmentInfo> bookieSegmentInfoList = bookiesSegmentInfoTuple.getValue();
                Auditor.this.admin.asyncGetListOfEntriesOfLedger(bookieInEnsemble, this.ledgerInRange).whenComplete(new GetListOfEntriesOfLedgerCallbackForReplicasCheck(this.ledgerInRange, ensembleSize, writeQuorumSize, ackQuorumSize, bookieInEnsemble, bookieSegmentInfoList, this.ledgersWithMissingEntries, this.ledgersWithUnavailableBookies, mcbForThisLedger));
            }
        }
    }

    /**
     * Per-ledger record used by the replicas check: the ledger id, its ensemble/quorum sizes and
     * the list of {@link MissingEntriesInfo} items collected for it.
     */
    private static class MissingEntriesInfoOfLedger {
        private final long ledgerId;
        private final int ensembleSize;
        private final int writeQuorumSize;
        private final int ackQuorumSize;
        private final List<MissingEntriesInfo> missingEntriesInfoList;

        private MissingEntriesInfoOfLedger(long ledgerId, int ensembleSize, int writeQuorumSize, int ackQuorumSize, List<MissingEntriesInfo> missingEntriesInfoList) {
            this.ledgerId = ledgerId;
            this.ensembleSize = ensembleSize;
            this.writeQuorumSize = writeQuorumSize;
            this.ackQuorumSize = ackQuorumSize;
            this.missingEntriesInfoList = missingEntriesInfoList;
        }

        /** @return the id of the ledger this record belongs to */
        private long getLedgerId() {
            return ledgerId;
        }

        /** @return the ensemble size given at construction */
        private int getEnsembleSize() {
            return ensembleSize;
        }

        /** @return the write quorum size given at construction */
        private int getWriteQuorumSize() {
            return writeQuorumSize;
        }

        /** @return the ack quorum size given at construction */
        private int getAckQuorumSize() {
            return ackQuorumSize;
        }

        /** @return the list instance given at construction (not copied) */
        private List<MissingEntriesInfo> getMissingEntriesInfoList() {
            return missingEntriesInfoList;
        }
    }

    /**
     * One finding of the replicas check: for a ledger segment, the bookie concerned and the list
     * of entries reported unavailable on it.
     */
    private static class MissingEntriesInfo {
        private final long ledgerId;
        private final Map.Entry<Long, ? extends List<BookieId>> segmentEnsemble;
        private final BookieId bookieMissingEntries;
        private final List<Long> unavailableEntriesList;

        private MissingEntriesInfo(long ledgerId, Map.Entry<Long, ? extends List<BookieId>> segmentEnsemble, BookieId bookieMissingEntries, List<Long> unavailableEntriesList) {
            this.ledgerId = ledgerId;
            this.segmentEnsemble = segmentEnsemble;
            this.bookieMissingEntries = bookieMissingEntries;
            this.unavailableEntriesList = unavailableEntriesList;
        }

        /** @return the id of the ledger the finding is about */
        private long getLedgerId() {
            return ledgerId;
        }

        /** @return the segment's ensemble map entry, as given at construction */
        private Map.Entry<Long, ? extends List<BookieId>> getSegmentEnsemble() {
            return segmentEnsemble;
        }

        /** @return the bookie this finding refers to */
        private BookieId getBookieMissingEntries() {
            return bookieMissingEntries;
        }

        /** @return the list given at construction; whatever was passed in is returned unchanged */
        private List<Long> getUnavailableEntriesList() {
            return unavailableEntriesList;
        }
    }

    /**
     * Callback that receives a set of {@link LedgerFragment}s for one ledger,
     * publishes the ledger as suspected for every bookie those fragments name,
     * reports the outcome to a {@link AsyncCallback.VoidCallback}, and finally
     * closes the ledger handle.
     *
     * <p>The handle is closed on every path through {@link #operationComplete},
     * including the case where the fragment set yields no bookies.
     */
    private class ProcessLostFragmentsCb
    implements BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>> {
        /** Handle of the ledger being processed; closed asynchronously once the result is handled. */
        final LedgerHandle lh;
        /** Receives the final result code; always invoked with null for its other two arguments. */
        final AsyncCallback.VoidCallback callback;

        /**
         * @param lh handle of the ledger the fragments belong to
         * @param callback callback to notify with the final result code
         */
        ProcessLostFragmentsCb(LedgerHandle lh, AsyncCallback.VoidCallback callback) {
            this.lh = lh;
            this.callback = callback;
        }

        /**
         * Handles the fragment result.
         *
         * <ul>
         *   <li>{@code rc != 0}: the code is forwarded unchanged to the callback.</li>
         *   <li>{@code rc == 0} and no fragment contributes a bookie: the callback is
         *       invoked with {@code 0}.</li>
         *   <li>{@code rc == 0} with bookies: the ledger is published via
         *       {@code publishSuspectedLedgersAsync}; the callback gets {@code 0} on
         *       success and {@code -200} if publishing completes exceptionally.</li>
         * </ul>
         * In all cases the ledger handle is then closed asynchronously; a close
         * failure is only logged.
         *
         * @param rc result code of the operation that produced the fragments; 0 is treated as success
         * @param fragments fragments reported for the ledger
         */
        @Override
        public void operationComplete(int rc, Set<LedgerFragment> fragments) {
            if (rc == 0) {
                Set<BookieId> bookies = Sets.newHashSet();
                for (LedgerFragment f : fragments) {
                    bookies.addAll(f.getAddresses());
                }
                if (bookies.isEmpty()) {
                    // Nothing to publish. Do not return here: the handle below must still be closed.
                    this.callback.processResult(0, null, null);
                } else {
                    Auditor.this.publishSuspectedLedgersAsync(bookies.stream().map(BookieId::toString).collect(Collectors.toList()), Sets.newHashSet(this.lh.getId())).whenComplete((result, cause) -> {
                        if (null != cause) {
                            LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}", new Object[]{this.lh.getId(), bookies, cause});
                            // NOTE(review): -200 is presumably BKException.Code.ReplicationException - confirm.
                            this.callback.processResult(-200, null, null);
                        } else {
                            this.callback.processResult(0, null, null);
                        }
                    });
                }
            } else {
                this.callback.processResult(rc, null, null);
            }
            // Close unconditionally so the handle is released on every path.
            this.lh.closeAsync().whenComplete((result, cause) -> {
                if (null != cause) {
                    LOG.warn("Error closing ledger {} : {}", (Object)this.lh.getId(), (Object)cause.getMessage());
                }
            });
        }
    }

    private class LostBookieRecoveryDelayChangedCb
    implements BookkeeperInternalCallbacks.GenericCallback<Void> {
        private LostBookieRecoveryDelayChangedCb() {
        }

        @Override
        public void operationComplete(int rc, Void result) {
            try {
                Auditor.this.ledgerUnderreplicationManager.notifyLostBookieRecoveryDelayChanged(this);
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                Auditor.this.submitShutdownTask();
            }
            catch (ReplicationException.UnavailableException ae) {
                LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", (Throwable)ae);
            }
            Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
        }
    }
}

