package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/TopicReaderTest.class */
public class TopicReaderTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSimpleReader() throws Exception {
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, new ReaderConfiguration());
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        for (int i = 0; i < 10; i++) {
            createProducer.send(("my-message-" + i).getBytes());
        }
        HashSet newHashSet = Sets.newHashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(createReader.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        createReader.close();
        createProducer.close();
    }

    @Test
    public void testReaderAfterMessagesWerePublished() throws Exception {
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        for (int i = 0; i < 10; i++) {
            createProducer.send(("my-message-" + i).getBytes());
        }
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, new ReaderConfiguration());
        HashSet newHashSet = Sets.newHashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(createReader.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        createReader.close();
        createProducer.close();
    }

    @Test
    public void testMultipleReaders() throws Exception {
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        for (int i = 0; i < 10; i++) {
            createProducer.send(("my-message-" + i).getBytes());
        }
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, new ReaderConfiguration());
        Reader createReader2 = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, new ReaderConfiguration());
        HashSet newHashSet = Sets.newHashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(createReader.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        HashSet newHashSet2 = Sets.newHashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            String str2 = new String(createReader2.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str2);
            testMessageOrderAndDuplicates(newHashSet2, str2, "my-message-" + i3);
        }
        createReader.close();
        createReader2.close();
        createProducer.close();
    }

    @Test
    public void testTopicStats() throws Exception {
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, new ReaderConfiguration());
        Reader createReader2 = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, new ReaderConfiguration());
        Assert.assertEquals(this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1").subscriptions.size(), 2);
        createReader.close();
        Assert.assertEquals(this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1").subscriptions.size(), 1);
        createReader2.close();
        Assert.assertEquals(this.admin.persistentTopics().getStats("persistent://my-property/use/my-ns/my-topic1").subscriptions.size(), 0);
    }

    @Test
    public void testReaderOnLastMessage() throws Exception {
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        for (int i = 0; i < 10; i++) {
            createProducer.send(("my-message-" + i).getBytes());
        }
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.latest, new ReaderConfiguration());
        for (int i2 = 10; i2 < 20; i2++) {
            createProducer.send(("my-message-" + i2).getBytes());
        }
        HashSet newHashSet = Sets.newHashSet();
        for (int i3 = 10; i3 < 20; i3++) {
            String str = new String(createReader.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i3);
        }
        createReader.close();
        createProducer.close();
    }

    @Test
    public void testReaderOnSpecificMessage() throws Exception {
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(createProducer.send(("my-message-" + i).getBytes()));
        }
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", (MessageId) arrayList.get(4), new ReaderConfiguration());
        HashSet newHashSet = Sets.newHashSet();
        for (int i2 = 5; i2 < 10; i2++) {
            String str = new String(createReader.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        createReader.close();
        createProducer.close();
    }

    @Test
    public void testReaderOnSpecificMessageWithBatches() throws Exception {
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.setBatchingEnabled(true);
        producerConfiguration.setBatchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS);
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches", producerConfiguration);
        for (int i = 0; i < 10; i++) {
            createProducer.sendAsync(("my-message-" + i).getBytes());
        }
        createProducer.send("my-message-10".getBytes());
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches", MessageId.earliest, new ReaderConfiguration());
        MessageId messageId = null;
        for (int i2 = 0; i2 < 5; i2++) {
            messageId = createReader.readNext().getMessageId();
        }
        Assert.assertEquals(messageId.getClass(), BatchMessageIdImpl.class);
        System.out.println("CREATING READER ON MSG ID: " + messageId);
        Reader createReader2 = this.pulsarClient.createReader("persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches", messageId, new ReaderConfiguration());
        for (int i3 = 5; i3 < 11; i3++) {
            String str = new String(createReader2.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            Assert.assertEquals(str, "my-message-" + i3);
        }
        createProducer.close();
    }

    @Test(groups = {"encryption"})
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet newHashSet = Sets.newHashSet();
        ReaderConfiguration readerConfiguration = new ReaderConfiguration();
        readerConfiguration.setCryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.TopicReaderTest.1EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        });
        Reader createReader = this.pulsarClient.createReader("persistent://my-property/use/my-ns/test-reader-myecdsa-topic1", MessageId.latest, readerConfiguration);
        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
        producerConfiguration.addEncryptionKey("client-ecdsa.pem");
        producerConfiguration.setCryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.TopicReaderTest.1EncKeyReader
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        });
        Producer createProducer = this.pulsarClient.createProducer("persistent://my-property/use/my-ns/test-reader-myecdsa-topic1", producerConfiguration);
        for (int i = 0; i < 10; i++) {
            createProducer.send(("my-message-" + i).getBytes());
        }
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(createReader.readNext(5, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(newHashSet, str, "my-message-" + i2);
        }
        createProducer.close();
        createReader.close();
        log.info("-- Exiting {} test --", this.methodName);
    }
}
