package org.apache.pinot.plugin.stream.pulsar;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.class */
public class PulsarConsumerTest {
    public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
    public static final String TEST_TOPIC = "test-topic";
    public static final int NUM_PARTITION = 1;
    public static final String MESSAGE_PREFIX = "sample_msg";
    public static final int NUM_RECORDS_PER_PARTITION = 1000;
    public static final String CLIENT_ID = "clientId";
    private PulsarClient _pulsarClient;
    private PulsarStandaloneCluster _pulsarStandaloneCluster;
    private HashMap<Integer, MessageId> _partitionToFirstMessageIdMap = new HashMap<>();

    @BeforeClass
    public void setUp() throws Exception {
        try {
            this._pulsarStandaloneCluster = new PulsarStandaloneCluster();
            this._pulsarStandaloneCluster.start();
            PulsarAdmin build = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + this._pulsarStandaloneCluster.getAdminPort()).build();
            this._pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:" + this._pulsarStandaloneCluster.getBrokerPort()).build();
            build.topics().createPartitionedTopic(TEST_TOPIC, 1);
            publishRecords();
        } catch (Exception e) {
            if (this._pulsarStandaloneCluster != null) {
                this._pulsarStandaloneCluster.stop();
            }
            throw new RuntimeException("Failed to setUp test environment", e);
        }
    }

    @AfterClass
    public void tearDown() throws Exception {
        if (this._pulsarStandaloneCluster != null) {
            this._pulsarStandaloneCluster.stop();
        }
    }

    public void publishRecords() throws Exception {
        for (int i = 0; i < 1; i++) {
            final int i2 = i;
            Producer create = this._pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC).messageRouter(new MessageRouter() { // from class: org.apache.pinot.plugin.stream.pulsar.PulsarConsumerTest.1
                public int choosePartition(Message<?> message, TopicMetadata topicMetadata) {
                    return i2;
                }
            }).create();
            for (int i3 = 0; i3 < 1000; i3++) {
                MessageId send = create.send("sample_msg_" + i3);
                if (!this._partitionToFirstMessageIdMap.containsKey(Integer.valueOf(i2))) {
                    this._partitionToFirstMessageIdMap.put(Integer.valueOf(i2), send);
                }
            }
        }
    }

    public StreamConfig getStreamConfig() {
        String str = "pulsar://localhost:" + this._pulsarStandaloneCluster.getBrokerPort();
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "pulsar");
        hashMap.put("stream.pulsar.consumer.type", "simple");
        hashMap.put("stream.pulsar.topic.name", TEST_TOPIC);
        hashMap.put("stream.pulsar.bootstrap.servers", str);
        hashMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
        hashMap.put("stream.pulsar.consumer.factory.class.name", getPulsarConsumerFactoryName());
        hashMap.put(StreamConfigProperties.constructStreamProperty("pulsar", "fetch.timeout.millis"), "1000");
        hashMap.put("stream.pulsar.decoder.class.name", "decoderClass");
        return new StreamConfig(TABLE_NAME_WITH_TYPE, hashMap);
    }

    protected String getPulsarConsumerFactoryName() {
        return PulsarConsumerFactory.class.getName();
    }

    @Test
    public void testPartitionLevelConsumer() throws Exception {
        StreamConsumerFactory create = StreamConsumerFactoryProvider.create(getStreamConfig());
        int fetchPartitionCount = new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig()).fetchPartitionCount(10000L);
        for (int i = 0; i < fetchPartitionCount; i++) {
            int i2 = 0;
            PartitionGroupConsumer createPartitionGroupConsumer = create.createPartitionGroupConsumer(CLIENT_ID, new PartitionGroupConsumptionStatus(i, 1, new MessageIdStreamOffset(MessageId.earliest), (StreamPartitionMsgOffset) null, "CONSUMING"));
            MessageBatch fetchMessages = createPartitionGroupConsumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest), new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(i, 500)), 10000);
            Assert.assertEquals(fetchMessages.getMessageCount(), 500);
            for (int i3 = 0; i3 < fetchMessages.getMessageCount(); i3++) {
                Assert.assertEquals(new String((byte[]) fetchMessages.getMessageAtIndex(i3)), "sample_msg_" + i3);
                i2++;
            }
            MessageBatch fetchMessages2 = createPartitionGroupConsumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(i, 500)), (StreamPartitionMsgOffset) null, 10000);
            Assert.assertEquals(fetchMessages2.getMessageCount(), 500);
            for (int i4 = 0; i4 < fetchMessages2.getMessageCount(); i4++) {
                Assert.assertEquals(new String((byte[]) fetchMessages2.getMessageAtIndex(i4)), "sample_msg_" + (500 + i4));
                i2++;
            }
            MessageBatch fetchMessages3 = createPartitionGroupConsumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(i, 10)), new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(i, 35)), 10000);
            Assert.assertEquals(fetchMessages3.getMessageCount(), 25);
            for (int i5 = 0; i5 < fetchMessages3.getMessageCount(); i5++) {
                Assert.assertEquals(new String((byte[]) fetchMessages3.getMessageAtIndex(i5)), "sample_msg_" + (10 + i5));
            }
            Assert.assertEquals(i2, NUM_RECORDS_PER_PARTITION);
        }
    }

    public MessageId getMessageIdForPartitionAndIndex(int i, int i2) {
        return DefaultImplementation.newMessageId(MessageIdImpl.convertToMessageIdImpl(this._partitionToFirstMessageIdMap.get(Integer.valueOf(i))).getLedgerId(), i2, i);
    }
}
