package org.apache.pulsar.broker.service.schema;

import java.util.Collections;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.BrokerBkEnsemblesTests;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.class */
public class PartitionedTopicsSchemaTest extends BrokerBkEnsemblesTests {
    @Test
    public void partitionedTopicWithSchema() throws Exception {
        this.admin.namespaces().createNamespace("prop/my-test", Collections.singleton("usc"));
        this.admin.topics().createPartitionedTopic("prop/my-test/my-topic", 16);
        PulsarClient build = PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_SERVICE_PORT).build();
        CompletableFuture createAsync = build.newProducer(Schema.STRING).topic("prop/my-test/my-topic").createAsync();
        CompletableFuture subscribeAsync = build.newConsumer(Schema.STRING).topic("prop/my-test/my-topic").subscriptionName("sub").subscribeAsync();
        CompletableFuture.allOf(createAsync, subscribeAsync).get();
        Producer producer = (Producer) createAsync.get();
        Consumer consumer = (Consumer) subscribeAsync.get();
        for (int i = 0; i < 10; i++) {
            producer.send("Hello-" + i);
        }
        consumer.close();
        producer.close();
        this.admin.namespaces().unload("prop/my-test");
        CompletableFuture createAsync2 = build.newProducer(Schema.STRING).topic("prop/my-test/my-topic").createAsync();
        CompletableFuture subscribeAsync2 = build.newConsumer(Schema.STRING).topic("prop/my-test/my-topic").subscriptionName("sub").subscribeAsync();
        CompletableFuture.allOf(createAsync2, subscribeAsync2).get();
        Consumer consumer2 = (Consumer) subscribeAsync2.get();
        TreeSet treeSet = new TreeSet();
        for (int i2 = 0; i2 < 10; i2++) {
            Message<?> receive = consumer2.receive();
            treeSet.add(receive.getValue());
            consumer2.acknowledge(receive);
        }
        Assert.assertEquals(treeSet.size(), 10);
        for (int i3 = 0; i3 < 10; i3++) {
            Assert.assertTrue(treeSet.contains("Hello-" + i3));
        }
        build.close();
    }

    @Override // org.apache.pulsar.broker.service.BrokerBkEnsemblesTests
    @Test(enabled = false)
    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
    }

    @Override // org.apache.pulsar.broker.service.BrokerBkEnsemblesTests
    @Test(enabled = false)
    public void testSkipCorruptDataLedger() throws Exception {
    }
}
