package org.apache.bookkeeper.test;

import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.shaded.com.google.common.base.Charsets;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/test/MultipleThreadReadTest.class */
public class MultipleThreadReadTest extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MultipleThreadReadTest.class);
    BookKeeper.DigestType digestType;
    byte[] ledgerPassword;
    private int entriesPerLedger;
    final SyncObj mainSyncObj;
    BookKeeperTestClient readBkc;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/test/MultipleThreadReadTest$SyncObj.class */
    public class SyncObj {
        volatile int counter = 0;
        boolean failed = false;

        public SyncObj() {
        }
    }

    public MultipleThreadReadTest() {
        super(6);
        this.ledgerPassword = "aaa".getBytes();
        this.entriesPerLedger = 100;
        this.mainSyncObj = new SyncObj();
        this.digestType = BookKeeper.DigestType.CRC32;
        this.baseClientConf.setAddEntryTimeout(20);
    }

    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    public void setUp() throws Exception {
        super.setUp();
        this.readBkc = new BookKeeperTestClient(this.baseClientConf);
    }

    private Thread getWriterThread(final int i, final LedgerHandle ledgerHandle, final AtomicBoolean atomicBoolean) {
        Thread thread = new Thread(new Runnable() { // from class: org.apache.bookkeeper.test.MultipleThreadReadTest.1
            @Override // java.lang.Runnable
            public void run() {
                SyncObj syncObj = new SyncObj();
                for (int i2 = 0; i2 < MultipleThreadReadTest.this.entriesPerLedger; i2++) {
                    final byte[] bytes = ("Entry-" + i + "-" + i2).getBytes();
                    ledgerHandle.asyncAddEntry(bytes, new AsyncCallback.AddCallback() { // from class: org.apache.bookkeeper.test.MultipleThreadReadTest.1.1
                        public void addComplete(int i3, LedgerHandle ledgerHandle2, long j, Object obj) {
                            SyncObj syncObj2 = (SyncObj) obj;
                            synchronized (syncObj2) {
                                if (i3 != 0) {
                                    MultipleThreadReadTest.LOG.error("Add entry {} failed : rc = {}", new String(bytes, Charsets.UTF_8), Integer.valueOf(i3));
                                    syncObj2.failed = true;
                                    syncObj2.notify();
                                } else {
                                    syncObj2.counter++;
                                    syncObj2.notify();
                                }
                            }
                        }
                    }, syncObj);
                }
                synchronized (syncObj) {
                    while (!syncObj.failed && syncObj.counter < MultipleThreadReadTest.this.entriesPerLedger) {
                        try {
                            syncObj.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    atomicBoolean.set(!syncObj.failed);
                }
                try {
                    ledgerHandle.close();
                } catch (BKException e2) {
                    MultipleThreadReadTest.LOG.error("Error on closing ledger handle {} : ", Long.valueOf(ledgerHandle.getId()), e2);
                } catch (InterruptedException e3) {
                    MultipleThreadReadTest.LOG.error("Interrupted on closing ledger handle {} : ", Long.valueOf(ledgerHandle.getId()), e3);
                    Thread.currentThread().interrupt();
                }
            }
        }, "WriteThread(Lid=" + ledgerHandle.getId() + ")");
        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.bookkeeper.test.MultipleThreadReadTest.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread2, Throwable th) {
                synchronized (MultipleThreadReadTest.this.mainSyncObj) {
                    MultipleThreadReadTest.this.mainSyncObj.failed = true;
                }
            }
        });
        return thread;
    }

    private Thread getReaderThread(int i, final LedgerHandle ledgerHandle, final int i2, final AtomicBoolean atomicBoolean) {
        Thread thread = new Thread(new Runnable() { // from class: org.apache.bookkeeper.test.MultipleThreadReadTest.3
            @Override // java.lang.Runnable
            public void run() {
                long j = 0;
                long j2 = 0;
                while (j <= MultipleThreadReadTest.this.entriesPerLedger - 1) {
                    long min = Math.min((j + 10) - 1, MultipleThreadReadTest.this.entriesPerLedger - 1);
                    long j3 = (min - j) + 1;
                    boolean z = true;
                    try {
                        long j4 = min;
                        Enumeration readEntries = ledgerHandle.readEntries(j, j4);
                        int i3 = 0;
                        while (true) {
                            if (i3 >= j3) {
                                break;
                            }
                            try {
                                LedgerEntry ledgerEntry = (LedgerEntry) readEntries.nextElement();
                                long j5 = j2;
                                long j6 = j4;
                                j4 = 1;
                                j2 = j6 + 1;
                                if (ledgerEntry.getEntryId() != j5) {
                                    MultipleThreadReadTest.LOG.error("Expected entry id {} for ledger {} but {} found.", new Object[]{Long.valueOf(j5), Long.valueOf(ledgerHandle.getId()), Long.valueOf(ledgerEntry.getEntryId())});
                                    z = false;
                                    break;
                                }
                                byte[] entry = ledgerEntry.getEntry();
                                if (!Arrays.equals(("Entry-" + i2 + "-" + ledgerEntry.getEntryId()).getBytes(), entry)) {
                                    MultipleThreadReadTest.LOG.error("Expected entry data 'Entry-{}-{}' but {} found for ledger {}.", new Object[]{Integer.valueOf(i2), Long.valueOf(ledgerEntry.getEntryId()), new String(entry, Charsets.UTF_8), Long.valueOf(ledgerHandle.getId())});
                                    z = false;
                                    break;
                                }
                                i3++;
                            } catch (NoSuchElementException e) {
                                z = false;
                            }
                        }
                        if (z) {
                            z = !readEntries.hasMoreElements();
                            if (!z) {
                                MultipleThreadReadTest.LOG.error("Found more entries returned on reading ({}-{}) from ledger {}.", new Object[]{Long.valueOf(j), Long.valueOf(min), Long.valueOf(ledgerHandle.getId())});
                            }
                        }
                    } catch (BKException e2) {
                        MultipleThreadReadTest.LOG.error("Failed on reading entries ({} - {}) from ledger {} : ", new Object[]{Long.valueOf(j), Long.valueOf(min), Long.valueOf(ledgerHandle.getId()), e2});
                        z = false;
                    } catch (InterruptedException e3) {
                        MultipleThreadReadTest.LOG.error("Interrupted on reading entries ({} - {}) from ledger {} : ", new Object[]{Long.valueOf(j), Long.valueOf(min), Long.valueOf(ledgerHandle.getId()), e3});
                        Thread.currentThread().interrupt();
                        z = false;
                    }
                    atomicBoolean.set(z);
                    if (!z) {
                        return;
                    } else {
                        j = min + 1;
                    }
                }
            }
        }, "ReadThread(Tid =" + i + ", Lid=" + ledgerHandle.getId() + ")");
        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.bookkeeper.test.MultipleThreadReadTest.4
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread2, Throwable th) {
                MultipleThreadReadTest.LOG.error("Uncaught exception in thread {} : ", thread2.getName(), th);
                synchronized (MultipleThreadReadTest.this.mainSyncObj) {
                    MultipleThreadReadTest.this.mainSyncObj.failed = true;
                }
            }
        });
        return thread;
    }

    public void multiLedgerMultiThreadRead(int i, int i2) throws IOException {
        Assert.assertTrue(i != 0 && i2 >= i && i2 % i == 0);
        try {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            ArrayList arrayList4 = new ArrayList();
            for (int i3 = 0; i3 < i; i3++) {
                LedgerHandle createLedger = this.bkc.createLedger(this.digestType, this.ledgerPassword);
                arrayList.add(createLedger);
                arrayList2.add(Long.valueOf(createLedger.getId()));
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                arrayList4.add(atomicBoolean);
                Thread writerThread = getWriterThread(i3, (LedgerHandle) arrayList.get(i3), atomicBoolean);
                arrayList3.add(writerThread);
                writerThread.start();
            }
            Iterator it = arrayList3.iterator();
            while (it.hasNext()) {
                ((Thread) it.next()).join();
            }
            synchronized (this.mainSyncObj) {
                if (this.mainSyncObj.failed) {
                    Assert.fail("Test failed because we encountered uncaught exception on adding entries.");
                }
            }
            for (int i4 = 0; i4 < i; i4++) {
                Assert.assertTrue("Failed on adding entries for ledger " + ((LedgerHandle) arrayList.get(i4)).getId(), ((AtomicBoolean) arrayList4.get(i4)).get());
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                try {
                    ((LedgerHandle) it2.next()).close();
                } catch (Exception e) {
                    Assert.fail("Error while closing handle.");
                } catch (BKException.BKLedgerClosedException e2) {
                }
            }
            this.mainSyncObj.failed = false;
            arrayList3.clear();
            ArrayList arrayList5 = new ArrayList();
            for (int i5 = 0; i5 < i2; i5++) {
                AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
                Thread readerThread = getReaderThread(i5, this.readBkc.openLedger(((Long) arrayList2.get(i5 % i)).longValue(), this.digestType, this.ledgerPassword), i5 % i, atomicBoolean2);
                arrayList3.add(readerThread);
                arrayList5.add(atomicBoolean2);
                readerThread.start();
            }
            Iterator it3 = arrayList3.iterator();
            while (it3.hasNext()) {
                ((Thread) it3.next()).join();
            }
            synchronized (this.mainSyncObj) {
                if (this.mainSyncObj.failed) {
                    Assert.fail("Test failed because we encountered uncaught exception on reading entries");
                }
            }
            Iterator it4 = arrayList5.iterator();
            while (it4.hasNext()) {
                Assert.assertTrue("Failed on read entries", ((AtomicBoolean) it4.next()).get());
            }
        } catch (InterruptedException e3) {
            Thread.currentThread().interrupt();
            LOG.error("Test failed", e3);
            Assert.fail("Test failed due to interruption");
        } catch (BKException e4) {
            LOG.error("Test failed", e4);
            Assert.fail("Test failed due to BookKeeper exception");
        }
    }

    @Test
    public void test10Ledgers20ThreadsRead() throws IOException {
        multiLedgerMultiThreadRead(10, 20);
    }

    @Test
    public void test10Ledgers200ThreadsRead() throws IOException {
        multiLedgerMultiThreadRead(10, 200);
    }

    @Test
    public void test1Ledger20ThreadsRead() throws IOException {
        multiLedgerMultiThreadRead(1, 20);
    }

    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    public void tearDown() throws Exception {
        this.readBkc.close();
        super.tearDown();
    }
}
