/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ManagedCursorConcurrencyTest
extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(ManagedCursorConcurrencyTest.class);
    private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback(){

        public void deleteComplete(Object ctx) {
            log.info("Deleted message at {}", ctx);
        }

        public void deleteFailed(ManagedLedgerException exception, Object ctx) {
            log.error("Failed to delete message at {}", ctx, (Object)exception);
        }
    };

    @DataProvider(name="useOpenRangeSet")
    public static Object[][] useOpenRangeSet() {
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    @Test(dataProvider="useOpenRangeSet")
    public void testMarkDeleteAndRead(boolean useOpenRangeSet) throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(2).setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        final ManagedCursor cursor = ledger.openCursor("c1");
        final ArrayList addedEntries = Lists.newArrayList();
        for (int i = 0; i < 1000; ++i) {
            Position pos = ledger.addEntry("entry".getBytes());
            addedEntries.add(pos);
        }
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        Thread deleter = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    for (Position position : addedEntries) {
                        cursor.markDelete(position);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        Thread reader = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    for (int i = 0; i < 1000; ++i) {
                        cursor.readEntries(1).forEach(e -> e.release());
                    }
                }
                catch (Exception e2) {
                    e2.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        deleter.start();
        reader.start();
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
    }

    @Test
    public void testCloseAndRead() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger_test_close_and_read", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        final ManagedCursor cursor = ledger.openCursor("c1");
        final CompletableFuture closeFuture = new CompletableFuture();
        String CLOSED = "closed";
        final ArrayList addedEntries = Lists.newArrayList();
        for (int i = 0; i < 1000; ++i) {
            Position pos = ledger.addEntry("entry".getBytes());
            addedEntries.add(pos);
        }
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        Thread deleter = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    for (Position position : addedEntries) {
                        cursor.markDelete(position);
                        Thread.sleep(1L);
                    }
                }
                catch (ManagedLedgerException e) {
                    if (!e.getMessage().equals("Cursor was already closed")) {
                        gotException.set(true);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        Thread reader = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    for (int i = 0; i < 1000; ++i) {
                        cursor.readEntries(1).forEach(e -> e.release());
                        Thread.sleep(2L, 195);
                    }
                    cursor.asyncClose(new AsyncCallbacks.CloseCallback(){

                        public void closeComplete(Object ctx) {
                            log.info("Successfully closed cursor ledger");
                            closeFuture.complete("closed");
                        }

                        public void closeFailed(ManagedLedgerException exception, Object ctx) {
                            log.error("Error closing cursor: ", (Throwable)exception);
                            closeFuture.completeExceptionally(new Exception(exception));
                        }
                    }, null);
                }
                catch (Exception e2) {
                    e2.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        deleter.start();
        reader.start();
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
        Assert.assertEquals((String)((String)closeFuture.get()), (String)"closed");
    }

    @Test(timeOut=30000L)
    public void testAckAndClose() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger_test_ack_and_close", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor cursor = ledger.openCursor("c1");
        ArrayList addedEntries = Lists.newArrayList();
        for (int i = 0; i < 1000; ++i) {
            Position pos = ledger.addEntry("entry".getBytes());
            addedEntries.add(pos);
        }
        CyclicBarrier barrier = new CyclicBarrier(2);
        CountDownLatch counter = new CountDownLatch(2);
        AtomicBoolean gotException = new AtomicBoolean(false);
        this.cachedExecutor.execute(() -> {
            try {
                barrier.await();
                for (Position position : addedEntries) {
                    cursor.asyncDelete(position, this.deleteCallback, (Object)position);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                gotException.set(true);
            }
            finally {
                counter.countDown();
            }
        });
        this.cachedExecutor.execute(() -> {
            try {
                barrier.await();
                for (int i = 0; i < 1000; ++i) {
                    cursor.readEntries(1).forEach(e -> e.release());
                }
            }
            catch (Exception e2) {
                e2.printStackTrace();
                gotException.set(true);
            }
            finally {
                counter.countDown();
            }
        });
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
    }

    @Test(timeOut=30000L)
    public void testConcurrentIndividualDeletes() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(100));
        ManagedCursor cursor = ledger.openCursor("c1");
        int N = 1000;
        ArrayList addedEntries = Lists.newArrayListWithExpectedSize((int)1000);
        for (int i = 0; i < 1000; ++i) {
            Position pos = ledger.addEntry("entry".getBytes());
            addedEntries.add(pos);
        }
        int Threads = 10;
        CyclicBarrier barrier = new CyclicBarrier(10);
        CountDownLatch counter = new CountDownLatch(10);
        AtomicBoolean gotException = new AtomicBoolean(false);
        int thread = 0;
        while (thread < 10) {
            int myThread = thread++;
            this.cachedExecutor.execute(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < 1000; ++i) {
                        int threadId = i % 10;
                        if (threadId != myThread) continue;
                        cursor.delete((Position)addedEntries.get(i));
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            });
        }
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), addedEntries.get(addedEntries.size() - 1));
    }

    @Test(timeOut=30000L)
    public void testConcurrentReadOfSameEntry() throws Exception {
        ManagedLedger ledger = this.factory.open("testConcurrentReadOfSameEntry", new ManagedLedgerConfig());
        int numCursors = 5;
        ArrayList cursors = Lists.newArrayList();
        for (int i = 0; i < 5; ++i) {
            ManagedCursor cursor = ledger.openCursor("c" + i);
            cursors.add(cursor);
        }
        int N = 100;
        for (int i = 0; i < 100; ++i) {
            ledger.addEntry(("entry" + i).getBytes());
        }
        long currentLedger = ((PositionImpl)((ManagedCursor)cursors.get(0)).getMarkDeletedPosition()).getLedgerId();
        ((ManagedLedgerImpl)ledger).entryCache.invalidateAllEntries(currentLedger);
        CyclicBarrier barrier = new CyclicBarrier(5);
        CountDownLatch counter = new CountDownLatch(5);
        AtomicReference result = new AtomicReference();
        int i = 0;
        while (i < 5) {
            int cursorIndex = i++;
            ManagedCursor cursor = (ManagedCursor)cursors.get(cursorIndex);
            this.cachedExecutor.execute(() -> {
                try {
                    barrier.await();
                    for (int j = 0; j < 100; ++j) {
                        String data;
                        String expected = "entry" + j;
                        if (expected.equals(data = new String(((Entry)cursor.readEntries(1).get(0)).getDataAndRelease())) || result.get() != null) continue;
                        result.set("Mismatched entry in cursor " + (cursorIndex + 1) + " at position " + (j + 1) + "--- Expected: " + expected + ", Actual: " + data);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    counter.countDown();
                }
            });
        }
        counter.await();
        Assert.assertNull(result.get());
    }

    @Test(timeOut=30000L)
    public void testConcurrentIndividualDeletesWithGetNthEntry() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(100).setThrottleMarkDelete(0.5));
        ManagedCursor cursor = ledger.openCursor("c1");
        int N = 1000;
        ArrayList addedEntries = Lists.newArrayListWithExpectedSize((int)1000);
        for (int i = 0; i < 1000; ++i) {
            Position pos = ledger.addEntry("entry".getBytes());
            addedEntries.add(pos);
        }
        int deleteEntries = 100;
        CountDownLatch counter = new CountDownLatch(100);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        AtomicInteger iteration = new AtomicInteger(0);
        for (int i = 0; i < 100; ++i) {
            this.executor.submit((Runnable)SafeRun.safeRun(() -> {
                try {
                    cursor.asyncDelete((Position)addedEntries.get(iteration.getAndIncrement()), new AsyncCallbacks.DeleteCallback(){

                        public void deleteComplete(Object ctx) {
                        }

                        public void deleteFailed(ManagedLedgerException exception, Object ctx) {
                            exception.printStackTrace();
                            gotException.set(true);
                        }
                    }, null);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }));
        }
        counter.await();
        int readEntries = 900;
        final CountDownLatch readCounter = new CountDownLatch(900);
        final AtomicInteger successReadEntries = new AtomicInteger(0);
        for (int i = 1; i <= 900; ++i) {
            try {
                cursor.asyncGetNthEntry(i, ManagedCursor.IndividualDeletedEntries.Exclude, new AsyncCallbacks.ReadEntryCallback(){

                    public void readEntryComplete(Entry entry, Object ctx) {
                        successReadEntries.getAndIncrement();
                        entry.release();
                        readCounter.countDown();
                    }

                    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                        exception.printStackTrace();
                        gotException.set(true);
                    }
                }, null);
                continue;
            }
            catch (Exception e) {
                e.printStackTrace();
                gotException.set(true);
            }
        }
        readCounter.await();
        Assert.assertFalse((boolean)gotException.get());
        Assert.assertEquals((int)successReadEntries.get(), (int)900);
    }
}

