/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerIdGenerator;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class PulsarLedgerIdGeneratorTest
extends BaseMetadataStoreTest {
    private static final Logger log = LoggerFactory.getLogger(PulsarLedgerIdGeneratorTest.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="impl")
    public void testGenerateLedgerId(String provider, Supplier<String> urlSupplier) throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)urlSupplier.get(), (MetadataStoreConfig)MetadataStoreConfig.builder().build());
        try {
            PulsarLedgerIdGenerator ledgerIdGenerator = new PulsarLedgerIdGenerator(store, "/ledgers");
            try {
                int nThread = 2;
                int nLedgers = 2000;
                CountDownLatch countDownLatch1 = new CountDownLatch(4000);
                AtomicInteger errCount = new AtomicInteger(0);
                ConcurrentLinkedQueue shortLedgerIds = new ConcurrentLinkedQueue();
                ConcurrentLinkedQueue longLedgerIds = new ConcurrentLinkedQueue();
                long start = System.currentTimeMillis();
                ExecutorService executor = Executors.newCachedThreadPool();
                try {
                    Long ledger;
                    for (int i = 0; i < 2; ++i) {
                        executor.submit(() -> {
                            for (int j = 0; j < 2000; ++j) {
                                ledgerIdGenerator.generateLedgerId((rc, result) -> {
                                    if (KeeperException.Code.OK.intValue() == rc) {
                                        shortLedgerIds.add(result);
                                    } else {
                                        errCount.incrementAndGet();
                                    }
                                    countDownLatch1.countDown();
                                });
                            }
                        });
                    }
                    countDownLatch1.await();
                    CountDownLatch countDownLatch2 = new CountDownLatch(4000);
                    store.put("/ledgers/idgen-long", new byte[0], Optional.empty()).join();
                    for (int i = 0; i < 2; ++i) {
                        executor.submit(() -> {
                            for (int j = 0; j < 2000; ++j) {
                                ledgerIdGenerator.generateLedgerId((rc, result) -> {
                                    if (KeeperException.Code.OK.intValue() == rc) {
                                        longLedgerIds.add(result);
                                    } else {
                                        errCount.incrementAndGet();
                                    }
                                    countDownLatch2.countDown();
                                });
                            }
                        });
                    }
                    Assert.assertTrue((boolean)countDownLatch2.await(120L, TimeUnit.SECONDS), (String)"Wait ledger id generation threads to stop timeout : ");
                    log.info("Number of generated ledger id: {}, time used: {}", (Object)(shortLedgerIds.size() + longLedgerIds.size()), (Object)(System.currentTimeMillis() - start));
                    Assert.assertEquals((int)errCount.get(), (int)0, (String)"Error occur during ledger id generation : ");
                    HashSet<Long> ledgers = new HashSet<Long>();
                    while (!shortLedgerIds.isEmpty()) {
                        ledger = (Long)shortLedgerIds.poll();
                        Assert.assertNotNull((Object)ledger, (String)"Generated ledger id is null");
                        Assert.assertFalse((boolean)ledgers.contains(ledger), (String)("Ledger id [" + ledger + "] conflict : "));
                        ledgers.add(ledger);
                    }
                    while (!longLedgerIds.isEmpty()) {
                        ledger = (Long)longLedgerIds.poll();
                        Assert.assertNotNull((Object)ledger, (String)"Generated ledger id is null");
                        Assert.assertFalse((boolean)ledgers.contains(ledger), (String)("Ledger id [" + ledger + "] conflict : "));
                        ledgers.add(ledger);
                    }
                }
                finally {
                    if (Collections.singletonList(executor).get(0) != null) {
                        executor.shutdownNow();
                    }
                }
            }
            finally {
                if (Collections.singletonList(ledgerIdGenerator).get(0) != null) {
                    ledgerIdGenerator.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGenerateLedgerIdWithZkPrefix() throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)(this.zks.getConnectionString() + "/test"), (MetadataStoreConfig)MetadataStoreConfig.builder().build());
        try {
            PulsarLedgerIdGenerator ledgerIdGenerator = new PulsarLedgerIdGenerator(store, "/ledgers");
            try {
                int nThread = 2;
                int nLedgers = 2000;
                CountDownLatch countDownLatch1 = new CountDownLatch(4000);
                AtomicInteger errCount = new AtomicInteger(0);
                ConcurrentLinkedQueue shortLedgerIds = new ConcurrentLinkedQueue();
                ConcurrentLinkedQueue longLedgerIds = new ConcurrentLinkedQueue();
                long start = System.currentTimeMillis();
                ExecutorService executor = Executors.newCachedThreadPool();
                try {
                    Long ledger;
                    for (int i = 0; i < 2; ++i) {
                        executor.submit(() -> {
                            for (int j = 0; j < 2000; ++j) {
                                ledgerIdGenerator.generateLedgerId((rc, result) -> {
                                    if (KeeperException.Code.OK.intValue() == rc) {
                                        shortLedgerIds.add(result);
                                    } else {
                                        errCount.incrementAndGet();
                                    }
                                    countDownLatch1.countDown();
                                });
                            }
                        });
                    }
                    countDownLatch1.await();
                    for (Long ledgerId : shortLedgerIds) {
                        Assert.assertFalse((boolean)((Boolean)store.exists("/ledgers/idgen/ID-" + String.format("%010d", ledgerId)).get()), (String)"Exception during deleting node for id generation : ");
                    }
                    CountDownLatch countDownLatch2 = new CountDownLatch(4000);
                    store.put("/ledgers/idgen-long", new byte[0], Optional.empty()).join();
                    for (int i = 0; i < 2; ++i) {
                        executor.submit(() -> {
                            for (int j = 0; j < 2000; ++j) {
                                ledgerIdGenerator.generateLedgerId((rc, result) -> {
                                    if (KeeperException.Code.OK.intValue() == rc) {
                                        longLedgerIds.add(result);
                                    } else {
                                        errCount.incrementAndGet();
                                    }
                                    countDownLatch2.countDown();
                                });
                            }
                        });
                    }
                    Assert.assertTrue((boolean)countDownLatch2.await(120L, TimeUnit.SECONDS), (String)"Wait ledger id generation threads to stop timeout : ");
                    for (Long ledgerId : longLedgerIds) {
                        Assert.assertFalse((boolean)((Boolean)store.exists("/ledgers/idgen-long/HOB-0000000001/ID-" + String.format("%010d", ledgerId >> 32)).get()), (String)"Exception during deleting node for id generation : ");
                    }
                    log.info("Number of generated ledger id: {}, time used: {}", (Object)(shortLedgerIds.size() + longLedgerIds.size()), (Object)(System.currentTimeMillis() - start));
                    Assert.assertEquals((int)errCount.get(), (int)0, (String)"Error occur during ledger id generation : ");
                    HashSet<Long> ledgers = new HashSet<Long>();
                    while (!shortLedgerIds.isEmpty()) {
                        ledger = (Long)shortLedgerIds.poll();
                        Assert.assertNotNull((Object)ledger, (String)"Generated ledger id is null");
                        Assert.assertFalse((boolean)ledgers.contains(ledger), (String)("Ledger id [" + ledger + "] conflict : "));
                        ledgers.add(ledger);
                    }
                    while (!longLedgerIds.isEmpty()) {
                        ledger = (Long)longLedgerIds.poll();
                        Assert.assertNotNull((Object)ledger, (String)"Generated ledger id is null");
                        Assert.assertFalse((boolean)ledgers.contains(ledger), (String)("Ledger id [" + ledger + "] conflict : "));
                        ledgers.add(ledger);
                    }
                }
                finally {
                    if (Collections.singletonList(executor).get(0) != null) {
                        executor.shutdownNow();
                    }
                }
            }
            finally {
                if (Collections.singletonList(ledgerIdGenerator).get(0) != null) {
                    ledgerIdGenerator.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="impl")
    public void testEnsureCounterIsNotResetWithContainerNodes(String provider, Supplier<String> urlSupplier) throws Exception {
        MetadataStoreExtended store = MetadataStoreExtended.create((String)urlSupplier.get(), (MetadataStoreConfig)MetadataStoreConfig.builder().build());
        try {
            PulsarLedgerIdGenerator ledgerIdGenerator = new PulsarLedgerIdGenerator(store, "/ledgers");
            try {
                CountDownLatch l1 = new CountDownLatch(1);
                AtomicLong res1 = new AtomicLong();
                ledgerIdGenerator.generateLedgerId((rc, result) -> {
                    Assert.assertEquals((int)rc, (int)0);
                    res1.set((long)result);
                    l1.countDown();
                });
                l1.await();
                log.info("res1 : {}", (Object)res1);
                this.zks.checkContainers();
                CountDownLatch l2 = new CountDownLatch(1);
                AtomicLong res2 = new AtomicLong();
                ledgerIdGenerator.generateLedgerId((rc, result) -> {
                    Assert.assertEquals((int)rc, (int)0);
                    res2.set((long)result);
                    l2.countDown();
                });
                l2.await();
                log.info("res2 : {}", (Object)res2);
                Assert.assertNotEquals((Object)res1, (Object)res2);
                Assert.assertTrue((res1.get() < res2.get() ? 1 : 0) != 0);
            }
            finally {
                if (Collections.singletonList(ledgerIdGenerator).get(0) != null) {
                    ledgerIdGenerator.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(store).get(0) != null) {
                store.close();
            }
        }
    }
}

