package org.apache.bookkeeper.replication;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/replication/TestReplicationWorker.class */
public class TestReplicationWorker extends BookKeeperClusterTestCase {
    private String basePath;
    private String baseLockPath;
    private MetadataBookieDriver driver;
    private LedgerManagerFactory mFactory;
    private LedgerUnderreplicationManager underReplicationManager;
    private LedgerManager ledgerManager;
    private OrderedScheduler scheduler;
    private String zkLedgersRootPath;
    private static final byte[] TESTPASSWD = "testpasswd".getBytes();
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWorker.class);
    private static byte[] data = "TestReplicationWorker".getBytes();

    /* loaded from: input_file:org/apache/bookkeeper/replication/TestReplicationWorker$InjectedReplicationWorker.class */
    class InjectedReplicationWorker extends ReplicationWorker {
        CopyOnWriteArrayList<Long> delayReplicationPeriods;

        public InjectedReplicationWorker(ServerConfiguration serverConfiguration, StatsLogger statsLogger, CopyOnWriteArrayList<Long> copyOnWriteArrayList) throws ReplicationException.CompatibilityException, KeeperException, InterruptedException, IOException {
            super(serverConfiguration, statsLogger);
            this.delayReplicationPeriods = copyOnWriteArrayList;
        }

        void scheduleTaskWithDelay(TimerTask timerTask, long j) {
            this.delayReplicationPeriods.add(Long.valueOf(j));
            super.scheduleTaskWithDelay(timerTask, j);
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/replication/TestReplicationWorker$MockZooKeeperClient.class */
    class MockZooKeeperClient extends ZooKeeperClient {
        private final String connectString;
        private final int sessionTimeoutMs;
        private final ZooKeeperWatcherBase watcherManager;
        private volatile String pathOfSetDataToFail;
        private volatile String pathOfDeleteToFail;
        private AtomicInteger numOfTimesSetDataFailed;
        private AtomicInteger numOfTimesDeleteFailed;

        /* loaded from: input_file:org/apache/bookkeeper/replication/TestReplicationWorker$MockZooKeeperClient$MockZooKeeper.class */
        class MockZooKeeper extends ZooKeeper {
            public MockZooKeeper(String str, int i, Watcher watcher, boolean z) throws IOException {
                super(str, i, watcher, z);
            }

            public void setData(String str, byte[] bArr, int i, AsyncCallback.StatCallback statCallback, Object obj) {
                if (MockZooKeeperClient.this.pathOfSetDataToFail == null || !MockZooKeeperClient.this.pathOfSetDataToFail.equals(str)) {
                    super.setData(str, bArr, i, statCallback, obj);
                    return;
                }
                TestReplicationWorker.LOG.error("setData of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", str);
                MockZooKeeperClient.this.numOfTimesSetDataFailed.incrementAndGet();
                statCallback.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), str, obj, (Stat) null);
            }

            public void delete(String str, int i) throws KeeperException, InterruptedException {
                if (MockZooKeeperClient.this.pathOfDeleteToFail == null || !MockZooKeeperClient.this.pathOfDeleteToFail.equals(str)) {
                    super.delete(str, i);
                } else {
                    TestReplicationWorker.LOG.error("delete of MockZooKeeper, is failing with CONNECTIONLOSS for path: {}", str);
                    MockZooKeeperClient.this.numOfTimesDeleteFailed.incrementAndGet();
                    throw new KeeperException.ConnectionLossException();
                }
            }
        }

        MockZooKeeperClient(String str, int i, ZooKeeperWatcherBase zooKeeperWatcherBase) throws IOException {
            super(str, i, zooKeeperWatcherBase, new BoundExponentialBackoffRetryPolicy(i, i, Integer.MAX_VALUE), new BoundExponentialBackoffRetryPolicy(i, i, 0), NullStatsLogger.INSTANCE, 1, 0.0d, false);
            this.numOfTimesSetDataFailed = new AtomicInteger();
            this.numOfTimesDeleteFailed = new AtomicInteger();
            this.connectString = str;
            this.sessionTimeoutMs = i;
            this.watcherManager = zooKeeperWatcherBase;
        }

        protected ZooKeeper createZooKeeper() throws IOException {
            return new MockZooKeeper(this.connectString, this.sessionTimeoutMs, this.watcherManager, false);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setPathOfSetDataToFail(String str) {
            this.pathOfSetDataToFail = str;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setPathOfDeleteToFail(String str) {
            this.pathOfDeleteToFail = str;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumOfTimesSetDataFailed() {
            return this.numOfTimesSetDataFailed.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumOfTimesDeleteFailed() {
            return this.numOfTimesDeleteFailed.get();
        }
    }

    public TestReplicationWorker() {
        this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
    }

    TestReplicationWorker(String str) {
        super(3);
        this.basePath = "";
        this.baseLockPath = "";
        LOG.info("Running test case using ledger manager : " + str);
        this.baseConf.setLedgerManagerFactoryClassName(str);
        this.baseClientConf.setLedgerManagerFactoryClassName(str);
        this.baseConf.setRereplicationEntryBatchSize(3L);
    }

    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    public void setUp() throws Exception {
        super.setUp();
        this.zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(this.baseClientConf);
        this.basePath = this.zkLedgersRootPath + "/underreplication/ledgers";
        this.baseLockPath = this.zkLedgersRootPath + "/underreplication/locks";
        this.scheduler = OrderedScheduler.newSchedulerBuilder().name("test-scheduler").numThreads(1).build();
        this.driver = MetadataDrivers.getBookieDriver(URI.create(this.baseConf.getMetadataServiceUri()));
        this.driver.initialize(this.baseConf, () -> {
        }, NullStatsLogger.INSTANCE);
        this.mFactory = this.driver.getLedgerManagerFactory();
        this.ledgerManager = this.mFactory.newLedgerManager();
        this.underReplicationManager = this.mFactory.newLedgerUnderreplicationManager();
    }

    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    public void tearDown() throws Exception {
        super.tearDown();
        if (null != this.underReplicationManager) {
            this.underReplicationManager.close();
            this.underReplicationManager = null;
        }
        if (null != this.driver) {
            this.driver.close();
        }
    }

    @Test
    public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress);
        killBookie(bookieSocketAddress);
        BookieSocketAddress startNewBookieAndReturnAddress = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", startNewBookieAndReturnAddress);
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger.addEntry(data);
        }
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
        replicationWorker.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            killAllBookies(createLedger, startNewBookieAndReturnAddress);
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            replicationWorker.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            throw th;
        }
    }

    @Test
    public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress);
        ServerConfiguration killBookie = killBookie(bookieSocketAddress);
        BookieSocketAddress startNewBookieAndReturnAddress = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr :" + startNewBookieAndReturnAddress);
        killAllBookies(createLedger, startNewBookieAndReturnAddress);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
        replicationWorker.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
            int i2 = 30;
            while (true) {
                int i3 = i2;
                i2--;
                if (i3 <= 0) {
                    break;
                }
                Assert.assertTrue("Expecting that replication should not complete", ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath));
                Thread.sleep(100L);
            }
            this.bs.add(startBookie(killBookie));
            this.bsConfs.add(killBookie);
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            replicationWorker.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            throw th;
        }
    }

    @Test
    public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplication() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress);
        ServerConfiguration killBookie = killBookie(bookieSocketAddress);
        killAllBookies(createLedger, null);
        LOG.info("New Bookie addr : {}", startNewBookieAndReturnAddress());
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
        LOG.info("New Bookie addr : {}", startNewBookieAndReturnAddress());
        ReplicationWorker replicationWorker2 = new ReplicationWorker(this.baseConf);
        replicationWorker.start();
        replicationWorker2.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
            int i2 = 10;
            while (true) {
                int i3 = i2;
                i2--;
                if (i3 <= 0) {
                    break;
                }
                Assert.assertTrue("Expecting that replication should not complete", ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath));
                Thread.sleep(100L);
            }
            this.bs.add(startBookie(killBookie));
            this.bsConfs.add(killBookie);
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            replicationWorker.shutdown();
            replicationWorker2.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            replicationWorker2.shutdown();
            throw th;
        }
    }

    @Test
    public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress);
        killBookie(bookieSocketAddress);
        LOG.info("New Bookie addr : {}", startNewBookieAndReturnAddress());
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
        replicationWorker.start();
        try {
            this.bkc.deleteLedger(createLedger.getId());
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
        } finally {
            replicationWorker.shutdown();
        }
    }

    @Test
    public void testMultipleLedgerReplicationWithReplicationWorker() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress);
        LedgerHandle createLedger2 = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger2.addEntry(data);
        }
        BookieSocketAddress bookieSocketAddress2 = (BookieSocketAddress) ((List) createLedger2.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress2);
        killBookie(bookieSocketAddress);
        createLedger.close();
        killBookie(bookieSocketAddress);
        createLedger2.close();
        BookieSocketAddress startNewBookieAndReturnAddress = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", startNewBookieAndReturnAddress);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
        replicationWorker.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
            this.underReplicationManager.markLedgerUnderreplicated(createLedger2.getId(), bookieSocketAddress2.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger2.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            killAllBookies(createLedger, startNewBookieAndReturnAddress);
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            verifyRecoveredLedgers(createLedger2, 0L, 9L);
            replicationWorker.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            throw th;
        }
    }

    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress);
        killBookie(bookieSocketAddress);
        BookieSocketAddress startNewBookieAndReturnAddress = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", startNewBookieAndReturnAddress);
        this.baseConf.setOpenLedgerRereplicationGracePeriod("3000");
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
        MetadataClientDriver clientDriver = MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()));
        try {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            LedgerUnderreplicationManager newLedgerUnderreplicationManager = clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            replicationWorker.start();
            try {
                newLedgerUnderreplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                killAllBookies(createLedger, startNewBookieAndReturnAddress);
                verifyRecoveredLedgers(createLedger, 0L, 9L);
                Assert.assertFalse("Ledger must have been closed by RW", ClientUtil.isLedgerOpen(this.bkc.openLedgerNoRecovery(createLedger.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD)));
                replicationWorker.shutdown();
                newLedgerUnderreplicationManager.close();
            } catch (Throwable th) {
                replicationWorker.shutdown();
                newLedgerUnderreplicationManager.close();
                throw th;
            }
        } finally {
            if (Collections.singletonList(clientDriver).get(0) != null) {
                clientDriver.close();
            }
        }
    }

    @Test
    public void testBookiesNotAvailableScenarioForReplicationWorker() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 7; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        BookieSocketAddress[] bookieSocketAddressArr = new BookieSocketAddress[3];
        final ServerConfiguration[] serverConfigurationArr = new ServerConfiguration[3];
        for (int i2 = 0; i2 < 3; i2++) {
            bookieSocketAddressArr[i2] = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(i2);
            serverConfigurationArr[i2] = getBkConf(bookieSocketAddressArr[i2]);
            LOG.info("Killing Bookie : {}", bookieSocketAddressArr[i2]);
            killBookie(bookieSocketAddressArr[i2]);
        }
        for (int i3 = 0; i3 < 3; i3++) {
            startNewBookieAndReturnAddress();
        }
        ServerConfiguration serverConfiguration = new ServerConfiguration(this.baseConf);
        serverConfiguration.setLockReleaseOfFailedLedgerGracePeriod("64");
        ReplicationWorker replicationWorker = new ReplicationWorker(serverConfiguration);
        ReplicationWorker replicationWorker2 = new ReplicationWorker(serverConfiguration);
        MetadataClientDriver clientDriver = MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()));
        try {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            LedgerUnderreplicationManager newLedgerUnderreplicationManager = clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            for (BookieSocketAddress bookieSocketAddress : bookieSocketAddressArr) {
                try {
                    newLedgerUnderreplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
                } catch (Throwable th) {
                    replicationWorker.shutdown();
                    replicationWorker2.shutdown();
                    newLedgerUnderreplicationManager.close();
                    throw th;
                }
            }
            while (!ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            replicationWorker.start();
            replicationWorker2.start();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            new Thread(new Runnable() { // from class: org.apache.bookkeeper.replication.TestReplicationWorker.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Thread.sleep(3000L);
                        atomicBoolean.set(true);
                        TestReplicationWorker.this.startBookie(serverConfigurationArr[0]);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            int i4 = 0;
            int i5 = 0;
            while (!atomicBoolean.get()) {
                Assert.assertTrue("Ledger: " + createLedger.getId() + " should be underreplicated", ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath));
                int i6 = ((AtomicInteger) replicationWorker.replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get();
                Assert.assertTrue("The current number of failed attempts: " + i6 + " should be greater than or equal to previous value: " + i4, i6 >= i4);
                i4 = i6;
                int i7 = ((AtomicInteger) replicationWorker2.replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get();
                Assert.assertTrue("The current number of failed attempts: " + i7 + " should be greater than or equal to previous value: " + i5, i7 >= i5);
                i5 = i7;
                Thread.sleep(50L);
            }
            int i8 = 0;
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
                i8 += 100;
                if (i8 == 20000) {
                    Assert.fail("Ledger should be replicated by now");
                }
            }
            int i9 = ((AtomicInteger) replicationWorker.replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get();
            int i10 = ((AtomicInteger) replicationWorker2.replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get();
            Thread.sleep(2000L);
            Assert.assertEquals("rw1 failedattempts", i9, ((AtomicInteger) replicationWorker.replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get());
            Assert.assertEquals("rw2 failed attempts ", i10, ((AtomicInteger) replicationWorker2.replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get());
            int size = ((ConcurrentSkipListSet) replicationWorker.unableToReadEntriesForReplication.get(Long.valueOf(createLedger.getId()))).size();
            int size2 = ((ConcurrentSkipListSet) replicationWorker2.unableToReadEntriesForReplication.get(Long.valueOf(createLedger.getId()))).size();
            Assert.assertTrue("unableToReadEntriesForReplication in RW1: " + size + " in RW2: " + size2, size == 0 || size2 == 0);
            replicationWorker.shutdown();
            replicationWorker2.shutdown();
            newLedgerUnderreplicationManager.close();
        } finally {
            if (Collections.singletonList(clientDriver).get(0) != null) {
                clientDriver.close();
            }
        }
    }

    @Test
    public void testDeferLedgerLockReleaseForReplicationWorker() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 7; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        BookieSocketAddress[] bookieSocketAddressArr = new BookieSocketAddress[3];
        ServerConfiguration[] serverConfigurationArr = new ServerConfiguration[3];
        for (int i2 = 0; i2 < 3; i2++) {
            bookieSocketAddressArr[i2] = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(i2);
            serverConfigurationArr[i2] = getBkConf(bookieSocketAddressArr[i2]);
            LOG.info("Killing Bookie : {}", bookieSocketAddressArr[i2]);
            killBookie(bookieSocketAddressArr[i2]);
        }
        for (int i3 = 0; i3 < 3; i3++) {
            startNewBookieAndReturnAddress();
        }
        long pow = 64 / ((int) Math.pow(2.0d, 5.0d));
        ServerConfiguration serverConfiguration = new ServerConfiguration(this.baseConf);
        serverConfiguration.setLockReleaseOfFailedLedgerGracePeriod(Long.toString(64L));
        serverConfiguration.setRereplicationEntryBatchSize(1000L);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        TestStatsProvider testStatsProvider = new TestStatsProvider();
        TestStatsProvider.TestStatsLogger statsLogger = testStatsProvider.getStatsLogger("rw1");
        TestStatsProvider.TestStatsLogger statsLogger2 = testStatsProvider.getStatsLogger("rw2");
        InjectedReplicationWorker injectedReplicationWorker = new InjectedReplicationWorker(serverConfiguration, statsLogger, copyOnWriteArrayList);
        InjectedReplicationWorker injectedReplicationWorker2 = new InjectedReplicationWorker(serverConfiguration, statsLogger2, copyOnWriteArrayList2);
        Counter counter = statsLogger.getCounter("NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION");
        Counter counter2 = statsLogger2.getCounter("NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION");
        MetadataClientDriver clientDriver = MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()));
        try {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            LedgerUnderreplicationManager newLedgerUnderreplicationManager = clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            for (BookieSocketAddress bookieSocketAddress : bookieSocketAddressArr) {
                try {
                    newLedgerUnderreplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
                } catch (Throwable th) {
                    injectedReplicationWorker.shutdown();
                    injectedReplicationWorker2.shutdown();
                    newLedgerUnderreplicationManager.close();
                    throw th;
                }
            }
            while (!ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            injectedReplicationWorker.start();
            injectedReplicationWorker2.start();
            while (true) {
                if (((AtomicInteger) ((ReplicationWorker) injectedReplicationWorker).replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get() >= 10 && ((AtomicInteger) ((ReplicationWorker) injectedReplicationWorker2).replicationFailedLedgers.get(Long.valueOf(createLedger.getId()))).get() >= 10) {
                    break;
                } else {
                    Thread.sleep(500L);
                }
            }
            Assert.assertTrue("Ledger: " + createLedger.getId() + " should be underreplicated", ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath));
            for (int i4 = 0; i4 < 10 - 1; i4++) {
                long min = Math.min(64L, pow * (1 << i4));
                Assert.assertEquals("RW1 delayperiod", Long.valueOf(min), copyOnWriteArrayList.get(i4));
                Assert.assertEquals("RW2 delayperiod", Long.valueOf(min), copyOnWriteArrayList2.get(i4));
            }
            Assert.assertEquals("numEntriesUnableToReadForReplication for RW1", Long.valueOf(7), counter.get());
            Assert.assertEquals("numEntriesUnableToReadForReplication for RW2", Long.valueOf(7), counter2.get());
            Assert.assertEquals("RW1 unabletoreadentries", 7, ((ConcurrentSkipListSet) ((ReplicationWorker) injectedReplicationWorker).unableToReadEntriesForReplication.get(Long.valueOf(createLedger.getId()))).size());
            Assert.assertEquals("RW2 unabletoreadentries", 7, ((ConcurrentSkipListSet) ((ReplicationWorker) injectedReplicationWorker2).unableToReadEntriesForReplication.get(Long.valueOf(createLedger.getId()))).size());
            injectedReplicationWorker.shutdown();
            injectedReplicationWorker2.shutdown();
            newLedgerUnderreplicationManager.close();
        } finally {
            if (Collections.singletonList(clientDriver).get(0) != null) {
                clientDriver.close();
            }
        }
    }

    @Test
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedger.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie : {}", bookieSocketAddress);
        killBookie(bookieSocketAddress);
        BookieSocketAddress startNewBookieAndReturnAddress = startNewBookieAndReturnAddress();
        LOG.info("New Bookie addr : {}", startNewBookieAndReturnAddress);
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger.addEntry(data);
        }
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
        this.baseClientConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        MetadataClientDriver clientDriver = MetadataDrivers.getClientDriver(URI.create(this.baseClientConf.getMetadataServiceUri()));
        try {
            clientDriver.initialize(this.baseClientConf, this.scheduler, NullStatsLogger.INSTANCE, Optional.empty());
            LedgerUnderreplicationManager newLedgerUnderreplicationManager = clientDriver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
            replicationWorker.start();
            try {
                newLedgerUnderreplicationManager.markLedgerUnderreplicated(createLedger.getId(), bookieSocketAddress.toString());
                while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                    Thread.sleep(100L);
                }
                killAllBookies(createLedger, startNewBookieAndReturnAddress);
                verifyRecoveredLedgers(createLedger, 0L, 9L);
                Assert.assertTrue("Ledger must have been closed by RW", ClientUtil.isLedgerOpen(this.bkc.openLedgerNoRecovery(createLedger.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD)));
                replicationWorker.shutdown();
                newLedgerUnderreplicationManager.close();
            } catch (Throwable th) {
                replicationWorker.shutdown();
                newLedgerUnderreplicationManager.close();
                throw th;
            }
        } finally {
            if (Collections.singletonList(clientDriver).get(0) != null) {
                clientDriver.close();
            }
        }
    }

    @Test
    public void testRWZKConnectionLost() throws Exception {
        ZooKeeperClient build = ZooKeeperClient.newBuilder().connectString(this.zkUtil.getZooKeeperConnectString()).sessionTimeoutMs(10000).build();
        Throwable th = null;
        try {
            ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf);
            replicationWorker.start();
            for (int i = 0; i < 10 && !replicationWorker.isRunning(); i++) {
                Thread.sleep(1000L);
            }
            Assert.assertTrue("Replication worker should be running", replicationWorker.isRunning());
            stopZKCluster();
            for (int i2 = 0; i2 < 10 && build.getState().isConnected(); i2++) {
                Thread.sleep(1000L);
            }
            Assert.assertFalse(build.getState().isConnected());
            startZKCluster();
            Assert.assertTrue("Replication worker should still be running", replicationWorker.isRunning());
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    private void killAllBookies(LedgerHandle ledgerHandle, BookieSocketAddress bookieSocketAddress) throws Exception {
        Iterator it = ledgerHandle.getLedgerMetadata().getAllEnsembles().entrySet().iterator();
        while (it.hasNext()) {
            for (BookieSocketAddress bookieSocketAddress2 : (List) ((Map.Entry) it.next()).getValue()) {
                if (!bookieSocketAddress2.equals(bookieSocketAddress)) {
                    killBookie(bookieSocketAddress2);
                }
            }
        }
    }

    private void verifyRecoveredLedgers(LedgerHandle ledgerHandle, long j, long j2) throws BKException, InterruptedException {
        Enumeration readEntries = this.bkc.openLedgerNoRecovery(ledgerHandle.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD).readEntries(j, j2);
        Assert.assertTrue("Should have the elements", readEntries.hasMoreElements());
        while (readEntries.hasMoreElements()) {
            Assert.assertEquals("TestReplicationWorker", new String(((LedgerEntry) readEntries.nextElement()).getEntry()));
        }
    }

    @Test
    public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
        ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(10000, NullStatsLogger.INSTANCE);
        MockZooKeeperClient mockZooKeeperClient = new MockZooKeeperClient(this.zkUtil.getZooKeeperConnectString(), 10000, zooKeeperWatcherBase);
        mockZooKeeperClient.waitForConnection();
        Assert.assertEquals("zkFaultInjectionWrapper should be in connected state", ZooKeeper.States.CONNECTED, mockZooKeeperClient.getState());
        long sessionId = mockZooKeeperClient.getSessionId();
        BookKeeper bookKeeper = new BookKeeper(this.baseClientConf, mockZooKeeperClient);
        LedgerHandle createLedgerAdv = bookKeeper.createLedgerAdv(567L, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD, (Map) null);
        for (int i = 0; i < 10; i++) {
            createLedgerAdv.addEntry(i, data);
        }
        createLedgerAdv.close();
        zooKeeperWatcherBase.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, ""));
        mockZooKeeperClient.waitForConnection();
        for (int i2 = 0; i2 < 10 && mockZooKeeperClient.getState() != ZooKeeper.States.CONNECTED; i2++) {
            Thread.sleep(200L);
        }
        Assert.assertEquals("zkFaultInjectionWrapper should be in connected state", ZooKeeper.States.CONNECTED, mockZooKeeperClient.getState());
        Assert.assertNotEquals("Session Id of old and new ZK instance should be different", sessionId, mockZooKeeperClient.getSessionId());
        BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) ((List) createLedgerAdv.getLedgerMetadata().getAllEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", bookieSocketAddress);
        killBookie(bookieSocketAddress);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.baseConf, bookKeeper, false, NullStatsLogger.INSTANCE);
        replicationWorker.start();
        for (int i3 = 0; i3 < 40; i3++) {
            try {
                if (replicationWorker.isRunning()) {
                    break;
                }
                LOG.info("Waiting for the RW to start...");
                Thread.sleep(500L);
            } catch (Throwable th) {
                replicationWorker.shutdown();
                mockZooKeeperClient.close();
                bookKeeper.close();
                throw th;
            }
        }
        Assert.assertTrue("RW should be running", replicationWorker.isRunning());
        String ledgerPath = this.ledgerManager.getLedgerPath(567L);
        String urLedgerLockZnode = ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(this.zkLedgersRootPath), 567L);
        mockZooKeeperClient.setPathOfSetDataToFail(ledgerPath);
        mockZooKeeperClient.setPathOfDeleteToFail(urLedgerLockZnode);
        this.underReplicationManager.markLedgerUnderreplicated(createLedgerAdv.getId(), bookieSocketAddress.toString());
        for (int i4 = 0; i4 < 40 && replicationWorker.isRunning(); i4++) {
            LOG.info("Waiting for the RW to shutdown...");
            Thread.sleep(500L);
        }
        Assert.assertEquals("NumOfTimesSetDataFailed", 1L, mockZooKeeperClient.getNumOfTimesSetDataFailed());
        Assert.assertEquals("NumOfTimesDeleteFailed", 2L, mockZooKeeperClient.getNumOfTimesDeleteFailed());
        Assert.assertFalse("RW should be shutdown", replicationWorker.isRunning());
        replicationWorker.shutdown();
        mockZooKeeperClient.close();
        bookKeeper.close();
    }
}
