/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.replication;

import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieAccessor;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.AuditorCheckAllLedgersTask;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.AuditorTask;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditorPeriodicCheckTest
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(AuditorPeriodicCheckTest.class);
    private MetadataBookieDriver driver;
    private HashMap<String, AuditorElector> auditorElectors = new HashMap();
    private static final int CHECK_INTERVAL = 1;

    public AuditorPeriodicCheckTest() {
        super(3);
        this.baseConf.setPageLimit(1);
    }

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        for (int i = 0; i < this.numBookies; ++i) {
            ServerConfiguration conf = new ServerConfiguration((AbstractConfiguration)this.confByIndex(i));
            conf.setAuditorPeriodicCheckInterval(1L);
            String addr = this.addressByIndex(i).toString();
            AuditorElector auditorElector = new AuditorElector(addr, conf);
            this.auditorElectors.put(addr, auditorElector);
            auditorElector.start();
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("Starting Auditor Elector");
        }
        this.driver = MetadataDrivers.getBookieDriver((URI)URI.create(this.confByIndex(0).getMetadataServiceUri()));
        this.driver.initialize(this.confByIndex(0), (StatsLogger)NullStatsLogger.INSTANCE);
    }

    @Override
    @After
    public void tearDown() throws Exception {
        if (null != this.driver) {
            this.driver.close();
        }
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        super.tearDown();
    }

    @Test
    public void testEntryLogCorruption() throws Exception {
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        underReplicationManager.disableLedgerReplication();
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        long ledgerId = lh.getId();
        for (int i = 0; i < 100; ++i) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
        BookieAccessor.forceFlush((BookieImpl)this.serverByIndex(0).getBookie());
        File ledgerDir = this.confByIndex(0).getLedgerDirs()[0];
        ledgerDir = BookieImpl.getCurrentDirectory((File)ledgerDir);
        File[] entryLogs = ledgerDir.listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith(".log");
            }
        });
        ByteBuffer junk = ByteBuffer.allocate(0x100000);
        for (File f : entryLogs) {
            FileOutputStream out = new FileOutputStream(f);
            out.getChannel().write(junk);
            out.close();
        }
        this.restartBookies();
        underReplicationManager.enableLedgerReplication();
        long underReplicatedLedger = -1L;
        for (int i = 0; i < 10 && (underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) == -1L; ++i) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((String)"Ledger should be under replicated", (long)ledgerId, (long)underReplicatedLedger);
        underReplicationManager.close();
    }

    @Test
    public void testIndexCorruption() throws Exception {
        int i;
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        long ledgerToCorrupt = lh.getId();
        for (i = 0; i < 100; ++i) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
        lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        for (i = 0; i < 100; ++i) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
        BookieAccessor.forceFlush((BookieImpl)this.serverByIndex(0).getBookie());
        File ledgerDir = this.confByIndex(0).getLedgerDirs()[0];
        ledgerDir = BookieImpl.getCurrentDirectory((File)ledgerDir);
        File index = new File(ledgerDir, IndexPersistenceMgr.getLedgerName((long)ledgerToCorrupt));
        LOG.info("file to corrupt{}", (Object)index);
        ByteBuffer junk = ByteBuffer.allocate(0x100000);
        FileOutputStream out = new FileOutputStream(index);
        out.getChannel().write(junk);
        out.close();
        long underReplicatedLedger = -1L;
        for (int i2 = 0; i2 < 15 && (underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) == -1L; ++i2) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((String)"Ledger should be under replicated", (long)ledgerToCorrupt, (long)underReplicatedLedger);
        underReplicationManager.close();
    }

    @Test
    public void testPeriodicCheckWhenDisabled() throws Exception {
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        int numLedgers = 10;
        int numMsgs = 2;
        final CountDownLatch completeLatch = new CountDownLatch(20);
        final AtomicInteger rc = new AtomicInteger(0);
        ArrayList<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            lhs.add(lh);
            for (int j = 0; j < 2; ++j) {
                lh.asyncAddEntry("testdata".getBytes(), new AsyncCallback.AddCallback(){

                    public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) {
                        if (rc.compareAndSet(0, rc2)) {
                            LOG.info("Failed to add entry : {}", (Object)BKException.getMessage((int)rc2));
                        }
                        completeLatch.countDown();
                    }
                }, null);
            }
        }
        completeLatch.await();
        if (rc.get() != 0) {
            throw BKException.create((int)rc.get());
        }
        for (LedgerHandle lh : lhs) {
            lh.close();
        }
        underReplicationManager.disableLedgerReplication();
        final AtomicInteger numReads = new AtomicInteger(0);
        ServerConfiguration conf = this.killBookie(0);
        TestBookieImpl deadBookie = new TestBookieImpl(conf){

            public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, Bookie.NoLedgerException {
                numReads.incrementAndGet();
                throw new IOException("Fake I/O exception");
            }
        };
        this.startAndAddBookie(conf, (Bookie)deadBookie);
        Thread.sleep(2000L);
        Assert.assertEquals((String)"Nothing should have tried to read", (long)0L, (long)numReads.get());
        underReplicationManager.enableLedgerReplication();
        Thread.sleep(2000L);
        underReplicationManager.disableLedgerReplication();
        Thread.sleep(2000L);
        int numUnderreplicated = 0;
        long underReplicatedLedger = -1L;
        while ((underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) != -1L) {
            ++numUnderreplicated;
            underReplicationManager.markLedgerReplicated(underReplicatedLedger);
            if (underReplicatedLedger != -1L) continue;
        }
        Thread.sleep(2000L);
        underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
        Assert.assertEquals((String)"There should be no underreplicated ledgers", (long)-1L, (long)underReplicatedLedger);
        LOG.info("{} of {} ledgers underreplicated", (Object)numUnderreplicated, (Object)numUnderreplicated);
        Assert.assertTrue((String)"All should be underreplicated", (numUnderreplicated <= 10 && numUnderreplicated > 0 ? 1 : 0) != 0);
    }

    @Test
    public void testPeriodicCheckWhenLedgerDeleted() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        int numLedgers = 10;
        LinkedList<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 2; ++j) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }
        try (final Auditor auditor = new Auditor(BookieImpl.getBookieId((ServerConfiguration)this.confByIndex(0)).toString(), this.confByIndex(0), (StatsLogger)NullStatsLogger.INSTANCE);){
            final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
            final CountDownLatch latch = new CountDownLatch(1);
            Thread t = new Thread(){

                @Override
                public void run() {
                    try {
                        latch.countDown();
                        for (int i = 0; i < 10; ++i) {
                            ((AuditorCheckAllLedgersTask)auditor.auditorCheckAllLedgersTask).checkAllLedgers();
                        }
                    }
                    catch (Exception e) {
                        LOG.error("Caught exception while checking all ledgers", (Throwable)e);
                        exceptionCaught.set(true);
                    }
                }
            };
            t.start();
            latch.await();
            for (Long id : ids) {
                this.bkc.deleteLedger(id);
            }
            t.join();
            Assert.assertFalse((String)"Shouldn't have thrown exception", (boolean)exceptionCaught.get());
        }
    }

    @Test
    public void testGetLedgerFromZookeeperThrottled() throws Exception {
        int numberLedgers = 30;
        try {
            for (AuditorElector e : this.auditorElectors.values()) {
                e.shutdown();
            }
            for (int i = 0; i < 30; ++i) {
                LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
                for (int j = 0; j < 5; ++j) {
                    lh.addEntry("testdata".getBytes());
                }
                lh.close();
            }
        }
        catch (InterruptedException | BKException e) {
            LOG.error("Failed to shutdown auditor elector or write data to ledgers ", e);
            Assert.fail();
        }
        ServerConfiguration configuration = this.confByIndex(0);
        configuration.setAuditorMaxNumberOfConcurrentOpenLedgerOperations(10);
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger("auditor");
        Counter numLedgersChecked = statsLogger.getCounter("NUM_LEDGERS_CHECKED");
        Auditor auditor = new Auditor(BookieImpl.getBookieId((ServerConfiguration)configuration).toString(), configuration, (StatsLogger)statsLogger);
        try {
            ((AuditorCheckAllLedgersTask)auditor.auditorCheckAllLedgersTask).checkAllLedgers();
            Assert.assertEquals((String)"NUM_LEDGERS_CHECKED", (long)30L, (long)numLedgersChecked.get());
        }
        catch (Exception e) {
            LOG.error("Caught exception while checking all ledgers ", (Throwable)e);
            Assert.fail();
        }
    }

    @Test
    public void testInitialDelayOfCheckAllLedgers() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        int numLedgers = 10;
        LinkedList<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 2; ++j) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
        ServerConfiguration servConf = new ServerConfiguration((AbstractConfiguration)this.confByIndex(0));
        this.validateInitialDelayOfCheckAllLedgers(urm, -1L, 1000L, servConf, this.bkc);
        this.validateInitialDelayOfCheckAllLedgers(urm, 999L, 1000L, servConf, this.bkc);
        this.validateInitialDelayOfCheckAllLedgers(urm, 1001L, 1000L, servConf, this.bkc);
    }

    void validateInitialDelayOfCheckAllLedgers(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs, long auditorPeriodicCheckInterval, ServerConfiguration servConf, BookKeeper bkc) throws ReplicationException.UnavailableException, UnknownHostException, InterruptedException {
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger("auditor");
        TestStatsProvider.TestOpStatsLogger checkAllLedgersStatsLogger = (TestStatsProvider.TestOpStatsLogger)statsLogger.getOpStatsLogger("CHECK_ALL_LEDGERS_TIME");
        servConf.setAuditorPeriodicCheckInterval(auditorPeriodicCheckInterval);
        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0L);
        servConf.setAuditorPeriodicBookieCheckInterval(0L);
        TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId((ServerConfiguration)servConf).toString(), servConf, bkc, false, (StatsLogger)statsLogger, null);
        CountDownLatch latch = auditor.getLatch();
        Assert.assertEquals((String)"CHECK_ALL_LEDGERS_TIME SuccessCount", (long)0L, (long)checkAllLedgersStatsLogger.getSuccessCount());
        long curTimeBeforeStart = System.currentTimeMillis();
        long checkAllLedgersCTime = -1L;
        long initialDelayInMsecs = -1L;
        long nextExpectedCheckAllLedgersExecutionTime = -1L;
        long bufferTimeInMsecs = 12000L;
        if (timeSinceLastExecutedInSecs == -1L) {
            checkAllLedgersCTime = -1L;
            initialDelayInMsecs = 0L;
        } else {
            checkAllLedgersCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L;
            initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicCheckInterval ? 0L : (auditorPeriodicCheckInterval - timeSinceLastExecutedInSecs) * 1000L;
        }
        nextExpectedCheckAllLedgersExecutionTime = curTimeBeforeStart + initialDelayInMsecs;
        urm.setCheckAllLedgersCTime(checkAllLedgersCTime);
        auditor.start();
        Assert.assertTrue((String)("checkAllLedgers should have executed with initialDelay " + initialDelayInMsecs), (boolean)latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS));
        for (int i = 0; i < 10; ++i) {
            Thread.sleep(100L);
            if (checkAllLedgersStatsLogger.getSuccessCount() >= 1L) break;
        }
        Assert.assertEquals((String)"CHECK_ALL_LEDGERS_TIME SuccessCount", (long)1L, (long)checkAllLedgersStatsLogger.getSuccessCount());
        long currentCheckAllLedgersCTime = urm.getCheckAllLedgersCTime();
        Assert.assertTrue((String)("currentCheckAllLedgersCTime: " + currentCheckAllLedgersCTime + " should be greater than nextExpectedCheckAllLedgersExecutionTime: " + nextExpectedCheckAllLedgersExecutionTime), (currentCheckAllLedgersCTime > nextExpectedCheckAllLedgersExecutionTime ? 1 : 0) != 0);
        Assert.assertTrue((String)("currentCheckAllLedgersCTime: " + currentCheckAllLedgersCTime + " should be lesser than nextExpectedCheckAllLedgersExecutionTime+bufferTimeInMsecs: " + (nextExpectedCheckAllLedgersExecutionTime + bufferTimeInMsecs)), (currentCheckAllLedgersCTime < nextExpectedCheckAllLedgersExecutionTime + bufferTimeInMsecs ? 1 : 0) != 0);
        auditor.close();
    }

    @Test
    public void testInitialDelayOfPlacementPolicyCheck() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        int numLedgers = 10;
        LinkedList<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 2; ++j) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
        ServerConfiguration servConf = new ServerConfiguration((AbstractConfiguration)this.confByIndex(0));
        this.validateInitialDelayOfPlacementPolicyCheck(urm, -1L, 1000L, servConf, this.bkc);
        this.validateInitialDelayOfPlacementPolicyCheck(urm, 999L, 1000L, servConf, this.bkc);
        this.validateInitialDelayOfPlacementPolicyCheck(urm, 1001L, 1000L, servConf, this.bkc);
    }

    void validateInitialDelayOfPlacementPolicyCheck(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs, long auditorPeriodicPlacementPolicyCheckInterval, ServerConfiguration servConf, BookKeeper bkc) throws ReplicationException.UnavailableException, UnknownHostException, InterruptedException {
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger("auditor");
        TestStatsProvider.TestOpStatsLogger placementPolicyCheckStatsLogger = (TestStatsProvider.TestOpStatsLogger)statsLogger.getOpStatsLogger("PLACEMENT_POLICY_CHECK_TIME");
        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(auditorPeriodicPlacementPolicyCheckInterval);
        servConf.setAuditorPeriodicCheckInterval(0L);
        servConf.setAuditorPeriodicBookieCheckInterval(0L);
        TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId((ServerConfiguration)servConf).toString(), servConf, bkc, false, (StatsLogger)statsLogger, null);
        CountDownLatch latch = auditor.getLatch();
        Assert.assertEquals((String)"PLACEMENT_POLICY_CHECK_TIME SuccessCount", (long)0L, (long)placementPolicyCheckStatsLogger.getSuccessCount());
        long curTimeBeforeStart = System.currentTimeMillis();
        long placementPolicyCheckCTime = -1L;
        long initialDelayInMsecs = -1L;
        long nextExpectedPlacementPolicyCheckExecutionTime = -1L;
        long bufferTimeInMsecs = 20000L;
        if (timeSinceLastExecutedInSecs == -1L) {
            placementPolicyCheckCTime = -1L;
            initialDelayInMsecs = 0L;
        } else {
            placementPolicyCheckCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L;
            initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicPlacementPolicyCheckInterval ? 0L : (auditorPeriodicPlacementPolicyCheckInterval - timeSinceLastExecutedInSecs) * 1000L;
        }
        nextExpectedPlacementPolicyCheckExecutionTime = curTimeBeforeStart + initialDelayInMsecs;
        urm.setPlacementPolicyCheckCTime(placementPolicyCheckCTime);
        auditor.start();
        Assert.assertTrue((String)("placementPolicyCheck should have executed with initialDelay " + initialDelayInMsecs), (boolean)latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS));
        for (int i = 0; i < 20; ++i) {
            Thread.sleep(100L);
            if (placementPolicyCheckStatsLogger.getSuccessCount() >= 1L) break;
        }
        Assert.assertEquals((String)"PLACEMENT_POLICY_CHECK_TIME SuccessCount", (long)1L, (long)placementPolicyCheckStatsLogger.getSuccessCount());
        long currentPlacementPolicyCheckCTime = urm.getPlacementPolicyCheckCTime();
        Assert.assertTrue((String)("currentPlacementPolicyCheckCTime: " + currentPlacementPolicyCheckCTime + " should be greater than nextExpectedPlacementPolicyCheckExecutionTime: " + nextExpectedPlacementPolicyCheckExecutionTime), (currentPlacementPolicyCheckCTime > nextExpectedPlacementPolicyCheckExecutionTime ? 1 : 0) != 0);
        Assert.assertTrue((String)("currentPlacementPolicyCheckCTime: " + currentPlacementPolicyCheckCTime + " should be lesser than nextExpectedPlacementPolicyCheckExecutionTime+bufferTimeInMsecs: " + (nextExpectedPlacementPolicyCheckExecutionTime + bufferTimeInMsecs)), (currentPlacementPolicyCheckCTime < nextExpectedPlacementPolicyCheckExecutionTime + bufferTimeInMsecs ? 1 : 0) != 0);
        auditor.close();
    }

    @Test
    public void testInitialDelayOfReplicasCheck() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        LedgerHandle lh = this.bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        for (int j = 0; j < 5; ++j) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();
        long ledgerId = 100000L;
        lh = this.bkc.createLedgerAdv(ledgerId, 3, 2, 2, BookKeeper.DigestType.CRC32, "passwd".getBytes(), null);
        lh.close();
        ledgerId = 100001234L;
        lh = this.bkc.createLedgerAdv(ledgerId, 3, 3, 2, BookKeeper.DigestType.CRC32, "passwd".getBytes(), null);
        for (int j = 0; j < 4; ++j) {
            lh.addEntry((long)j, "testdata".getBytes());
        }
        lh.close();
        ledgerId = 991234L;
        lh = this.bkc.createLedgerAdv(ledgerId, 3, 2, 2, BookKeeper.DigestType.CRC32, "passwd".getBytes(), null);
        lh.addEntry(0L, "testdata".getBytes());
        lh.close();
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
        ServerConfiguration servConf = new ServerConfiguration((AbstractConfiguration)this.confByIndex(0));
        this.validateInitialDelayOfReplicasCheck(urm, -1L, 1000L, servConf, this.bkc);
        this.validateInitialDelayOfReplicasCheck(urm, 999L, 1000L, servConf, this.bkc);
        this.validateInitialDelayOfReplicasCheck(urm, 1001L, 1000L, servConf, this.bkc);
    }

    void validateInitialDelayOfReplicasCheck(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs, long auditorPeriodicReplicasCheckInterval, ServerConfiguration servConf, BookKeeper bkc) throws ReplicationException.UnavailableException, UnknownHostException, InterruptedException {
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger("auditor");
        TestStatsProvider.TestOpStatsLogger replicasCheckStatsLogger = (TestStatsProvider.TestOpStatsLogger)statsLogger.getOpStatsLogger("REPLICAS_CHECK_TIME");
        servConf.setAuditorPeriodicReplicasCheckInterval(auditorPeriodicReplicasCheckInterval);
        servConf.setAuditorPeriodicCheckInterval(0L);
        servConf.setAuditorPeriodicBookieCheckInterval(0L);
        TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId((ServerConfiguration)servConf).toString(), servConf, bkc, false, (StatsLogger)statsLogger, null);
        CountDownLatch latch = auditor.getLatch();
        Assert.assertEquals((String)"REPLICAS_CHECK_TIME SuccessCount", (long)0L, (long)replicasCheckStatsLogger.getSuccessCount());
        long curTimeBeforeStart = System.currentTimeMillis();
        long replicasCheckCTime = -1L;
        long initialDelayInMsecs = -1L;
        long nextExpectedReplicasCheckExecutionTime = -1L;
        long bufferTimeInMsecs = 20000L;
        if (timeSinceLastExecutedInSecs == -1L) {
            replicasCheckCTime = -1L;
            initialDelayInMsecs = 0L;
        } else {
            replicasCheckCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L;
            initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicReplicasCheckInterval ? 0L : (auditorPeriodicReplicasCheckInterval - timeSinceLastExecutedInSecs) * 1000L;
        }
        nextExpectedReplicasCheckExecutionTime = curTimeBeforeStart + initialDelayInMsecs;
        urm.setReplicasCheckCTime(replicasCheckCTime);
        auditor.start();
        Assert.assertTrue((String)("replicasCheck should have executed with initialDelay " + initialDelayInMsecs), (boolean)latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS));
        for (int i = 0; i < 20; ++i) {
            Thread.sleep(100L);
            if (replicasCheckStatsLogger.getSuccessCount() >= 1L) break;
        }
        Assert.assertEquals((String)"REPLICAS_CHECK_TIME SuccessCount", (long)1L, (long)replicasCheckStatsLogger.getSuccessCount());
        long currentReplicasCheckCTime = urm.getReplicasCheckCTime();
        Assert.assertTrue((String)("currentReplicasCheckCTime: " + currentReplicasCheckCTime + " should be greater than nextExpectedReplicasCheckExecutionTime: " + nextExpectedReplicasCheckExecutionTime), (currentReplicasCheckCTime > nextExpectedReplicasCheckExecutionTime ? 1 : 0) != 0);
        Assert.assertTrue((String)("currentReplicasCheckCTime: " + currentReplicasCheckCTime + " should be lesser than nextExpectedReplicasCheckExecutionTime+bufferTimeInMsecs: " + (nextExpectedReplicasCheckExecutionTime + bufferTimeInMsecs)), (currentReplicasCheckCTime < nextExpectedReplicasCheckExecutionTime + bufferTimeInMsecs ? 1 : 0) != 0);
        auditor.close();
    }

    @Test
    public void testDelayBookieAuditOfCheckAllLedgers() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        int numLedgers = 10;
        LinkedList<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 2; ++j) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
        ServerConfiguration servConf = new ServerConfiguration((AbstractConfiguration)this.confByIndex(0));
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger("auditor");
        Counter numBookieAuditsDelayed = statsLogger.getCounter("NUM_BOOKIE_AUDITS_DELAYED");
        TestStatsProvider.TestOpStatsLogger underReplicatedLedgerTotalSizeStatsLogger = (TestStatsProvider.TestOpStatsLogger)statsLogger.getOpStatsLogger("UNDER_REPLICATED_LEDGERS_TOTAL_SIZE");
        servConf.setAuditorPeriodicCheckInterval(1L);
        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0L);
        servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
        urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
        AtomicBoolean canRun = new AtomicBoolean(false);
        TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId((ServerConfiguration)servConf).toString(), servConf, this.bkc, false, (StatsLogger)statsLogger, canRun);
        CountDownLatch latch = auditor.getLatch();
        auditor.start();
        this.killBookie(this.addressByIndex(0));
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)1L, (long)numBookieAuditsDelayed.get()));
        Future auditTask = auditor.auditTask;
        Assert.assertTrue((auditTask != null && !auditTask.isDone() ? 1 : 0) != 0);
        canRun.set(true);
        Assert.assertTrue((boolean)latch.await(10L, TimeUnit.SECONDS));
        Assert.assertTrue((auditor.auditTask.equals(auditTask) && auditor.auditTask != null && !auditor.auditTask.isDone() ? 1 : 0) != 0);
        Assert.assertEquals((String)"UNDER_REPLICATED_LEDGERS_TOTAL_SIZE", (long)0L, (long)underReplicatedLedgerTotalSizeStatsLogger.getSuccessCount());
        auditor.close();
    }

    @Test
    public void testDelayBookieAuditOfPlacementPolicy() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        int numLedgers = 10;
        LinkedList<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 2; ++j) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
        ServerConfiguration servConf = new ServerConfiguration((AbstractConfiguration)this.confByIndex(0));
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger("auditor");
        Counter numBookieAuditsDelayed = statsLogger.getCounter("NUM_BOOKIE_AUDITS_DELAYED");
        TestStatsProvider.TestOpStatsLogger placementPolicyCheckTime = (TestStatsProvider.TestOpStatsLogger)statsLogger.getOpStatsLogger("PLACEMENT_POLICY_CHECK_TIME");
        servConf.setAuditorPeriodicCheckInterval(0L);
        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1L);
        servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
        urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
        AtomicBoolean canRun = new AtomicBoolean(false);
        TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId((ServerConfiguration)servConf).toString(), servConf, this.bkc, false, (StatsLogger)statsLogger, canRun);
        CountDownLatch latch = auditor.getLatch();
        auditor.start();
        this.killBookie(this.addressByIndex(0));
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)1L, (long)numBookieAuditsDelayed.get()));
        Future auditTask = auditor.auditTask;
        Assert.assertTrue((auditTask != null && !auditTask.isDone() ? 1 : 0) != 0);
        Assert.assertEquals((String)"PLACEMENT_POLICY_CHECK_TIME", (long)0L, (long)placementPolicyCheckTime.getSuccessCount());
        canRun.set(true);
        Assert.assertTrue((boolean)latch.await(10L, TimeUnit.SECONDS));
        Assert.assertTrue((auditor.auditTask.equals(auditTask) && auditor.auditTask != null && !auditor.auditTask.isDone() ? 1 : 0) != 0);
        Assert.assertEquals((String)"PLACEMENT_POLICY_CHECK_TIME", (long)0L, (long)placementPolicyCheckTime.getSuccessCount());
        auditor.close();
    }

    @Test
    public void testDelayBookieAuditOfReplicasCheck() throws Exception {
        for (AuditorElector e : this.auditorElectors.values()) {
            e.shutdown();
        }
        int numLedgers = 10;
        LinkedList<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 2; ++j) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
        ServerConfiguration servConf = new ServerConfiguration((AbstractConfiguration)this.confByIndex(0));
        TestStatsProvider statsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger("auditor");
        Counter numBookieAuditsDelayed = statsLogger.getCounter("NUM_BOOKIE_AUDITS_DELAYED");
        TestStatsProvider.TestOpStatsLogger replicasCheckTime = (TestStatsProvider.TestOpStatsLogger)statsLogger.getOpStatsLogger("REPLICAS_CHECK_TIME");
        servConf.setAuditorPeriodicCheckInterval(0L);
        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0L);
        servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
        servConf.setAuditorPeriodicReplicasCheckInterval(1L);
        urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
        AtomicBoolean canRun = new AtomicBoolean(false);
        TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId((ServerConfiguration)servConf).toString(), servConf, this.bkc, false, (StatsLogger)statsLogger, canRun);
        CountDownLatch latch = auditor.getLatch();
        auditor.start();
        this.killBookie(this.addressByIndex(0));
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)1L, (long)numBookieAuditsDelayed.get()));
        Future auditTask = auditor.auditTask;
        Assert.assertTrue((auditTask != null && !auditTask.isDone() ? 1 : 0) != 0);
        Assert.assertEquals((String)"REPLICAS_CHECK_TIME", (long)0L, (long)replicasCheckTime.getSuccessCount());
        canRun.set(true);
        Assert.assertTrue((boolean)latch.await(10L, TimeUnit.SECONDS));
        Assert.assertTrue((auditor.auditTask.equals(auditTask) && auditor.auditTask != null && !auditor.auditTask.isDone() ? 1 : 0) != 0);
        Assert.assertEquals((String)"REPLICAS_CHECK_TIME", (long)0L, (long)replicasCheckTime.getSuccessCount());
        auditor.close();
    }

    private BookieId replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception {
        int bookieIdx = -1;
        Long entryId = (Long)lh.getLedgerMetadata().getAllEnsembles().firstKey();
        List curEnsemble = (List)lh.getLedgerMetadata().getAllEnsembles().get(entryId);
        BookieId replacedBookie = null;
        for (int i = 0; i < this.numBookies; ++i) {
            if (!curEnsemble.contains(this.addressByIndex(i))) continue;
            bookieIdx = i;
            replacedBookie = this.addressByIndex(i);
            break;
        }
        Assert.assertNotEquals((String)"Couldn't find ensemble bookie in bookie list", (long)-1L, (long)bookieIdx);
        LOG.info("Killing bookie " + this.addressByIndex(bookieIdx));
        ServerConfiguration conf = this.killBookie(bookieIdx);
        TestBookieImpl writeFailingBookie = new TestBookieImpl(conf){

            public void addEntry(ByteBuf entry, boolean ackBeforeSync, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException {
                try {
                    LOG.info("Failing write to entry ");
                    Thread.sleep(100L);
                    throw new IOException();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        this.startAndAddBookie(conf, (Bookie)writeFailingBookie);
        return replacedBookie;
    }

    @Test
    public void testFailedWriteRecovery() throws Exception {
        LedgerManagerFactory mFactory = this.driver.getLedgerManagerFactory();
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        underReplicationManager.disableLedgerReplication();
        LedgerHandle lh = this.bkc.createLedger(2, 2, 1, BookKeeper.DigestType.CRC32, "passwd".getBytes());
        BookieId replacedBookie = this.replaceBookieWithWriteFailingBookie(lh);
        byte[] data = "foobar".getBytes();
        data = "foobar".getBytes();
        lh.addEntry(data);
        lh.addEntry(data);
        lh.addEntry(data);
        lh.close();
        underReplicationManager.enableLedgerReplication();
        long underReplicatedLedger = -1L;
        for (int i = 0; i < 5 && (underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate()) == -1L; ++i) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((String)"Ledger should be under replicated", (long)lh.getId(), (long)underReplicatedLedger);
        ArrayList<ReplicationWorker> l = new ArrayList<ReplicationWorker>();
        for (int i = 0; i < this.numBookies; ++i) {
            ReplicationWorker rw = new ReplicationWorker(this.confByIndex(i), (StatsLogger)NullStatsLogger.INSTANCE);
            rw.start();
            l.add(rw);
        }
        underReplicationManager.close();
        Thread.sleep(3000L);
        for (ReplicationWorker rw : l) {
            rw.shutdown();
        }
        LedgerHandle newLh = this.bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, "passwd".getBytes());
        for (Map.Entry e : newLh.getLedgerMetadata().getAllEnsembles().entrySet()) {
            List ensemble = (List)e.getValue();
            Assert.assertFalse((String)"Ensemble hasn't been updated", (boolean)ensemble.contains(replacedBookie));
        }
        newLh.close();
    }

    static class TestAuditor
    extends Auditor {
        final AtomicReference<CountDownLatch> latchRef = new AtomicReference<CountDownLatch>(new CountDownLatch(1));

        public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, StatsLogger statsLogger, AtomicBoolean exceptedRun) throws ReplicationException.UnavailableException {
            super(bookieIdentifier, conf, bkc, ownBkc, statsLogger);
            this.renewAuditorTestWrapperTask(exceptedRun);
        }

        public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, BookKeeperAdmin bkadmin, boolean ownadmin, StatsLogger statsLogger, AtomicBoolean exceptedRun) throws ReplicationException.UnavailableException {
            super(bookieIdentifier, conf, bkc, ownBkc, bkadmin, ownadmin, statsLogger);
            this.renewAuditorTestWrapperTask(exceptedRun);
        }

        public TestAuditor(String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger, AtomicBoolean exceptedRun) throws ReplicationException.UnavailableException {
            super(bookieIdentifier, conf, statsLogger);
            this.renewAuditorTestWrapperTask(exceptedRun);
        }

        private void renewAuditorTestWrapperTask(AtomicBoolean exceptedRun) {
            this.auditorCheckAllLedgersTask = new AuditorTestWrapperTask(this.auditorCheckAllLedgersTask, this.latchRef, exceptedRun);
            this.auditorPlacementPolicyCheckTask = new AuditorTestWrapperTask(this.auditorPlacementPolicyCheckTask, this.latchRef, exceptedRun);
            this.auditorReplicasCheckTask = new AuditorTestWrapperTask(this.auditorReplicasCheckTask, this.latchRef, exceptedRun);
        }

        CountDownLatch getLatch() {
            return this.latchRef.get();
        }

        void setLatch(CountDownLatch latch) {
            this.latchRef.set(latch);
        }

        private static class AuditorTestWrapperTask
        extends AuditorTask {
            private final AuditorTask innerTask;
            private final AtomicReference<CountDownLatch> latchRef;
            private final AtomicBoolean exceptedRun;

            AuditorTestWrapperTask(AuditorTask innerTask, AtomicReference<CountDownLatch> latchRef, AtomicBoolean exceptedRun) {
                super(null, null, null, null, null, null, null);
                this.innerTask = innerTask;
                this.latchRef = latchRef;
                this.exceptedRun = exceptedRun;
            }

            protected void runTask() {
                if (this.exceptedRun == null || this.exceptedRun.get()) {
                    this.innerTask.runTask();
                    this.latchRef.get().countDown();
                }
            }

            public void shutdown() {
                this.innerTask.shutdown();
            }
        }
    }
}

