package org.apache.bookkeeper.replication;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerHandleAdapter;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/replication/TestReplicationWorker.class */
public class TestReplicationWorker extends MultiLedgerManagerTestCase {
    private String basePath;
    private LedgerManagerFactory mFactory;
    private LedgerUnderreplicationManager underReplicationManager;
    private static final byte[] TESTPASSWD = "testpasswd".getBytes();
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWorker.class);
    private static byte[] data = "TestReplicationWorker".getBytes();

    public TestReplicationWorker(String str) {
        super(3);
        this.basePath = "";
        LOG.info("Running test case using ledger manager : " + str);
        this.baseConf.setLedgerManagerFactoryClassName(str);
        this.baseClientConf.setLedgerManagerFactoryClassName(str);
        this.basePath = this.baseClientConf.getZkLedgersRootPath() + "/underreplication/ledgers";
        this.baseConf.setRereplicationEntryBatchSize(3L);
    }

    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    public void setUp() throws Exception {
        super.setUp();
        this.mFactory = LedgerManagerFactory.newLedgerManagerFactory(this.baseClientConf, this.zkc);
        this.underReplicationManager = this.mFactory.newLedgerUnderreplicationManager();
    }

    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    public void tearDown() throws Exception {
        super.tearDown();
        if (null != this.mFactory) {
            this.mFactory.uninitialize();
            this.mFactory = null;
        }
        if (null != this.underReplicationManager) {
            this.underReplicationManager.close();
            this.underReplicationManager = null;
        }
    }

    @Test(timeout = 30000)
    public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        killBookie(inetSocketAddress);
        int startNewBookie = startNewBookie();
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie);
        LOG.info("New Bookie addr :" + inetSocketAddress2);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress2);
        replicationWorker.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            killAllBookies(createLedger, inetSocketAddress2);
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            replicationWorker.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        ServerConfiguration killBookie = killBookie(inetSocketAddress);
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie());
        LOG.info("New Bookie addr :" + inetSocketAddress2);
        killAllBookies(createLedger, inetSocketAddress2);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress2);
        replicationWorker.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            int i2 = 100;
            while (true) {
                int i3 = i2;
                i2--;
                if (i3 <= 0) {
                    break;
                }
                assertTrue("Expecting that replication should not complete", ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath));
                Thread.sleep(100L);
            }
            this.bs.add(startBookie(killBookie));
            this.bsConfs.add(killBookie);
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            replicationWorker.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            throw th;
        }
    }

    @Test(timeout = 90000)
    public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplication() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        ServerConfiguration killBookie = killBookie(inetSocketAddress);
        killAllBookies(createLedger, null);
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie());
        LOG.info("New Bookie addr :" + inetSocketAddress2);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress2);
        InetSocketAddress inetSocketAddress3 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie());
        LOG.info("New Bookie addr :" + inetSocketAddress3);
        ZooKeeper createConnectedZookeeperClient = ZkUtils.createConnectedZookeeperClient(this.zkUtil.getZooKeeperConnectString(), new ZooKeeperWatcherBase(10000));
        ReplicationWorker replicationWorker2 = new ReplicationWorker(createConnectedZookeeperClient, this.baseConf, inetSocketAddress3);
        replicationWorker.start();
        replicationWorker2.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            int i2 = 10;
            while (true) {
                int i3 = i2;
                i2--;
                if (i3 <= 0) {
                    break;
                }
                assertTrue("Expecting that replication should not complete", ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath));
                Thread.sleep(100L);
            }
            this.bs.add(startBookie(killBookie));
            this.bsConfs.add(killBookie);
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            replicationWorker.shutdown();
            replicationWorker2.shutdown();
            createConnectedZookeeperClient.close();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            replicationWorker2.shutdown();
            createConnectedZookeeperClient.close();
            throw th;
        }
    }

    @Test(timeout = 3000)
    public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        createLedger.close();
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        killBookie(inetSocketAddress);
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie());
        LOG.info("New Bookie addr :" + inetSocketAddress2);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress2);
        replicationWorker.start();
        try {
            this.bkc.deleteLedger(createLedger.getId());
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
        } finally {
            replicationWorker.shutdown();
        }
    }

    @Test(timeout = 60000)
    public void testMultipleLedgerReplicationWithReplicationWorker() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        LedgerHandle createLedger2 = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger2.addEntry(data);
        }
        InetSocketAddress inetSocketAddress2 = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger2).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress2);
        killBookie(inetSocketAddress);
        createLedger.close();
        killBookie(inetSocketAddress);
        createLedger2.close();
        InetSocketAddress inetSocketAddress3 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie());
        LOG.info("New Bookie addr :" + inetSocketAddress3);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress3);
        replicationWorker.start();
        try {
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            this.underReplicationManager.markLedgerUnderreplicated(createLedger2.getId(), inetSocketAddress2.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger2.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            killAllBookies(createLedger, inetSocketAddress3);
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            verifyRecoveredLedgers(createLedger2, 0L, 9L);
            replicationWorker.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        killBookie(inetSocketAddress);
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie());
        LOG.info("New Bookie addr :" + inetSocketAddress2);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress2);
        LedgerUnderreplicationManager newLedgerUnderreplicationManager = LedgerManagerFactory.newLedgerManagerFactory(this.baseClientConf, this.zkc).newLedgerUnderreplicationManager();
        replicationWorker.start();
        try {
            newLedgerUnderreplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            killAllBookies(createLedger, inetSocketAddress2);
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            assertFalse("Ledger must have been closed by RW", ClientUtil.isLedgerOpen(this.bkc.openLedgerNoRecovery(createLedger.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD)));
            replicationWorker.shutdown();
            newLedgerUnderreplicationManager.close();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            newLedgerUnderreplicationManager.close();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        killBookie(inetSocketAddress);
        int startNewBookie = startNewBookie();
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie);
        LOG.info("New Bookie addr :" + inetSocketAddress2);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress2);
        LedgerUnderreplicationManager newLedgerUnderreplicationManager = LedgerManagerFactory.newLedgerManagerFactory(this.baseClientConf, this.zkc).newLedgerUnderreplicationManager();
        replicationWorker.start();
        try {
            newLedgerUnderreplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath)) {
                Thread.sleep(100L);
            }
            killAllBookies(createLedger, inetSocketAddress2);
            verifyRecoveredLedgers(createLedger, 0L, 9L);
            assertTrue("Ledger must have been closed by RW", ClientUtil.isLedgerOpen(this.bkc.openLedgerNoRecovery(createLedger.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD)));
            replicationWorker.shutdown();
            newLedgerUnderreplicationManager.close();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            newLedgerUnderreplicationManager.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testRWShutdownOnLocalBookieReadonlyTransition() throws Exception {
        LedgerHandle createLedger = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
        for (int i = 0; i < 10; i++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ((ArrayList) LedgerHandleAdapter.getLedgerMetadata(createLedger).getEnsembles().get(0L)).get(0);
        LOG.info("Killing Bookie", inetSocketAddress);
        killBookie(inetSocketAddress);
        int startNewBookie = startNewBookie();
        for (int i2 = 0; i2 < 10; i2++) {
            createLedger.addEntry(data);
        }
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), startNewBookie);
        LOG.info("New Bookie addr :" + inetSocketAddress2);
        ReplicationWorker replicationWorker = new ReplicationWorker(this.zkc, this.baseConf, inetSocketAddress2);
        replicationWorker.start();
        try {
            BookieServer bookieServer = this.bs.get(this.bs.size() - 1);
            this.bsConfs.get(this.bsConfs.size() - 1).setReadOnlyModeEnabled(true);
            bookieServer.getBookie().transitionToReadOnlyMode();
            this.underReplicationManager.markLedgerUnderreplicated(createLedger.getId(), inetSocketAddress.toString());
            while (ReplicationTestUtil.isLedgerInUnderReplication(this.zkc, createLedger.getId(), this.basePath) && replicationWorker.isRunning()) {
                Thread.sleep(100L);
            }
            assertFalse("RW should shutdown if the bookie is readonly", replicationWorker.isRunning());
            replicationWorker.shutdown();
        } catch (Throwable th) {
            replicationWorker.shutdown();
            throw th;
        }
    }

    @Test(timeout = 30000)
    public void testRWZKSessionLost() throws Exception {
        ZooKeeper createConnectedZookeeperClient = ZkUtils.createConnectedZookeeperClient(this.zkUtil.getZooKeeperConnectString(), new ZooKeeperWatcherBase(10000));
        try {
            ReplicationWorker replicationWorker = new ReplicationWorker(createConnectedZookeeperClient, this.baseConf, getBookie(0));
            replicationWorker.start();
            for (int i = 0; i < 10 && !replicationWorker.isRunning(); i++) {
                Thread.sleep(1000L);
            }
            assertTrue("Replication worker should be running", replicationWorker.isRunning());
            stopZKCluster();
            for (int i2 = 0; i2 < 10 && replicationWorker.isRunning(); i2++) {
                Thread.sleep(1000L);
            }
            assertFalse("Replication worker should have shut down", replicationWorker.isRunning());
            createConnectedZookeeperClient.close();
        } catch (Throwable th) {
            createConnectedZookeeperClient.close();
            throw th;
        }
    }

    private void killAllBookies(LedgerHandle ledgerHandle, InetSocketAddress inetSocketAddress) throws Exception {
        Iterator it = LedgerHandleAdapter.getLedgerMetadata(ledgerHandle).getEnsembles().entrySet().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((ArrayList) ((Map.Entry) it.next()).getValue()).iterator();
            while (it2.hasNext()) {
                InetSocketAddress inetSocketAddress2 = (InetSocketAddress) it2.next();
                if (!inetSocketAddress2.equals(inetSocketAddress)) {
                    killBookie(inetSocketAddress2);
                }
            }
        }
    }

    private void verifyRecoveredLedgers(LedgerHandle ledgerHandle, long j, long j2) throws BKException, InterruptedException {
        Enumeration readEntries = this.bkc.openLedgerNoRecovery(ledgerHandle.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD).readEntries(j, j2);
        assertTrue("Should have the elements", readEntries.hasMoreElements());
        while (readEntries.hasMoreElements()) {
            assertEquals("TestReplicationWorker", new String(((LedgerEntry) readEntries.nextElement()).getEntry()));
        }
    }
}
