package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/ReaderTest.class */
public class ReaderTest extends MockedPulsarServiceBaseTest {
    private static final String subscription = "reader-sub";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1:" + this.BROKER_WEBSERVICE_PORT));
        this.admin.tenants().createTenant("my-property", new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    private Set<String> publishMessages(String str, int i, boolean z) throws Exception {
        HashSet hashSet = new HashSet();
        ProducerBuilder newProducer = this.pulsarClient.newProducer();
        newProducer.messageRoutingMode(MessageRoutingMode.SinglePartition);
        newProducer.maxPendingMessages(i);
        newProducer.topic(str);
        if (z) {
            newProducer.enableBatching(true);
            newProducer.batchingMaxMessages(i);
        } else {
            newProducer.enableBatching(false);
        }
        Producer create = newProducer.create();
        Throwable th = null;
        try {
            CompletableFuture completableFuture = null;
            for (int i2 = 0; i2 < i; i2++) {
                String str2 = "key" + i2;
                completableFuture = create.newMessage().key(str2).value(("my-message-" + i2).getBytes()).sendAsync();
                hashSet.add(str2);
            }
            completableFuture.get();
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    create.close();
                }
            }
            return hashSet;
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testReadMessageWithoutBatching() throws Exception {
        testReadMessages("persistent://my-property/my-ns/my-reader-topic", false);
    }

    @Test
    public void testReadMessageWithBatching() throws Exception {
        testReadMessages("persistent://my-property/my-ns/my-reader-topic-with-batching", true);
    }

    private void testReadMessages(String str, boolean z) throws Exception {
        Set<String> publishMessages = publishMessages(str, 10, z);
        Reader create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).readerName(subscription).create();
        while (create.hasMessageAvailable()) {
            Assert.assertTrue(publishMessages.remove(create.readNext().getKey()));
        }
        Assert.assertTrue(publishMessages.isEmpty());
    }

    @Test
    public void testReadFromPartition() throws Exception {
        String str = "persistent://my-property/my-ns/testReadFromPartition-partition-0";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testReadFromPartition", 4);
        Set<String> publishMessages = publishMessages(str, 10, false);
        Reader create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).create();
        while (create.hasMessageAvailable()) {
            Assert.assertTrue(publishMessages.remove(create.readNext().getKey()));
        }
        Assert.assertTrue(publishMessages.isEmpty());
    }
}
