/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManager;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class LedgerManagerIteratorTest
extends BaseMetadataStoreTest {
    private static final Logger log = LoggerFactory.getLogger(LedgerManagerIteratorTest.class);

    /**
     * Produces a unique ledgers-root path of the form {@code /ledgers-<random UUID>}.
     *
     * @return a path string that differs on every call
     */
    private String newLedgersRoot() {
        String uniqueSuffix = UUID.randomUUID().toString();
        return "/ledgers-" + uniqueSuffix;
    }

    /**
     * Removes the metadata of one ledger, regardless of its current version, and blocks until done.
     *
     * @param lm       ledger manager to issue the removal on
     * @param ledgerId id of the ledger to remove; must not be {@code null}
     * @throws Exception if the removal future completes exceptionally or the wait is interrupted
     */
    void removeLedger(LedgerManager lm, Long ledgerId) throws Exception {
        final long id = ledgerId;
        lm.removeLedgerMetadata(id, Version.ANY).get();
    }

    /**
     * Synchronous counterpart of {@link #createLedgerAsync}: creates the ledger metadata and waits for it.
     *
     * @param lm       ledger manager to create the ledger on
     * @param ledgerId id of the ledger to create; must not be {@code null}
     * @throws Exception if the creation future completes exceptionally or the wait is interrupted
     */
    void createLedger(LedgerManager lm, Long ledgerId) throws Exception {
        CompletableFuture<Versioned<LedgerMetadata>> pending = createLedgerAsync(lm, ledgerId);
        pending.get();
    }

    /**
     * Starts creation of ledger metadata for {@code ledgerId} and returns the pending result.
     *
     * <p>The metadata always uses the same fixed three-bookie ensemble (192.0.2.1, 192.0.2.2 and
     * 192.0.2.3, port 1234), ensemble size 3, write quorum 3, ack quorum 2, CRC32 digest and the
     * password {@code "passwd"}.
     *
     * @param lm       ledger manager to create the ledger on
     * @param ledgerId id of the ledger to create
     * @return the future returned by {@code lm.createLedgerMetadata}
     */
    CompletableFuture<Versioned<LedgerMetadata>> createLedgerAsync(LedgerManager lm, long ledgerId) {
        List<BookieId> ensemble = Lists.newArrayList(
                new BookieSocketAddress("192.0.2.1", 1234).toBookieId(),
                new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
                new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
        LedgerMetadata meta = LedgerMetadataBuilder.create()
                .withId(ledgerId)
                .withEnsembleSize(3)
                .withWriteQuorumSize(3)
                .withAckQuorumSize(2)
                // Explicit charset so the password bytes do not depend on the platform default.
                .withPassword("passwd".getBytes(StandardCharsets.UTF_8))
                .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType())
                .newEnsembleEntry(0L, ensemble)
                .build();
        return lm.createLedgerMetadata(ledgerId, meta);
    }

    /**
     * Drains a ledger-range iterator and gathers every ledger id it reports into one sorted set.
     *
     * <p>While draining, asserts that no range is empty and that each range starts strictly after
     * the end of the previous one.
     *
     * @param lri iterator to consume; it is exhausted when this method returns
     * @return all ledger ids seen, in ascending order
     * @throws IOException if the iterator fails while advancing
     */
    static Set<Long> ledgerRangeToSet(LedgerManager.LedgerRangeIterator lri) throws IOException {
        TreeSet<Long> collected = new TreeSet<>();
        // End of the previously seen range; -1 precedes every non-negative ledger id.
        long previousEnd = -1L;
        while (lri.hasNext()) {
            LedgerManager.LedgerRange range = lri.next();
            Assert.assertFalse("ledger range must not be empty", range.getLedgers().isEmpty());
            Assert.assertTrue("ledger ranges must not overlap", previousEnd < range.start());
            collected.addAll(range.getLedgers());
            previousEnd = range.end();
        }
        return collected;
    }

    /**
     * Collects every ledger id reported by {@code lm.asyncProcessLedgers} and blocks until the
     * final callback fires.
     *
     * <p>Each processed ledger is acknowledged with result code 0, and the final result code is
     * asserted to be 0 as well (presumably BookKeeper's "OK" code; confirm against
     * {@code BKException.Code}).
     *
     * @param lm ledger manager to enumerate
     * @return the set of ledger ids handed to the processor callback
     * @throws InterruptedException if interrupted while waiting for the final callback
     */
    static Set<Long> getLedgerIdsByUsingAsyncProcessLedgers(LedgerManager lm) throws InterruptedException {
        // Concurrent set: the processor callback is not guaranteed to run on the calling thread.
        Set<Long> ledgersReadAsync = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger finalRC = new AtomicInteger();
        lm.asyncProcessLedgers((ledgerId, callback) -> {
            ledgersReadAsync.add(ledgerId);
            callback.processResult(0, null, null);
        }, (rc, s, obj) -> {
            finalRC.set(rc);
            latch.countDown();
        }, null, 0, -1);
        latch.await();
        Assert.assertEquals("Final RC of asyncProcessLedgers", 0, finalRC.get());
        return ledgersReadAsync;
    }

    /**
     * Checks that iterating a fresh ledgers root, in which no ledger was created, runs out of ranges.
     *
     * <p>The store and ledger manager are closed by try-with-resources (manager first, then store),
     * so a failure while closing is attached as suppressed instead of replacing a test failure.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Exception if the store or the ledger manager fails
     */
    @Test(dataProvider = "impl")
    public void testIterateNoLedgers(String provider, Supplier<String> urlSupplier) throws Exception {
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, newLedgersRoot())) {
            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);
            Assert.assertNotNull(lri);
            // Consume at most one range; after that the iterator must be exhausted.
            if (lri.hasNext()) {
                lri.next();
            }
            Assert.assertFalse(lri.hasNext());
        }
    }

    /**
     * Creates one ledger and checks that both the range iterator and {@code asyncProcessLedgers}
     * report exactly that ledger id.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or the ledger manager fails, or an assertion does not hold
     */
    @Test(dataProvider = "impl")
    public void testSingleLedger(String provider, Supplier<String> urlSupplier) throws Throwable {
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, newLedgersRoot())) {
            long id = 2020202L;
            createLedger(lm, id);

            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);
            Assert.assertNotNull(lri);
            Set<Long> lids = ledgerRangeToSet(lri);
            // JUnit's assertEquals takes (expected, actual).
            Assert.assertEquals(1L, lids.size());
            Assert.assertEquals(id, lids.iterator().next().longValue());

            Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
            Assert.assertEquals("Comparing LedgersIds read asynchronously", lids, ledgersReadAsync);
        }
    }

    /**
     * Creates two ledgers with widely separated ids and checks that both the range iterator and
     * {@code asyncProcessLedgers} report exactly those two ids.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or the ledger manager fails, or an assertion does not hold
     */
    @Test(dataProvider = "impl")
    public void testTwoLedgers(String provider, Supplier<String> urlSupplier) throws Throwable {
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, newLedgersRoot())) {
            Set<Long> ids = new TreeSet<>(Arrays.asList(101010101L, 2020340302L));
            for (Long id : ids) {
                createLedger(lm, id);
            }

            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);
            Assert.assertNotNull(lri);
            Set<Long> returnedIds = ledgerRangeToSet(lri);
            Assert.assertEquals(ids, returnedIds);

            Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
            Assert.assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
        }
    }

    /**
     * Creates ledgers with the contiguous ids 0..1999 concurrently and checks that both the range
     * iterator and {@code asyncProcessLedgers} report exactly that id set.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or the ledger manager fails, or an assertion does not hold
     */
    @Test(dataProvider = "impl")
    public void testSeveralContiguousLedgers(String provider, Supplier<String> urlSupplier) throws Throwable {
        final long numLedgers = 2000L;
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, newLedgersRoot())) {
            Set<Long> ids = new TreeSet<>();
            List<CompletableFuture<Versioned<LedgerMetadata>>> futures = new ArrayList<>();
            // Fire all creations without waiting, then wait for the whole batch at once.
            for (long i = 0L; i < numLedgers; i++) {
                futures.add(createLedgerAsync(lm, i));
                ids.add(i);
            }
            FutureUtil.waitForAll(futures).get();

            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);
            Assert.assertNotNull(lri);
            Set<Long> returnedIds = ledgerRangeToSet(lri);
            Assert.assertEquals(ids, returnedIds);

            Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
            Assert.assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
        }
    }

    /**
     * Removes several ledgers while an iteration is in progress and checks that the ledgers which
     * were never removed are still reported.
     *
     * <p>As soon as the range containing the smallest id ({@code first}) has been returned, every
     * id in {@code toRemove} is deleted; iteration then continues on the same iterator. Only the
     * ids in {@code mustHave} are asserted, since the removed ids may or may not be observed.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or the ledger manager fails, or an assertion does not hold
     */
    @Test(dataProvider = "impl")
    public void testRemovalOfNodeJustTraversed(String provider, Supplier<String> urlSupplier) throws Throwable {
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, newLedgersRoot())) {
            Set<Long> toRemove = new TreeSet<>(
                    Arrays.asList(3394498498348983841L, 3394498498348983842L, 3394498498348993841L));
            long first = 2345678901234567890L;
            Set<Long> mustHave = new TreeSet<>(Arrays.asList(first, 6334994393848474732L));

            Set<Long> ids = new TreeSet<>();
            ids.addAll(toRemove);
            ids.addAll(mustHave);
            for (Long id : ids) {
                createLedger(lm, id);
            }

            Set<Long> found = new TreeSet<>();
            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);
            while (lri.hasNext()) {
                LedgerManager.LedgerRange lr = lri.next();
                found.addAll(lr.getLedgers());
                if (lr.getLedgers().contains(first)) {
                    for (Long id : toRemove) {
                        removeLedger(lm, id);
                    }
                    // Emptied so the removals happen only once even if 'first' is seen again.
                    toRemove.clear();
                }
            }

            for (Long id : mustHave) {
                Assert.assertTrue(found.contains(id));
            }
        }
    }

    /**
     * Checks that an extra node stored under the ledgers root, which was not created through the
     * ledger manager, neither adds ledger ids nor produces an empty range.
     *
     * <p>The extra node is written at {@code <root>/633/4994/3938/4948}. Going by the test name it
     * is meant to be a fourth-level path with no ledgers below it (TODO confirm against the ledger
     * manager's path layout).
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or the ledger manager fails, or an assertion does not hold
     */
    @Test(dataProvider = "impl")
    public void validateEmptyL4PathSkipped(String provider, Supplier<String> urlSupplier) throws Throwable {
        String ledgersRoot = newLedgersRoot();
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, ledgersRoot)) {
            Set<Long> ids = new TreeSet<>(Arrays.asList(
                    2345678901234567890L, 3394498498348983841L, 6334994393848474732L, 7349370101927398483L));
            for (Long id : ids) {
                createLedger(lm, id);
            }

            String emptyPath = ledgersRoot + "/633/4994/3938/4948";
            store.put(emptyPath, "data".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();

            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);
            Assert.assertNotNull(lri);
            Set<Long> returnedIds = ledgerRangeToSet(lri);
            Assert.assertEquals(ids, returnedIds);

            Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
            Assert.assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);

            // Second pass with a fresh iterator: count the ranges that carry no ledgers at all.
            LedgerManager.LedgerRangeIterator secondPass = lm.getLedgerRanges(0L);
            int emptyRanges = 0;
            while (secondPass.hasNext()) {
                if (secondPass.next().getLedgers().isEmpty()) {
                    emptyRanges++;
                }
            }
            Assert.assertEquals(0, emptyRanges);
        }
    }

    /**
     * Checks that extra nodes stored under the ledgers root, which were not created through the
     * ledger manager, do not change the set of ledger ids reported by the range iterator or by
     * {@code asyncProcessLedgers}.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or the ledger manager fails, or an assertion does not hold
     */
    @Test(dataProvider = "impl")
    public void testWithSeveralIncompletePaths(String provider, Supplier<String> urlSupplier) throws Throwable {
        String ledgersRoot = newLedgersRoot();
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, ledgersRoot)) {
            Set<Long> ids = new TreeSet<>(Arrays.asList(
                    2345678901234567890L, 3394498498348983841L, 6334994393848474732L, 7349370101927398483L));
            for (Long id : ids) {
                createLedger(lm, id);
            }

            // Every path must sit below ledgersRoot. The first entry previously lacked the "/"
            // separator, which placed it in a sibling of the root that the iterator never visits.
            String[] paths = {
                    ledgersRoot + "/000/0000/0000",
                    ledgersRoot + "/234/5678/9999",
                    ledgersRoot + "/339/0000/0000",
                    ledgersRoot + "/633/4994/3938/0000",
                    ledgersRoot + "/922/3372/0000/0000",
            };
            for (String path : paths) {
                store.put(path, "data".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
            }

            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);
            Assert.assertNotNull(lri);
            Set<Long> returnedIds = ledgerRangeToSet(lri);
            Assert.assertEquals(ids, returnedIds);

            Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
            Assert.assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
        }
    }

    /**
     * Runs writers and checkers concurrently for about two seconds and verifies that a fixed set
     * of pre-created ledgers stays visible throughout.
     *
     * <p>Setup creates {@code numLedgers} ledgers with random non-negative ids ({@code mustExist}).
     * Each writer task repeatedly creates and then removes a ledger whose id is neither in
     * {@code mustExist} nor previously used by any writer. Each checker task repeatedly lists all
     * ledgers, through both the range iterator and {@code asyncProcessLedgers}, and asserts that
     * every id in {@code mustExist} is present. All tasks use their own ledger manager on the
     * shared store and are released together by a latch.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or a ledger manager fails, or any task fails (surfaced via
     *                   {@code Future.get()})
     */
    @Test(dataProvider = "impl")
    public void checkConcurrentModifications(String provider, Supplier<String> urlSupplier) throws Throwable {
        final int numWriters = 10;
        final int numCheckers = 10;
        final int numLedgers = 100;
        final long runtime = TimeUnit.NANOSECONDS.convert(2L, TimeUnit.SECONDS);

        String ledgersRoot = newLedgersRoot();
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, ledgersRoot)) {
            Set<Long> mustExist = new TreeSet<>();
            Random rng = new Random();
            for (int i = 0; i < numLedgers; i++) {
                // Mask the sign bit instead of Math.abs(): Math.abs(Long.MIN_VALUE) is still negative.
                long lid = rng.nextLong() & Long.MAX_VALUE;
                createLedger(lm, lid);
                mustExist.add(lid);
            }

            final long start = MathUtils.nowInNano();
            final CountDownLatch latch = new CountDownLatch(1);
            List<Future<?>> futures = new ArrayList<>();
            ExecutorService executor = Executors.newCachedThreadPool();
            try {
                // Ids already used by some writer, so that no two writers pick the same candidate.
                Set<Long> createdLedgers = new ConcurrentSkipListSet<>();
                for (int i = 0; i < numWriters; i++) {
                    futures.add(executor.submit(() -> {
                        try (LedgerManager writerLM = new PulsarLedgerManager(store, ledgersRoot)) {
                            Random writerRNG = new Random(rng.nextLong());
                            latch.await();
                            while (MathUtils.elapsedNanos(start) < runtime) {
                                long candidate;
                                do {
                                    candidate = writerRNG.nextLong() & Long.MAX_VALUE;
                                } while (mustExist.contains(candidate) || !createdLedgers.add(candidate));
                                createLedger(writerLM, candidate);
                                removeLedger(writerLM, candidate);
                            }
                            return null;
                        }
                    }));
                }

                for (int i = 0; i < numCheckers; i++) {
                    futures.add(executor.submit(() -> {
                        try (LedgerManager checkerLM = new PulsarLedgerManager(store, ledgersRoot)) {
                            latch.await();
                            while (MathUtils.elapsedNanos(start) < runtime) {
                                Set<Long> returnedIds = ledgerRangeToSet(checkerLM.getLedgerRanges(0L));
                                for (Long id : mustExist) {
                                    Assert.assertTrue(returnedIds.contains(id));
                                }

                                Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(checkerLM);
                                for (Long id : mustExist) {
                                    Assert.assertTrue(ledgersReadAsync.contains(id));
                                }
                            }
                            return null;
                        }
                    }));
                }

                latch.countDown();
                for (Future<?> f : futures) {
                    f.get();
                }
            } finally {
                // Always stop the workers, also when a task failed, so no threads outlive the test.
                executor.shutdownNow();
            }
        }
    }

    /**
     * Creates one small and one very large ledger id and checks that both the range iterator and
     * {@code asyncProcessLedgers} report exactly those two ids.
     *
     * <p>The iterator is obtained before the ledgers are created and drained only afterwards;
     * this order is kept as is.
     *
     * @param provider    provider label passed by the {@code "impl"} data provider (unused here)
     * @param urlSupplier supplies the metadata store URL to connect to
     * @throws Throwable if the store or the ledger manager fails, or an assertion does not hold
     */
    @Test(dataProvider = "impl")
    public void hierarchicalLedgerManagerAsyncProcessLedgersTest(String provider, Supplier<String> urlSupplier)
            throws Throwable {
        try (MetadataStoreExtended store =
                     MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
             LedgerManager lm = new PulsarLedgerManager(store, newLedgersRoot())) {
            LedgerManager.LedgerRangeIterator lri = lm.getLedgerRanges(0L);

            Set<Long> ledgerIds = new TreeSet<>(Arrays.asList(1234L, 123456789123456789L));
            for (Long ledgerId : ledgerIds) {
                createLedger(lm, ledgerId);
            }

            Set<Long> ledgersReadThroughIterator = ledgerRangeToSet(lri);
            Assert.assertEquals("Comparing LedgersIds read through Iterator", ledgerIds, ledgersReadThroughIterator);

            Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
            Assert.assertEquals("Comparing LedgersIds read asynchronously", ledgerIds, ledgersReadAsync);
        }
    }
}

