package org.apache.pulsar.broker.service;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.zookeeper.ZooKeeper;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.class */
/**
 * Broker integration tests that run against the BookKeeper/ZooKeeper ensemble exposed by
 * {@link BkEnsemblesTestBase} through its {@code bkEnsemble}, {@code pulsar} and {@code admin} fields.
 *
 * <p>Several tests simulate a broker crash by evicting a topic from the broker's topic cache and
 * clearing the managed-ledger factory's internal cache via reflection, so that the next client
 * connection has to load the topic again.
 */
public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {

    /** Creates the test class with the default value of 3 for the base-class constructor argument. */
    public BrokerBkEnsemblesTests() {
        this(3);
    }

    /**
     * @param i value forwarded unchanged to the {@link BkEnsemblesTestBase} constructor
     *          (presumably the number of bookies to start; confirm against the base class)
     */
    public BrokerBkEnsemblesTests(int i) {
        super(i);
    }

    /**
     * Verifies that reloading a topic after a simulated broker crash creates a new cursor ledger and
     * that the ZooKeeper node of the previous cursor ledger no longer exists afterwards.
     */
    @Test
    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
        final String topic = "persistent://prop/usc/crash-broker/my-topic";
        final String subscription = "my-subscriber-name";
        final int messageCount = 10;

        ZooKeeper zkClient = this.bkEnsemble.getZkClient();
        // try-with-resources guarantees the client is closed even when an assertion fails.
        try (PulsarClient client = newClient(this.pulsar.getWebServiceAddress())) {
            this.admin.namespaces().createNamespace("prop/usc/crash-broker");

            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            Producer<byte[]> producer = client.newProducer().topic(topic).create();
            publishNumberedMessages(producer, messageCount);
            receiveAndAckMessages(consumer, messageCount);

            ManagedCursorImpl cursor = firstCursorOf(topic);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> cursor.getState().equals("Open"), 5, 100L);
            long cursorLedgerId = cursor.getCursorLedger();
            String cursorLedgerZkPath = "/ledgers" + StringUtils.getHybridHierarchicalLedgerPath(cursorLedgerId);
            Assert.assertNotNull(zkClient.exists(cursorLedgerZkPath, false));

            // Simulate the crash: drop client connections, then wipe the broker-side caches.
            consumer.close();
            producer.close();
            evictTopicFromBrokerCaches(topic);

            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            Producer<byte[]> producer2 = client.newProducer().topic(topic).create();
            publishNumberedMessages(producer2, messageCount);
            receiveAndAckMessages(consumer2, messageCount);

            ManagedCursorImpl reloadedCursor = firstCursorOf(topic);
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> reloadedCursor.getState().equals("Open"), 5, 100L);
            long newCursorLedgerId = reloadedCursor.getCursorLedger();

            // Compare Long with Long: a boxed Long never equals a boxed Integer, which would make
            // this assertion pass unconditionally.
            Assert.assertNotEquals(Long.valueOf(newCursorLedgerId), Long.valueOf(-1L));
            Assert.assertNotEquals(Long.valueOf(cursorLedgerId), Long.valueOf(newCursorLedgerId));
            // The ledger that backed the cursor before the crash must be gone from ZooKeeper.
            Assert.assertNull(zkClient.exists(cursorLedgerZkPath, false));

            producer2.close();
            consumer2.close();
        }
    }

    /**
     * Verifies the effect of the {@code autoSkipNonRecoverableData} dynamic configuration: after all
     * but the last data ledger have been deleted from BookKeeper, a consumer receives nothing while
     * the flag is {@code false} and receives messages again once it is switched to {@code true}.
     */
    @Test
    public void testSkipCorruptDataLedger() throws Exception {
        final String subscription = "my-subscriber-name";
        final int entriesPerLedger = 20;
        final int totalMessages = 100;

        this.admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
        try (PulsarClient client = newClient(this.pulsar.getWebServiceAddress())) {
            createNamespaceQuietly("prop/usc/crash-broker");
            String topic = "persistent://prop/usc/crash-broker/my-topic-" + System.currentTimeMillis();

            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscription)
                    .receiverQueueSize(5).subscribe();
            PersistentTopic persistentTopic =
                    (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topic).get();
            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
            ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.getCursors().iterator().next();

            // Reach into the cursor's private config to force small ledgers that roll over quickly.
            Field configField = ManagedCursorImpl.class.getDeclaredField("config");
            configField.setAccessible(true);
            ManagedLedgerConfig managedLedgerConfig = (ManagedLedgerConfig) configField.get(cursor);
            managedLedgerConfig.setMaxEntriesPerLedger(entriesPerLedger);
            managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);

            // The BookKeeper client used by the managed ledger is needed to delete ledgers directly.
            Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
            bookKeeperField.setAccessible(true);
            BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(managedLedger);

            Producer<byte[]> producer = client.newProducer().topic(topic).create();
            publishNumberedMessages(producer, totalMessages);
            Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
            consumer.close();

            NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersInfo =
                    managedLedger.getLedgersInfo();
            Assert.assertEquals(ledgersInfo.size(), totalMessages / entriesPerLedger);

            // Delete every data ledger except the last one. A failed deletion now fails the test
            // with its cause instead of being printed and ignored.
            Long lastLedgerId = ledgersInfo.lastKey();
            for (Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> entry : ledgersInfo.entrySet()) {
                if (entry.getKey().equals(lastLedgerId)) {
                    continue;
                }
                Assert.assertEquals(entry.getValue().getEntries(), (long) entriesPerLedger);
                bookKeeper.deleteLedger(entry.getKey());
            }

            producer.close();
            evictTopicFromBrokerCaches(topic);

            // With auto-skip disabled nothing is delivered within the timeout.
            Consumer<byte[]> blockedConsumer =
                    client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            Assert.assertNull(blockedConsumer.receive(1, TimeUnit.SECONDS));
            blockedConsumer.close();

            this.admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> managedLedgerConfig.isAutoSkipNonRecoverableData(), 5, 100L);

            // With auto-skip enabled the messages of the surviving ledger are delivered.
            Consumer<byte[]> recoveredConsumer =
                    client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            for (int i = 0; i < entriesPerLedger; i++) {
                Message<byte[]> message = recoveredConsumer.receive();
                recoveredConsumer.acknowledge(message);
            }
            recoveredConsumer.close();
        }
    }

    /**
     * Verifies that a message can be produced to and consumed from a topic whose local name
     * consists largely of punctuation and other special characters.
     */
    @Test(timeOut = 20000)
    public void testTopicWithWildCardChar() throws Exception {
        final String topic =
                "persistent://prop/usc/topicWithSpecialChar/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524";

        try (PulsarClient client = newClient(this.pulsar.getWebServiceAddress())) {
            createNamespaceQuietly("prop/usc/topicWithSpecialChar");
            byte[] payload = "test".getBytes();

            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Producer<byte[]> producer = client.newProducer().topic(topic).create();
            producer.send(payload);
            Assert.assertEquals(consumer.receive().getData(), payload);

            consumer.close();
            producer.close();
        }
    }

    /**
     * Verifies that a topic can still be deleted through the admin API after BookKeeper has been
     * stopped and the topic unloaded, i.e. when loading the topic fails with an
     * {@link ExecutionException}.
     */
    @Test
    public void testDeleteTopicWithMissingData() throws Exception {
        String namespace = "prop/usc-" + System.nanoTime();
        this.admin.namespaces().createNamespace(namespace);
        String topic = namespace + "/my-topic-" + System.nanoTime();

        // Resources are closed in reverse order: producer, consumer, then client.
        try (PulsarClient client = newClient(this.pulsar.getBrokerServiceUrl());
                Consumer<String> consumer =
                        client.newConsumer(Schema.STRING).topic(topic).subscriptionName("test").subscribe();
                Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
            producer.send("Hello");

            this.bkEnsemble.stopBK();
            this.admin.topics().unload(topic);
            Thread.sleep(1000L);

            try {
                this.pulsar.getBrokerService().getTopicIfExists(topic).get();
                Assert.fail("Should have thrown exception");
            } catch (ExecutionException expected) {
                // Expected: the topic cannot be loaded after stopBK().
            }

            this.admin.topics().delete(topic);
            Assert.assertEquals(this.pulsar.getBrokerService().getTopicIfExists(topic).join(), Optional.empty());
        }
    }

    /** Builds a client for {@code serviceUrl} with the stats interval set to zero seconds. */
    private static PulsarClient newClient(String serviceUrl) throws Exception {
        return PulsarClient.builder().serviceUrl(serviceUrl).statsInterval(0L, TimeUnit.SECONDS).build();
    }

    /** Sends {@code count} messages with payloads {@code my-message-0 .. my-message-(count-1)}. */
    private static void publishNumberedMessages(Producer<byte[]> producer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            producer.send(("my-message-" + i).getBytes());
        }
    }

    /** Receives {@code count} messages (one-second timeout each) and acknowledges every one. */
    private static void receiveAndAckMessages(Consumer<byte[]> consumer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
            // Fail with a readable message rather than passing null to acknowledge().
            Assert.assertNotNull(message, "No message received within 1s at index " + i);
            consumer.acknowledge(message);
        }
    }

    /** Returns the first cursor of the managed ledger behind {@code topic}, loading the topic if needed. */
    private ManagedCursorImpl firstCursorOf(String topic) throws Exception {
        PersistentTopic persistentTopic =
                (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topic).get();
        return (ManagedCursorImpl) persistentTopic.getManagedLedger().getCursors().iterator().next();
    }

    /**
     * Removes {@code topic} from the broker's topic cache and clears the private {@code ledgers}
     * map of {@link ManagedLedgerFactoryImpl} via reflection.
     */
    private void evictTopicFromBrokerCaches(String topic) throws Exception {
        this.pulsar.getBrokerService().removeTopicFromCache(topic);
        Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        ledgersField.setAccessible(true);
        ((Map<?, ?>) ledgersField.get(this.pulsar.getManagedLedgerFactory())).clear();
    }

    /** Creates {@code namespace}, deliberately ignoring any failure. */
    private void createNamespaceQuietly(String namespace) {
        try {
            this.admin.namespaces().createNamespace(namespace);
        } catch (Exception ignored) {
            // Best effort: presumably the namespace may already exist from another test.
            // NOTE(review): confirm, and narrow the caught type if so.
        }
    }
}
