package org.apache.pinot.core.realtime.impl.kafka2;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster;
import org.apache.pinot.core.realtime.stream.MessageBatch;
import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.class */
/**
 * Tests for {@link KafkaPartitionLevelConsumer} and {@link KafkaStreamMetadataProvider} that run against an
 * in-process {@link MiniKafkaCluster}.
 *
 * <p>{@link #setup()} starts a single-server cluster, creates topic {@code "foo"} with one partition and topic
 * {@code "bar"} with two partitions, and produces {@link #NUM_MSG_PRODUCED_PER_PARTITION} messages for every
 * partition. The individual tests then check consumer construction, partition counts, offsets and message
 * contents.
 */
public class KafkaPartitionLevelConsumerTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
    private static final long STABILIZE_SLEEP_DELAYS = 3000;
    private static final String TEST_TOPIC_1 = "foo";
    private static final String TEST_TOPIC_2 = "bar";
    private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;

    /** Client id handed to the producer, the consumers and the metadata providers. */
    private static final String CLIENT_ID = "clientId";
    /** Table name passed to every {@link StreamConfig} built by these tests. */
    private static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
    /** Topic name that {@link #setup()} never creates; used by the tests that only exercise construction/fetch. */
    private static final String PLACEHOLDER_TOPIC = "theTopic";
    /** Timeout, in milliseconds, passed to every fetch call. */
    private static final int TIMEOUT_MILLIS = 10000;
    /** Prefix of every produced message; the message index is appended to it. */
    private static final String MSG_PREFIX = "sample_msg_";

    private static MiniKafkaCluster kafkaCluster;
    private static String brokerAddress;

    /**
     * Starts the embedded cluster, creates the test topics and fills them with messages.
     *
     * @throws Exception if the cluster cannot be started or the thread is interrupted while sleeping
     */
    @BeforeClass
    public static void setup() throws Exception {
        kafkaCluster = new MiniKafkaCluster.Builder().newServer("0").build();
        LOGGER.info("Trying to start MiniKafkaCluster");
        kafkaCluster.start();
        brokerAddress = getKakfaBroker();
        kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
        kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
        Thread.sleep(STABILIZE_SLEEP_DELAYS);
        produceMsgToKafka();
        Thread.sleep(STABILIZE_SLEEP_DELAYS);
    }

    /**
     * Sends {@link #NUM_MSG_PRODUCED_PER_PARTITION} rounds of messages: one message to the first topic and two to
     * the second topic per round. The producer is closed before returning so that buffered records are flushed
     * and its resources are released.
     */
    private static void produceMsgToKafka() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerAddress);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
                String message = MSG_PREFIX + i;
                producer.send(new ProducerRecord<String, String>(TEST_TOPIC_1, message));
                // The second topic was created with two partitions, so it receives two messages per round.
                producer.send(new ProducerRecord<String, String>(TEST_TOPIC_2, message));
                producer.send(new ProducerRecord<String, String>(TEST_TOPIC_2, message));
            }
        }
    }

    /** Returns the {@code host:port} address of server 0 of the embedded cluster. */
    private static String getKakfaBroker() {
        return "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
    }

    /**
     * Builds the stream configuration entries shared by all tests.
     *
     * @param topic value stored under {@code stream.kafka.topic.name}
     * @return a new mutable map that callers may extend with test-specific entries
     */
    private static Map<String, String> createStreamConfigMap(String topic) {
        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kafka");
        streamConfigMap.put("stream.kafka.topic.name", topic);
        streamConfigMap.put("stream.kafka.broker.list", brokerAddress);
        streamConfigMap.put("stream.kafka.consumer.type", "simple");
        streamConfigMap.put("stream.kafka.consumer.factory.class.name", KafkaConsumerFactory.class.getName());
        streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
        return streamConfigMap;
    }

    /**
     * Asserts that a batch holds the expected number of messages and that they are consecutive produced messages.
     *
     * @param batch batch returned by a fetch call
     * @param expectedCount number of messages the batch must contain
     * @param firstMsgIndex index embedded in the payload of the first message of the batch
     */
    private static void assertBatchContents(MessageBatch batch, int expectedCount, int firstMsgIndex) {
        Assert.assertEquals(batch.getMessageCount(), expectedCount);
        for (int i = 0; i < batch.getMessageCount(); i++) {
            // Decode with an explicit charset instead of the platform default
            // (presumably matches StringSerializer's UTF-8 default; payloads here are ASCII either way).
            String payload = new String((byte[]) batch.getMessageAtIndex(i), StandardCharsets.UTF_8);
            Assert.assertEquals(payload, MSG_PREFIX + (firstMsgIndex + i));
        }
    }

    /**
     * Deletes the test topics and closes the embedded cluster.
     *
     * @throws Exception if closing the cluster fails
     */
    @AfterClass
    public static void shutDown() throws Exception {
        kafkaCluster.deleteTopic(TEST_TOPIC_1);
        kafkaCluster.deleteTopic(TEST_TOPIC_2);
        kafkaCluster.close();
    }

    /**
     * Checks the values a consumer reports for buffer size, socket timeout and fetcher sizes, first without and
     * then with explicit buffer-size and socket-timeout entries in the configuration.
     */
    @Test
    public void testBuildConsumer() throws Exception {
        Map<String, String> streamConfigMap = createStreamConfigMap(PLACEHOLDER_TOPIC);
        streamConfigMap.put("stream.kafka.fetcher.size", "10000");
        streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
        StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);

        // The instance is not used further; only its construction is exercised.
        new KafkaStreamMetadataProvider(CLIENT_ID, streamConfig);

        KafkaPartitionLevelConsumer consumer = new KafkaPartitionLevelConsumer(CLIENT_ID, streamConfig, 0);
        consumer.fetchMessages(12345L, 23456L, TIMEOUT_MILLIS);

        // Buffer size and socket timeout are not configured yet, so these are the values reported without them.
        Assert.assertEquals(consumer.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize(), 512000);
        Assert.assertEquals(consumer.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout(), 10000);
        Assert.assertEquals(consumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherSizeBytes(), 10000);
        Assert.assertEquals(consumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherMinBytes(), 20000);

        // Configure both values explicitly and verify that a newly built consumer reports them.
        streamConfigMap.put("stream.kafka.buffer.size", "100");
        streamConfigMap.put("stream.kafka.socket.timeout", "1000");
        KafkaPartitionLevelConsumer configuredConsumer =
            new KafkaPartitionLevelConsumer(CLIENT_ID, new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap), 0);
        configuredConsumer.fetchMessages(12345L, 23456L, TIMEOUT_MILLIS);
        Assert.assertEquals(configuredConsumer.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize(), 100);
        Assert.assertEquals(configuredConsumer.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout(), 1000);
    }

    /** Checks that the metadata provider reports one partition for the first topic and two for the second. */
    @Test
    public void testGetPartitionCount() {
        StreamConfig singlePartitionConfig =
            new StreamConfig(TABLE_NAME_WITH_TYPE, createStreamConfigMap(TEST_TOPIC_1));
        Assert.assertEquals(
            new KafkaStreamMetadataProvider(CLIENT_ID, singlePartitionConfig).fetchPartitionCount(TIMEOUT_MILLIS), 1);

        StreamConfig twoPartitionConfig =
            new StreamConfig(TABLE_NAME_WITH_TYPE, createStreamConfigMap(TEST_TOPIC_2));
        Assert.assertEquals(
            new KafkaStreamMetadataProvider(CLIENT_ID, twoPartitionConfig).fetchPartitionCount(TIMEOUT_MILLIS), 2);
    }

    /** Checks that fetching an arbitrary offset range from the placeholder topic completes without throwing. */
    @Test
    public void testFetchMessages() throws Exception {
        StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, createStreamConfigMap(PLACEHOLDER_TOPIC));
        new KafkaPartitionLevelConsumer(CLIENT_ID, streamConfig, 0).fetchMessages(12345L, 23456L, TIMEOUT_MILLIS);
    }

    /** Checks the smallest and largest offsets of every partition of both test topics. */
    @Test
    public void testFetchOffsets() throws Exception {
        testFetchOffsets(TEST_TOPIC_1);
        testFetchOffsets(TEST_TOPIC_2);
    }

    /**
     * Asserts that every partition of the topic starts at offset 0 and ends at
     * {@link #NUM_MSG_PRODUCED_PER_PARTITION}.
     *
     * @param str name of the topic to inspect
     */
    private void testFetchOffsets(String str) throws Exception {
        StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, createStreamConfigMap(str));
        int partitionCount = new KafkaStreamMetadataProvider(CLIENT_ID, streamConfig).fetchPartitionCount(TIMEOUT_MILLIS);
        for (int partition = 0; partition < partitionCount; partition++) {
            KafkaStreamMetadataProvider metadataProvider =
                new KafkaStreamMetadataProvider(CLIENT_ID, streamConfig, partition);
            long smallestOffset = metadataProvider.fetchPartitionOffset(
                new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(), TIMEOUT_MILLIS);
            long largestOffset = metadataProvider.fetchPartitionOffset(
                new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(), TIMEOUT_MILLIS);
            Assert.assertEquals(smallestOffset, 0L);
            Assert.assertEquals(largestOffset, (long) NUM_MSG_PRODUCED_PER_PARTITION);
        }
    }

    /** Checks message counts and payloads fetched from every partition of both test topics. */
    @Test
    public void testConsumer() throws Exception {
        testConsumer(TEST_TOPIC_1);
        testConsumer(TEST_TOPIC_2);
    }

    /**
     * Fetches three offset ranges from every partition of the topic and verifies the returned batches.
     *
     * @param str name of the topic to consume from
     * @throws TimeoutException if a fetch does not complete in time
     */
    private void testConsumer(String str) throws TimeoutException {
        StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, createStreamConfigMap(str));
        StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
        int partitionCount = new KafkaStreamMetadataProvider(CLIENT_ID, streamConfig).fetchPartitionCount(TIMEOUT_MILLIS);
        for (int partition = 0; partition < partitionCount; partition++) {
            PartitionLevelConsumer consumer = consumerFactory.createPartitionLevelConsumer(CLIENT_ID, partition);

            // The whole partition is requested but only 500 messages are expected back from one fetch
            // (presumably the Kafka consumer's max.poll.records default -- TODO confirm).
            assertBatchContents(consumer.fetchMessages(0L, NUM_MSG_PRODUCED_PER_PARTITION, TIMEOUT_MILLIS), 500, 0);

            // Second half of the partition.
            assertBatchContents(consumer.fetchMessages(500L, NUM_MSG_PRODUCED_PER_PARTITION, TIMEOUT_MILLIS), 500, 500);

            // A small range in the middle; the end offset is exclusive, so 25 messages are expected.
            assertBatchContents(consumer.fetchMessages(10L, 35L, TIMEOUT_MILLIS), 25, 10);
        }
    }
}
