/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDelayEnsembleChange
extends BookKeeperClusterTestCase {
    private static final Logger logger = LoggerFactory.getLogger(TestDelayEnsembleChange.class);
    final BookKeeper.DigestType digestType;
    final byte[] testPasswd = "".getBytes();

    /**
     * Builds the test case, handing {@code 5} to the {@link BookKeeperClusterTestCase}
     * constructor (presumably the number of bookies to start -- confirm against the base
     * class) and fixing the digest type used by every ledger created here to CRC32.
     */
    public TestDelayEnsembleChange() {
        super(5);
        digestType = BookKeeper.DigestType.CRC32;
    }

    /**
     * Switches on delayed ensemble change in the base client configuration and then runs
     * the parent set-up. The flag is set first so that it is already in place when
     * {@code super.setUp()} executes.
     *
     * @throws Exception if the parent set-up fails
     */
    @Before
    @Override
    public void setUp() throws Exception {
        baseClientConf.setDelayEnsembleChange(true);
        super.setUp();
    }

    /**
     * Reads every entry in {@code [startEntry, untilEntry)} directly from each bookie of the
     * ensemble that the ledger metadata records for that entry, and asserts the exact number
     * of successful and missing responses per entry. Any other kind of failure fails the test.
     *
     * @param lh ledger whose entries are checked
     * @param startEntry first entry id to check (inclusive)
     * @param untilEntry entry id at which checking stops (exclusive)
     * @param expectedSuccess exact number of bookies expected to return each entry
     * @param expectedMissing exact number of bookies expected to report each entry as missing
     * @throws Exception if issuing a read fails or the wait for responses is interrupted
     */
    private void verifyEntries(LedgerHandle lh, long startEntry, long untilEntry, long expectedSuccess, long expectedMissing) throws Exception {
        LedgerMetadata md = lh.getLedgerMetadata();
        for (long eid = startEntry; eid < untilEntry; ++eid) {
            // Must be a typed list: a raw List yields Object elements and cannot be
            // iterated as BookieId in the for-each below.
            List<BookieId> addresses = md.getEnsembleAt(eid);
            VerificationCallback callback = new VerificationCallback(addresses.size());
            for (BookieId addr : addresses) {
                // The bookie address doubles as the callback context so log lines can name the host.
                bkc.getBookieClient().readEntry(addr, lh.getId(), eid, callback, addr, 0, null);
            }
            // One latch count per bookie: wait until all of them have answered.
            callback.latch.await();
            Assert.assertEquals("Unexpected number of successful reads for entry " + eid,
                    expectedSuccess, callback.numSuccess.get());
            Assert.assertEquals("Unexpected number of missing reads for entry " + eid,
                    expectedMissing, callback.numMissing.get());
            Assert.assertEquals("Unexpected read failures for entry " + eid,
                    0L, callback.numFailure.get());
        }
    }

    /**
     * Relaxed variant of the exact per-entry verification: reads every entry in
     * {@code [startEntry, untilEntry)} from each bookie of its recorded ensemble and checks
     * the response counts against bounds instead of exact values. Any other kind of failure
     * fails the test.
     *
     * @param lh ledger whose entries are checked
     * @param startEntry first entry id to check (inclusive)
     * @param untilEntry entry id at which checking stops (exclusive)
     * @param expectedSuccess upper bound on the number of bookies that return each entry
     * @param expectedMissing lower bound on the number of bookies that report each entry missing
     * @throws Exception if issuing a read fails or the wait for responses is interrupted
     */
    private void verifyEntriesRange(LedgerHandle lh, long startEntry, long untilEntry, long expectedSuccess, long expectedMissing) throws Exception {
        LedgerMetadata md = lh.getLedgerMetadata();
        for (long eid = startEntry; eid < untilEntry; ++eid) {
            // Must be a typed list: a raw List yields Object elements and cannot be
            // iterated as BookieId in the for-each below.
            List<BookieId> addresses = md.getEnsembleAt(eid);
            VerificationCallback callback = new VerificationCallback(addresses.size());
            for (BookieId addr : addresses) {
                // The bookie address doubles as the callback context so log lines can name the host.
                bkc.getBookieClient().readEntry(addr, lh.getId(), eid, callback, addr, 0, null);
            }
            // One latch count per bookie: wait until all of them have answered.
            callback.latch.await();
            long successes = callback.numSuccess.get();
            long missing = callback.numMissing.get();
            Assert.assertTrue("Entry " + eid + " was read successfully " + successes
                    + " times, more than the allowed " + expectedSuccess,
                    expectedSuccess >= successes);
            Assert.assertTrue("Entry " + eid + " was missing " + missing
                    + " times, fewer than the required " + expectedMissing,
                    expectedMissing <= missing);
            Assert.assertEquals("Unexpected read failures for entry " + eid,
                    0L, callback.numFailure.get());
        }
    }

    /**
     * Writes three batches of entries to a ledger created with {@code createLedger(5, 5, 3, ...)}:
     * one with all bookies up, one after killing the first two bookies of the current ensemble,
     * and one after restarting them. The ledger must keep a single ensemble throughout, and the
     * middle batch is expected on exactly three bookies and missing on two.
     */
    @Test
    public void testNotChangeEnsembleIfNotBrokenAckQuorum() throws Exception {
        final int batchSize = 10;
        LedgerHandle ledger = bkc.createLedger(5, 5, 3, digestType, testPasswd);
        final byte[] payload = "foobar".getBytes();

        // Batch 1: every bookie is alive.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }

        // Take down the first two bookies of the current ensemble, keeping their configs for restart.
        ServerConfiguration firstKilled = killBookie(ledger.getCurrentEnsemble().get(0));
        ServerConfiguration secondKilled = killBookie(ledger.getCurrentEnsemble().get(1));

        // Batch 2: written while two bookies are down.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                1L, ledger.getLedgerMetadata().getAllEnsembles().size());

        // Bring both bookies back.
        startAndAddBookie(firstKilled);
        startAndAddBookie(secondKilled);

        // Batch 3: written after the restart.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                1L, ledger.getLedgerMetadata().getAllEnsembles().size());

        verifyEntries(ledger, 0L, batchSize, 5L, 0L);
        verifyEntries(ledger, batchSize, 2 * batchSize, 3L, 2L);
        verifyEntries(ledger, 2 * batchSize, 3 * batchSize, 5L, 0L);
    }

    /**
     * Kills the first three bookies of a {@code createLedger(5, 5, 3, ...)} ledger one at a
     * time, writing a batch of entries after each kill. The first two kills must leave the
     * ledger with a single ensemble; the third must produce a second ensemble in which the
     * three killed positions are replaced and the remaining two are unchanged. Client stats
     * are checked alongside: a new-ensemble operation is recorded up front, and a
     * replace-bookie operation only once the ensemble actually changes.
     */
    @Test
    public void testChangeEnsembleIfBrokenAckQuorum() throws Exception {
        final String replaceBookieStat = "bookkeeper_client.bookie_watcher.REPLACE_BOOKIE_TIME";

        // Three extra bookies so that replacements exist when the ensemble finally changes.
        startNewBookie();
        startNewBookie();
        startNewBookie();
        // Reset stats so the counters below only reflect this test's ledger.
        bkc.getTestStatsProvider().clear();

        LedgerHandle lh = bkc.createLedger(5, 5, 3, digestType, testPasswd);
        byte[] data = "foobar".getBytes();
        int numEntries = 5;
        for (int i = 0; i < numEntries; ++i) {
            lh.addEntry(data);
        }

        // Typed list: iterating a raw List as BookieId does not compile.
        List<BookieId> initialEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L);
        for (BookieId addr : initialEnsemble) {
            String counterName = "bookkeeper_client.bookie_"
                    + TestUtils.buildStatsCounterPathFromBookieID(addr)
                    + ".LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION";
            Assert.assertTrue("LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION should be > 0 for " + addr,
                    bkc.getTestStatsProvider().getCounter(counterName).get() > 0L);
        }
        Assert.assertTrue("Stats should have captured a new ensemble",
                bkc.getTestStatsProvider()
                        .getOpStatsLogger("bookkeeper_client.bookie_watcher.NEW_ENSEMBLE_TIME")
                        .getSuccessCount() > 0L);
        Assert.assertTrue("Stats should not have captured an ensemble change",
                bkc.getTestStatsProvider().getOpStatsLogger(replaceBookieStat).getSuccessCount() == 0L);

        logger.info("Kill bookie 0 and write {} entries.", numEntries);
        ServerConfiguration conf0 = killBookie(lh.getCurrentEnsemble().get(0));
        for (int i = numEntries; i < 2 * numEntries; ++i) {
            lh.addEntry(data);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                1L, lh.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertTrue("Stats should not have captured an ensemble change",
                bkc.getTestStatsProvider().getOpStatsLogger(replaceBookieStat).getSuccessCount() == 0L);

        logger.info("Kill bookie 1 and write another {} entries.", numEntries);
        ServerConfiguration conf1 = killBookie(lh.getCurrentEnsemble().get(1));
        for (int i = 2 * numEntries; i < 3 * numEntries; ++i) {
            lh.addEntry(data);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                1L, lh.getLedgerMetadata().getAllEnsembles().size());

        logger.info("Kill bookie 2 and write another {} entries.", numEntries);
        ServerConfiguration conf2 = killBookie(lh.getCurrentEnsemble().get(2));
        for (int i = 3 * numEntries; i < 4 * numEntries; ++i) {
            lh.addEntry(data);
        }
        // With three of five bookies down only two can acknowledge, fewer than the ack quorum of 3.
        Assert.assertEquals("There should be ensemble change if ack quorum couldn't be formed.",
                2L, lh.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertTrue("Stats should have captured an ensemble change",
                bkc.getTestStatsProvider().getOpStatsLogger(replaceBookieStat).getSuccessCount() > 0L);

        // The three killed positions must differ between fragments; the two survivors must match.
        List<BookieId> firstFragment = lh.getLedgerMetadata().getEnsembleAt(0L);
        List<BookieId> secondFragment = lh.getLedgerMetadata().getEnsembleAt((long) (3 * numEntries));
        Assert.assertFalse(firstFragment.get(0).equals(secondFragment.get(0)));
        Assert.assertFalse(firstFragment.get(1).equals(secondFragment.get(1)));
        Assert.assertFalse(firstFragment.get(2).equals(secondFragment.get(2)));
        Assert.assertEquals(firstFragment.get(3), secondFragment.get(3));
        Assert.assertEquals(firstFragment.get(4), secondFragment.get(4));

        startAndAddBookie(conf0);
        startAndAddBookie(conf1);
        startAndAddBookie(conf2);
        for (int i = 4 * numEntries; i < 5 * numEntries; ++i) {
            lh.addEntry(data);
        }
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                2L, lh.getLedgerMetadata().getAllEnsembles().size());

        // Each batch reflects how many bookies of its ensemble were alive when it was written.
        verifyEntries(lh, 0L, numEntries, 5L, 0L);
        verifyEntries(lh, numEntries, 2 * numEntries, 4L, 1L);
        verifyEntries(lh, 2 * numEntries, 3 * numEntries, 3L, 2L);
        verifyEntries(lh, 3 * numEntries, 4 * numEntries, 5L, 0L);
        verifyEntries(lh, 4 * numEntries, 5 * numEntries, 5L, 0L);
    }

    /**
     * Starts one extra bookie, then kills the first three bookies of a
     * {@code createLedger(5, 5, 3, ...)} ledger at once and keeps writing. The ledger must end
     * up with two ensembles after the kills and still two after the killed bookies are
     * restarted. The batch written after the kills is expected on exactly three bookies of
     * its ensemble and missing on two.
     */
    @Test
    public void testEnsembleChangeWithNotEnoughBookies() throws Exception {
        final int batchSize = 10;
        startNewBookie();
        LedgerHandle ledger = bkc.createLedger(5, 5, 3, digestType, testPasswd);
        final byte[] payload = "foobar".getBytes();

        // Batch 1: all bookies alive.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }

        // Note: this message is logged before the bookies are actually killed.
        logger.info("Killed 3 bookies and add {} more entries : {}", batchSize, ledger.getLedgerMetadata());
        ServerConfiguration firstKilled = killBookie(ledger.getCurrentEnsemble().get(0));
        ServerConfiguration secondKilled = killBookie(ledger.getCurrentEnsemble().get(1));
        ServerConfiguration thirdKilled = killBookie(ledger.getCurrentEnsemble().get(2));

        // Batch 2: written with three bookies down.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }
        logger.info("Ledger metadata after killed bookies : {}", (Object) ledger.getLedgerMetadata());
        Assert.assertEquals("There should be ensemble change if ack quorum is broken.",
                2L, ledger.getLedgerMetadata().getAllEnsembles().size());

        // Restart the killed bookies.
        startAndAddBookie(firstKilled);
        startAndAddBookie(secondKilled);
        startAndAddBookie(thirdKilled);

        // Batch 3: written after the restart.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }
        Assert.assertEquals("There should be no ensemble change after adding failed bookies back.",
                2L, ledger.getLedgerMetadata().getAllEnsembles().size());

        verifyEntries(ledger, 0L, batchSize, 5L, 0L);
        verifyEntries(ledger, batchSize, 2 * batchSize, 3L, 2L);
        verifyEntries(ledger, 2 * batchSize, 3 * batchSize, 5L, 0L);
    }

    /**
     * Starts five extra bookies, kills all five bookies of the ledger's current ensemble and
     * keeps writing. The ledger must have two ensembles after the kills and still two after
     * the killed bookies are restarted. The middle batch is only checked against bounds
     * (at most five successes, no failures) via {@code verifyEntriesRange}.
     */
    @Test
    public void testEnsembleChangeWithMoreBookieFailures() throws Exception {
        final int spareBookies = 5;
        final int batchSize = 10;

        int started = 0;
        while (started < spareBookies) {
            startNewBookie();
            started++;
        }

        LedgerHandle ledger = bkc.createLedger(5, 5, 3, digestType, testPasswd);
        final byte[] payload = "foobar".getBytes();

        // Batch 1: entries 0 .. batchSize-1.
        for (int entry = 0; entry < batchSize; entry++) {
            logger.info("Add entry {}", entry);
            ledger.addEntry(payload);
        }

        // Note: this message is logged before the bookies are actually killed.
        logger.info("Killed 5 bookies and add {} more entries : {}", batchSize, ledger.getLedgerMetadata());
        List<ServerConfiguration> killedConfs = new ArrayList<ServerConfiguration>(5);
        for (int slot = 0; slot < 5; slot++) {
            // The current ensemble is re-read on every iteration, as in the original flow.
            killedConfs.add(killBookie(ledger.getCurrentEnsemble().get(slot)));
        }

        // Batch 2: entries batchSize .. 2*batchSize-1.
        for (int entry = batchSize; entry < 2 * batchSize; entry++) {
            logger.info("Add entry {}", entry);
            ledger.addEntry(payload);
        }
        logger.info("Ledger metadata after killed bookies : {}", (Object) ledger.getLedgerMetadata());
        Assert.assertEquals("There should be ensemble change if breaking ack quorum.",
                2L, ledger.getLedgerMetadata().getAllEnsembles().size());

        // Restart every killed bookie.
        for (ServerConfiguration killedConf : killedConfs) {
            startAndAddBookie(killedConf);
        }

        // Batch 3: entries 2*batchSize .. 3*batchSize-1.
        for (int entry = 2 * batchSize; entry < 3 * batchSize; entry++) {
            logger.info("Add entry {}", entry);
            ledger.addEntry(payload);
        }
        Assert.assertEquals("There should not be ensemble changed if delaying ensemble change is enabled.",
                2L, ledger.getLedgerMetadata().getAllEnsembles().size());

        verifyEntries(ledger, 0L, batchSize, 5L, 0L);
        verifyEntriesRange(ledger, batchSize, 2 * batchSize, 5L, 0L);
        verifyEntries(ledger, 2 * batchSize, 3 * batchSize, 5L, 0L);
    }

    /**
     * Writes a batch to a {@code createLedger(3, 3, 2, ...)} ledger, switches the first
     * bookie of the current ensemble to read-only, writes another batch, and expects the
     * ledger to have two ensembles afterwards.
     */
    @Test
    public void testChangeEnsembleIfBookieReadOnly() throws Exception {
        final int batchSize = 10;
        LedgerHandle ledger = bkc.createLedger(3, 3, 2, digestType, testPasswd);
        final byte[] payload = "foobar".getBytes();

        // Batch 1: all bookies writable.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }

        setBookieToReadOnly(ledger.getCurrentEnsemble().get(0));

        // Batch 2: one ensemble member is now read-only.
        for (int written = 0; written < batchSize; written++) {
            ledger.addEntry(payload);
        }

        Assert.assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.",
                2L, ledger.getLedgerMetadata().getAllEnsembles().size());
    }

    /**
     * Combines a bookie failure with a read-only bookie on a {@code createLedger(3, 3, 2, ...)}
     * ledger. Killing the first bookie alone must leave the ledger with one ensemble.
     * Switching the second bookie to read-only afterwards must produce a second ensemble of
     * three bookies that contains neither the killed nor the read-only bookie.
     */
    @Test
    public void testChangeEnsembleSecondBookieReadOnly() throws Exception {
        LedgerHandle lh = bkc.createLedger(3, 3, 2, digestType, testPasswd);
        byte[] data = "foobar".getBytes();
        int numEntries = 10;
        for (int i = 0; i < numEntries; ++i) {
            lh.addEntry(data);
        }

        // Capture both bookies up front; the ensemble is expected to change later on.
        BookieId failedBookie = lh.getCurrentEnsemble().get(0);
        BookieId readOnlyBookie = lh.getCurrentEnsemble().get(1);

        // The returned configuration is not needed: this bookie is never restarted.
        killBookie(failedBookie);
        for (int i = 0; i < numEntries; ++i) {
            lh.addEntry(data);
        }
        // Exactly one ensemble is asserted, so the message must say "no ensemble change".
        Assert.assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                1L, lh.getLedgerMetadata().getAllEnsembles().size());

        setBookieToReadOnly(readOnlyBookie);
        for (int i = 0; i < numEntries; ++i) {
            lh.addEntry(data);
        }
        Assert.assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.",
                2L, lh.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertEquals(3L, lh.getCurrentEnsemble().size());
        Assert.assertFalse(lh.getCurrentEnsemble().contains(failedBookie));
        Assert.assertFalse(lh.getCurrentEnsemble().contains(readOnlyBookie));
    }

    private static class VerificationCallback
    implements BookkeeperInternalCallbacks.ReadEntryCallback {
        final CountDownLatch latch;
        final AtomicLong numSuccess;
        final AtomicLong numMissing;
        final AtomicLong numFailure;

        VerificationCallback(int numRequests) {
            this.latch = new CountDownLatch(numRequests);
            this.numSuccess = new AtomicLong(0L);
            this.numMissing = new AtomicLong(0L);
            this.numFailure = new AtomicLong(0L);
        }

        public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
            if (rc == 0) {
                this.numSuccess.incrementAndGet();
            } else if (rc == -13 || rc == -7) {
                logger.error("Missed entry({}, {}) from host {}.", new Object[]{ledgerId, entryId, ctx});
                this.numMissing.incrementAndGet();
            } else {
                logger.error("Failed to get entry({}, {}) from host {} : {}", new Object[]{ledgerId, entryId, ctx, rc});
                this.numFailure.incrementAndGet();
            }
            this.latch.countDown();
        }
    }
}

