/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.ClientContext;
import org.apache.bookkeeper.client.ClientInternalConf;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PendingReadOp;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestParallelRead
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TestParallelRead.class);
    final BookKeeper.DigestType digestType;
    final byte[] passwd = "parallel-read".getBytes();

    public TestParallelRead() {
        super(6);
        this.digestType = BookKeeper.DigestType.CRC32;
    }

    long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntries) throws Exception {
        LedgerHandle lh = this.bkc.createLedger(ensemble, writeQuorum, ackQuorum, this.digestType, this.passwd);
        for (int i = 0; i < numEntries; ++i) {
            lh.addEntry(("" + i).getBytes());
        }
        lh.close();
        return lh.getId();
    }

    PendingReadOp createReadOp(LedgerHandle lh, long from, long to) {
        return new PendingReadOp(lh, this.bkc.getClientCtx(), from, to, false);
    }

    PendingReadOp createRecoveryReadOp(LedgerHandle lh, long from, long to) {
        return new PendingReadOp(lh, this.bkc.getClientCtx(), from, to, true);
    }

    @Test
    public void testNormalParallelRead() throws Exception {
        LedgerEntry entry;
        int numEntries = 10;
        long id = this.getLedgerToRead(5, 2, 2, numEntries);
        LedgerHandle lh = this.bkc.openLedger(id, this.digestType, this.passwd);
        for (int i = 0; i < numEntries; ++i) {
            PendingReadOp readOp = this.createReadOp(lh, i, i);
            readOp.parallelRead(true).submit();
            Iterator entries = ((LedgerEntries)readOp.future().get()).iterator();
            Assert.assertTrue((boolean)entries.hasNext());
            entry = (LedgerEntry)entries.next();
            Assert.assertNotNull((Object)entry);
            Assert.assertEquals((long)i, (long)Integer.parseInt(new String(entry.getEntryBytes())));
            entry.close();
            Assert.assertFalse((boolean)entries.hasNext());
        }
        PendingReadOp readOp = this.createReadOp(lh, 0L, numEntries - 1);
        readOp.parallelRead(true).submit();
        Iterator iterator = ((LedgerEntries)readOp.future().get()).iterator();
        int numReads = 0;
        while (iterator.hasNext()) {
            entry = (LedgerEntry)iterator.next();
            Assert.assertNotNull((Object)entry);
            Assert.assertEquals((long)numReads, (long)Integer.parseInt(new String(entry.getEntryBytes())));
            entry.close();
            ++numReads;
        }
        Assert.assertEquals((long)numEntries, (long)numReads);
        lh.close();
    }

    private static <T> void expectFail(CompletableFuture<T> future, int expectedRc) {
        try {
            FutureUtils.result(future);
            Assert.fail((String)"Expect to fail");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof BKException));
            BKException bke = (BKException)((Object)e);
            Assert.assertEquals((long)expectedRc, (long)bke.getCode());
        }
    }

    @Test
    public void testParallelReadMissingEntries() throws Exception {
        int numEntries = 10;
        long id = this.getLedgerToRead(5, 2, 2, numEntries);
        LedgerHandle lh = this.bkc.openLedger(id, this.digestType, this.passwd);
        PendingReadOp readOp = this.createReadOp(lh, 11L, 11L);
        readOp.parallelRead(true).submit();
        TestParallelRead.expectFail(readOp.future(), -13);
        readOp = this.createReadOp(lh, 8L, 11L);
        readOp.parallelRead(true).submit();
        TestParallelRead.expectFail(readOp.future(), -13);
        lh.close();
    }

    @Test
    public void testFailParallelRecoveryReadMissingEntryImmediately() throws Exception {
        int numEntries = 1;
        long id = this.getLedgerToRead(5, 5, 3, numEntries);
        ClientConfiguration newConf = new ClientConfiguration().setReadEntryTimeout(30000);
        newConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper newBk = new BookKeeper(newConf);
        LedgerHandle lh = this.bkc.openLedger(id, this.digestType, this.passwd);
        List ensemble = lh.getLedgerMetadata().getEnsembleAt(10L);
        CountDownLatch latch1 = new CountDownLatch(1);
        CountDownLatch latch2 = new CountDownLatch(1);
        this.sleepBookie((BookieId)ensemble.get(0), latch1);
        this.sleepBookie((BookieId)ensemble.get(1), latch2);
        PendingReadOp readOp = this.createRecoveryReadOp(lh, 10L, 10L);
        readOp.parallelRead(true).submit();
        TestParallelRead.expectFail(readOp.future(), -13);
        latch1.countDown();
        latch2.countDown();
        lh.close();
        newBk.close();
    }

    @Test
    public void testParallelReadWithFailedBookies() throws Exception {
        int numEntries = 10;
        long id = this.getLedgerToRead(5, 3, 3, numEntries);
        ClientConfiguration newConf = new ClientConfiguration().setReadEntryTimeout(30000);
        newConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper newBk = new BookKeeper(newConf);
        LedgerHandle lh = this.bkc.openLedger(id, this.digestType, this.passwd);
        List ensemble = lh.getLedgerMetadata().getEnsembleAt(5L);
        this.killBookie((BookieId)ensemble.get(0));
        this.killBookie((BookieId)ensemble.get(1));
        PendingReadOp readOp = this.createReadOp(lh, 0L, numEntries - 1);
        readOp.parallelRead(true).submit();
        Iterator entries = ((LedgerEntries)readOp.future().get()).iterator();
        int numReads = 0;
        while (entries.hasNext()) {
            LedgerEntry entry = (LedgerEntry)entries.next();
            Assert.assertNotNull((Object)entry);
            Assert.assertEquals((long)numReads, (long)Integer.parseInt(new String(entry.getEntryBytes())));
            ++numReads;
        }
        Assert.assertEquals((long)numEntries, (long)numReads);
        lh.close();
        newBk.close();
    }

    @Test
    public void testParallelReadFailureWithFailedBookies() throws Exception {
        int numEntries = 10;
        long id = this.getLedgerToRead(5, 3, 3, numEntries);
        ClientConfiguration newConf = new ClientConfiguration().setReadEntryTimeout(30000);
        newConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper newBk = new BookKeeper(newConf);
        LedgerHandle lh = this.bkc.openLedger(id, this.digestType, this.passwd);
        List ensemble = lh.getLedgerMetadata().getEnsembleAt(5L);
        this.killBookie((BookieId)ensemble.get(0));
        this.killBookie((BookieId)ensemble.get(1));
        this.killBookie((BookieId)ensemble.get(2));
        PendingReadOp readOp = this.createReadOp(lh, 0L, numEntries - 1);
        readOp.parallelRead(true).submit();
        TestParallelRead.expectFail(readOp.future(), -8);
        lh.close();
        newBk.close();
    }

    @Test
    public void testLedgerEntryRequestComplete() throws Exception {
        LedgerHandle lh = (LedgerHandle)Mockito.mock(LedgerHandle.class);
        LedgerMetadata ledgerMetadata = (LedgerMetadata)Mockito.mock(LedgerMetadata.class);
        ClientContext clientContext = (ClientContext)Mockito.mock(ClientContext.class);
        ClientInternalConf clientInternalConf = (ClientInternalConf)Mockito.mock(ClientInternalConf.class);
        ((ClientContext)Mockito.doReturn((Object)clientInternalConf).when((Object)clientContext)).getConf();
        BookKeeperClientStats bookKeeperClientStats = (BookKeeperClientStats)Mockito.mock(BookKeeperClientStats.class);
        ((ClientContext)Mockito.doReturn((Object)bookKeeperClientStats).when((Object)clientContext)).getClientStats();
        OpStatsLogger opStatsLogger = (OpStatsLogger)Mockito.mock(OpStatsLogger.class);
        ((BookKeeperClientStats)Mockito.doReturn((Object)opStatsLogger).when((Object)bookKeeperClientStats)).getReadOpLogger();
        ((LedgerHandle)Mockito.doReturn((Object)ledgerMetadata).when((Object)lh)).getLedgerMetadata();
        ((LedgerMetadata)Mockito.doReturn((Object)2).when((Object)ledgerMetadata)).getWriteQuorumSize();
        ((LedgerMetadata)Mockito.doReturn((Object)1).when((Object)ledgerMetadata)).getAckQuorumSize();
        ((LedgerMetadata)Mockito.doReturn(new TreeMap()).when((Object)ledgerMetadata)).getAllEnsembles();
        DistributionSchedule.WriteSet writeSet = (DistributionSchedule.WriteSet)Mockito.mock(DistributionSchedule.WriteSet.class);
        ((LedgerHandle)Mockito.doReturn((Object)writeSet).when((Object)lh)).getWriteSetForReadOperation(ArgumentMatchers.anyLong());
        PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1L, 2L, false);
        pendingReadOp.parallelRead(true);
        pendingReadOp.initiate();
        PendingReadOp.LedgerEntryRequest first = (PendingReadOp.LedgerEntryRequest)pendingReadOp.seq.get(0);
        PendingReadOp.LedgerEntryRequest second = (PendingReadOp.LedgerEntryRequest)pendingReadOp.seq.get(1);
        pendingReadOp.submitCallback(-105);
        Assert.assertEquals((long)-1L, (long)first.entryImpl.getEntryId());
        Assert.assertEquals((long)-1L, (long)first.entryImpl.getLedgerId());
        Assert.assertEquals((long)-1L, (long)first.entryImpl.getLength());
        Assert.assertNull((Object)first.entryImpl.getEntryBuffer());
        Assert.assertTrue((boolean)first.complete.get());
        Assert.assertEquals((long)-1L, (long)second.entryImpl.getEntryId());
        Assert.assertEquals((long)-1L, (long)second.entryImpl.getLedgerId());
        Assert.assertEquals((long)-1L, (long)second.entryImpl.getLength());
        Assert.assertNull((Object)second.entryImpl.getEntryBuffer());
        Assert.assertTrue((boolean)second.complete.get());
        Method method = PendingReadOp.class.getDeclaredMethod("createReadContext", Integer.TYPE, BookieId.class, PendingReadOp.LedgerEntryRequest.class);
        method.setAccessible(true);
        ByteBuf byteBuf = Unpooled.buffer((int)10);
        pendingReadOp.readEntryComplete(0, 1L, 1L, Unpooled.buffer((int)10), method.invoke((Object)pendingReadOp, 1, BookieId.parse((String)"test"), first));
        Assert.assertEquals((long)byteBuf.refCnt(), (long)1L);
        Assert.assertNull((Object)first.entryImpl.getEntryBuffer());
        Assert.assertTrue((boolean)first.complete.get());
        pendingReadOp = new PendingReadOp(lh, clientContext, 1L, 2L, false);
        pendingReadOp.parallelRead(true);
        pendingReadOp.initiate();
        pendingReadOp.readEntryComplete(-105, 1L, 1L, Unpooled.buffer((int)10), method.invoke((Object)pendingReadOp, 1, BookieId.parse((String)"test"), first));
        pendingReadOp.readEntryComplete(-105, 1L, 1L, Unpooled.buffer((int)10), method.invoke((Object)pendingReadOp, 1, BookieId.parse((String)"test"), first));
        byteBuf = Unpooled.buffer((int)10);
        pendingReadOp.readEntryComplete(0, 1L, 1L, Unpooled.buffer((int)10), method.invoke((Object)pendingReadOp, 1, BookieId.parse((String)"test"), first));
        Assert.assertEquals((long)1L, (long)byteBuf.refCnt());
    }
}

